From f3c1b39862d0d66d6e429644c20cb174bc0cd292 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Mon, 22 Jun 2026 18:13:48 +0200 Subject: [PATCH 1/4] fix(replication): handle inbound overload backpressure --- src/replication/audit.rs | 68 +- src/replication/bootstrap.rs | 83 +- src/replication/config.rs | 10 + src/replication/mod.rs | 1350 +++++++++++++++++++++++++----- src/replication/neighbor_sync.rs | 80 +- src/replication/protocol.rs | 42 + src/replication/pruning.rs | 35 + src/replication/quorum.rs | 28 +- src/replication/scheduling.rs | 65 +- src/replication/types.rs | 29 + tests/poc_d1_bounded_queues.rs | 1 + 11 files changed, 1547 insertions(+), 244 deletions(-) diff --git a/src/replication/audit.rs b/src/replication/audit.rs index 3bbeaff9..af56db9c 100644 --- a/src/replication/audit.rs +++ b/src/replication/audit.rs @@ -19,6 +19,7 @@ use crate::replication::protocol::{ use crate::replication::types::{ AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs, }; +use crate::replication::{is_inbound_replication_overloaded_reason, OverloadClaimTracker}; use crate::storage::LmdbStorage; use saorsa_core::identity::PeerId; use saorsa_core::P2PNode; @@ -86,20 +87,55 @@ pub async fn audit_tick( .await } +/// Execute one repair-proof-gated audit tick with a fresh overload-claim +/// tracker. +/// +/// This preserves the public helper signature that existed before inbound +/// overload accounting was added. The replication engine uses the crate-private +/// tracker-aware helper so overload claim budgets persist across audit ticks. +#[allow(clippy::implicit_hasher, clippy::too_many_lines)] +pub async fn audit_tick_with_repair_proofs( + p2p_node: &Arc, + storage: &Arc, + config: &ReplicationConfig, + sync_history: &HashMap, + repair_proofs: &Arc>, + current_sync_epoch: u64, + is_bootstrapping: bool, +) -> AuditTickResult { + let overload_tracker = Arc::new(OverloadClaimTracker::new()); + audit_tick_with_repair_proofs_and_overload_tracker( + p2p_node, + storage, + config, + sync_history, + repair_proofs, + current_sync_epoch, + &overload_tracker, + is_bootstrapping, + ) + .await +} + /// Execute one repair-proof-gated audit tick. /// /// This is the production path used by the replication engine. The /// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct /// callers that have not adopted repair proofs remain conservative and do not /// audit peers for unproven keys. -#[allow(clippy::implicit_hasher, clippy::too_many_lines)] -pub async fn audit_tick_with_repair_proofs( +#[allow( + clippy::implicit_hasher, + clippy::too_many_lines, + clippy::too_many_arguments +)] +pub(crate) async fn audit_tick_with_repair_proofs_and_overload_tracker( p2p_node: &Arc, storage: &Arc, config: &ReplicationConfig, sync_history: &HashMap, repair_proofs: &Arc>, current_sync_epoch: u64, + overload_tracker: &Arc, is_bootstrapping: bool, ) -> AuditTickResult { // Invariant 19: never audit while still bootstrapping. @@ -264,6 +300,8 @@ pub async fn audit_tick_with_repair_proofs( ) .await; } + // A cooperative response clears any prior overload streak. + overload_tracker.record_normal_response(&challenged_peer); // Step 7b: Bootstrapping claim. AuditTickResult::BootstrapClaim { peer: challenged_peer, @@ -285,6 +323,8 @@ pub async fn audit_tick_with_repair_proofs( ) .await; } + // A cooperative response clears any prior overload streak. + overload_tracker.record_normal_response(&challenged_peer); verify_digests( &challenged_peer, challenge_id, @@ -313,6 +353,30 @@ pub async fn audit_tick_with_repair_proofs( ) .await; } + if is_inbound_replication_overloaded_reason(&reason) { + // Honor a brief genuine overload, but do not let a peer dodge + // the audit indefinitely by always claiming to be overloaded. + if overload_tracker.honor_overload_claim(&challenged_peer) { + debug!( + "Audit: challenge deferred by overloaded peer {challenged_peer}: {reason}" + ); + return AuditTickResult::Idle; + } + warn!( + "Audit: peer {challenged_peer} exceeded overload-claim budget; treating challenge rejection as a failure" + ); + return handle_audit_failure( + &challenged_peer, + challenge_id, + &peer_keys, + AuditFailureReason::Rejected, + p2p_node, + config, + ) + .await; + } + // A genuine (non-overload) rejection clears any prior overload streak. + overload_tracker.record_normal_response(&challenged_peer); warn!("Audit: challenge rejected by {challenged_peer}: {reason}"); handle_audit_failure( &challenged_peer, diff --git a/src/replication/bootstrap.rs b/src/replication/bootstrap.rs index a4ea7026..8c58e342 100644 --- a/src/replication/bootstrap.rs +++ b/src/replication/bootstrap.rs @@ -11,6 +11,7 @@ use crate::logging::{debug, info, warn}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; +use saorsa_core::identity::PeerId; use saorsa_core::DhtNetworkEvent; use crate::ant_protocol::XorName; @@ -134,6 +135,12 @@ pub async fn check_bootstrap_drained( return false; } + if !state.overload_deferred_sync_peers.is_empty() { + let n = state.overload_deferred_sync_peers.len(); + debug!("Bootstrap NOT drained: {n} peer(s) have overload-deferred sync retries"); + return false; + } + if queues.is_bootstrap_work_empty(&state.pending_keys) { state.drained = true; info!("Bootstrap drained: all peer requests completed and work queues empty"); @@ -150,10 +157,7 @@ pub async fn check_bootstrap_drained( /// [`clear_capacity_rejected`] when the same source's next admission cycle /// completes with zero rejections (i.e. the source successfully /// re-delivered everything that previously overflowed). -pub async fn note_capacity_rejected( - bootstrap_state: &Arc>, - source: saorsa_core::identity::PeerId, -) { +pub async fn note_capacity_rejected(bootstrap_state: &Arc>, source: PeerId) { let mut state = bootstrap_state.write().await; if state.capacity_rejected_sources.insert(source) { let n = state.capacity_rejected_sources.len(); @@ -172,8 +176,8 @@ pub async fn note_capacity_rejected( /// drained" is retired. No-op if the source had no outstanding rejections. pub async fn clear_capacity_rejected( bootstrap_state: &Arc>, - source: &saorsa_core::identity::PeerId, -) { + source: &PeerId, +) -> bool { let mut state = bootstrap_state.write().await; if state.capacity_rejected_sources.remove(source) { let n = state.capacity_rejected_sources.len(); @@ -181,7 +185,48 @@ pub async fn clear_capacity_rejected( "Bootstrap: cleared outstanding capacity rejections for {source} \ ({n} sources still outstanding)" ); + return true; } + false +} + +/// Record that `peer` reported overload during the initial bootstrap sync. +/// +/// The peer has been queued for a later priority neighbor-sync retry. Bootstrap +/// cannot drain until that retry completes, fails, or the peer leaves the local +/// close-neighbor set. +pub async fn note_overload_deferred_sync( + bootstrap_state: &Arc>, + peer: PeerId, +) { + let mut state = bootstrap_state.write().await; + if state.overload_deferred_sync_peers.insert(peer) { + let n = state.overload_deferred_sync_peers.len(); + debug!( + "Bootstrap: peer {peer} has an overload-deferred sync retry \ + ({n} peers outstanding)" + ); + } +} + +/// Clear an outstanding overload-deferred bootstrap sync for `peer`. +/// +/// Returns `true` when this call removed a blocker and the caller should re-run +/// the bootstrap drain check. +pub async fn clear_overload_deferred_sync( + bootstrap_state: &Arc>, + peer: &PeerId, +) -> bool { + let mut state = bootstrap_state.write().await; + if state.overload_deferred_sync_peers.remove(peer) { + let n = state.overload_deferred_sync_peers.len(); + debug!( + "Bootstrap: cleared overload-deferred sync retry for {peer} \ + ({n} peers still outstanding)" + ); + return true; + } + false } /// Record a set of discovered keys into the bootstrap state for drain tracking. @@ -247,6 +292,7 @@ mod tests { pending_peer_requests: 5, pending_keys: HashSet::new(), capacity_rejected_sources: HashSet::new(), + overload_deferred_sync_peers: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -263,6 +309,7 @@ mod tests { pending_peer_requests: 2, pending_keys: HashSet::new(), capacity_rejected_sources: HashSet::new(), + overload_deferred_sync_peers: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -279,6 +326,7 @@ mod tests { pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), capacity_rejected_sources: HashSet::new(), + overload_deferred_sync_peers: HashSet::new(), })); let queues = ReplicationQueues::new(); @@ -294,6 +342,7 @@ mod tests { pending_peer_requests: 0, pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(), capacity_rejected_sources: HashSet::new(), + overload_deferred_sync_peers: HashSet::new(), })); let mut queues = ReplicationQueues::new(); @@ -305,6 +354,7 @@ mod tests { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender: saorsa_core::identity::PeerId::from_bytes([0u8; 32]), + inconclusive_rounds: 0, }; queues.add_pending_verify(xor_name_from_byte(0x01), entry); @@ -406,4 +456,25 @@ mod tests { clear_capacity_rejected(&state, &source_b).await; assert!(check_bootstrap_drained(&state, &queues).await); } + + #[tokio::test] + async fn overload_deferred_sync_blocks_until_cleared() { + const TEST_PEER_BYTE: u8 = 0xCC; + + let state = Arc::new(RwLock::new(BootstrapState::new())); + let queues = ReplicationQueues::new(); + let peer = PeerId::from_bytes([TEST_PEER_BYTE; 32]); + + note_overload_deferred_sync(&state, peer).await; + assert!( + !check_bootstrap_drained(&state, &queues).await, + "overload-deferred sync must block bootstrap drain" + ); + + assert!(clear_overload_deferred_sync(&state, &peer).await); + assert!( + check_bootstrap_drained(&state, &queues).await, + "bootstrap should drain once the overload-deferred retry is cleared" + ); + } } diff --git a/src/replication/config.rs b/src/replication/config.rs index b3cd9441..4d5f7cbe 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -166,6 +166,16 @@ const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60; /// Maximum age for pending-verification entries before stale eviction. pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS); +/// Maximum consecutive `QuorumInconclusive` rounds a pending key is retried +/// before it is abandoned. +/// +/// A backstop complementing the age-based [`PENDING_VERIFY_MAX_AGE`] eviction: +/// it bounds wasted re-verification (and per-source pending-slot occupancy) for +/// keys whose targets never converge — persistently unreachable peers, or peers +/// that keep replying `Overloaded`, which the network-verify path treats as +/// neutral and otherwise has no per-peer overload-claim budget to bound. +pub const MAX_INCONCLUSIVE_VERIFY_ROUNDS: u32 = 10; + /// Trust event weight for confirmed audit failures. pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0; diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 7f65472f..c90e2cea 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -28,6 +28,7 @@ pub mod scheduling; pub mod types; use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -37,9 +38,11 @@ use std::pin::Pin; use crate::logging::{debug, error, info, warn}; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; +use lru::LruCache; +use parking_lot::Mutex; use rand::Rng; -use tokio::sync::{mpsc, Notify, RwLock, Semaphore}; -use tokio::task::JoinHandle; +use tokio::sync::{mpsc, Notify, RwLock, Semaphore, TryAcquireError}; +use tokio::task::{JoinError, JoinHandle, JoinSet}; use tokio_util::sync::CancellationToken; use crate::ant_protocol::XorName; @@ -52,8 +55,8 @@ use crate::replication::config::{ }; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ - FreshReplicationResponse, NeighborSyncResponse, ReplicationMessage, ReplicationMessageBody, - VerificationResponse, + FreshReplicationResponse, NeighborSyncResponse, OverloadedNotice, ReplicationMessage, + ReplicationMessageBody, VerificationResponse, }; use crate::replication::quorum::KeyVerificationOutcome; use crate::replication::scheduling::ReplicationQueues; @@ -72,6 +75,122 @@ use saorsa_core::{DhtNetworkEvent, P2PEvent, P2PNode, TrustEvent}; /// Prefix used by saorsa-core's request-response mechanism. const RR_PREFIX: &str = "/rr/"; +/// Maximum inbound replication request handlers allowed to run in parallel. +/// +/// Handlers can perform LMDB scans, full-chunk reads, payment verification, and +/// audit digest generation. Running them behind the receive loop creates +/// head-of-line blocking where one slow request can make unrelated +/// request-response callers time out. +const MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS: usize = 16; + +/// Maximum inbound replication request handlers allowed from one peer. +/// +/// A single peer can issue several offers for the same key concurrently within +/// this budget. That is safe and cheap: chunk storage is idempotent, and the +/// expensive part of payment verification (the on-chain pool closeness/payment +/// lookup) is single-flighted per pool inside [`PaymentVerifier`], so duplicate +/// concurrent offers do not duplicate on-chain work. +/// +/// Kept strictly below half of [`MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS`] +/// so no single peer can occupy the majority of global handler capacity: +/// saturating the receiver now takes at least three distinct peers, preserving +/// fairness for the close group under a one-peer burst. +const MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS_PER_PEER: usize = 6; + +// Enforce the fairness invariant documented above at compile time so a future +// tuning change cannot silently let one peer monopolize global capacity. +const _: () = assert!( + MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS_PER_PEER * 2 + < MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS, + "per-peer inbound cap must stay below half the global cap" +); + +/// Maximum overload response tasks tracked separately from accepted handlers. +const MAX_CONCURRENT_INBOUND_OVERLOAD_RESPONSES: usize = 32; + +/// Reason sent to request-response callers when the global inbound cap is hit. +const INBOUND_REPLICATION_OVERLOADED_REASON: &str = "inbound replication handler capacity exceeded"; + +/// Reason sent to request-response callers when their peer-specific cap is hit. +const INBOUND_REPLICATION_PEER_OVERLOADED_REASON: &str = + "inbound replication per-peer handler capacity exceeded"; + +fn is_inbound_replication_overloaded_reason(reason: &str) -> bool { + reason == INBOUND_REPLICATION_OVERLOADED_REASON + || reason == INBOUND_REPLICATION_PEER_OVERLOADED_REASON +} + +/// Maximum consecutive "overloaded" responses honored from a single peer before +/// the claim is treated as a normal failure. +/// +/// The overload reason is an unauthenticated string in a peer's response, so a +/// misbehaving peer could otherwise return it indefinitely to dodge audit +/// failures and fetch trust penalties. Honoring a short run of consecutive +/// claims still exempts a peer under a brief genuine load spike; beyond that the +/// peer is held accountable. The counter resets on any non-overload response on +/// the same path. +/// +/// The audit and fetch paths each keep their own [`OverloadClaimTracker`], so +/// this budget is enforced independently per path. The trackers are +/// deliberately *not* shared: `record_normal_response` clears a peer's entire +/// streak, so a shared tracker would let a peer keep its audit-overload streak +/// permanently fresh simply by returning ordinary (non-overload) responses on +/// the fetch path — defeating the audit-dodge bound this constant exists to +/// enforce. +const MAX_CONSECUTIVE_OVERLOAD_CLAIMS: u32 = 5; + +/// Capacity of the per-peer overload-claim tracker. Bounds memory under peer +/// churn; evicting a stale peer at worst restarts its (already near-exhausted) +/// budget, which is harmless. +const OVERLOAD_CLAIM_TRACKER_CAPACITY: usize = 1024; + +/// Per-peer budget that bounds how often a peer's "overloaded" response is +/// honored before it is treated as a normal failure. +/// +/// The audit and fetch paths each own a separate instance (behind an `Arc`): +/// they are the request initiators that would otherwise grant an +/// unauthenticated, indefinite accountability exemption to any peer that simply +/// echoes the overload reason. They are kept separate rather than shared so a +/// non-overload response on one path cannot reset the streak that bounds claims +/// on the other (see [`MAX_CONSECUTIVE_OVERLOAD_CLAIMS`]). The prune-audit path +/// does not use this tracker: an overload there only makes us keep our own +/// replica, which has no evasion value. +pub(crate) struct OverloadClaimTracker { + /// Consecutive overload claims per peer (reset on any normal response). + consecutive: Mutex>, +} + +impl OverloadClaimTracker { + pub(crate) fn new() -> Self { + let capacity = + NonZeroUsize::new(OVERLOAD_CLAIM_TRACKER_CAPACITY).unwrap_or(NonZeroUsize::MIN); + Self { + consecutive: Mutex::new(LruCache::new(capacity)), + } + } + + /// Record an overload claim from `peer` and report whether to honor it. + /// + /// Returns `true` while the peer is within its consecutive-claim budget + /// (treat the overload as a neutral deferral), `false` once the budget is + /// exceeded (treat it as a normal failure). + pub(crate) fn honor_overload_claim(&self, peer: &PeerId) -> bool { + let mut map = self.consecutive.lock(); + // `get` promotes the peer's recency so active claimers are not evicted + // mid-streak; an absent peer starts a fresh streak at 1. + let count = map.get(peer).copied().unwrap_or(0).saturating_add(1); + map.put(*peer, count); + count <= MAX_CONSECUTIVE_OVERLOAD_CLAIMS + } + + /// Reset a peer's consecutive-overload counter after any non-overload + /// response, so an occasional genuine overload never accumulates toward the + /// budget. + pub(crate) fn record_normal_response(&self, peer: &PeerId) { + self.consecutive.lock().pop(peer); + } +} + fn fresh_offer_payment_context() -> VerificationContext { VerificationContext::ClientPut } @@ -95,9 +214,190 @@ struct VerificationCycleContext<'a> { bootstrap_complete_notify: &'a Arc, } +#[derive(Clone)] +struct InboundReplicationHandlerContext { + p2p_node: Arc, + storage: Arc, + paid_list: Arc, + payment_verifier: Arc, + queues: Arc>, + config: Arc, + is_bootstrapping: Arc>, + bootstrap_state: Arc>, + sync_history: Arc>>, + sync_cycle_epoch: Arc>, + repair_proofs: Arc>, +} + +impl InboundReplicationHandlerContext { + async fn handle(&self, source: PeerId, payload: Vec, rr_message_id: Option) { + if let Err(e) = handle_replication_message( + &source, + &payload, + &self.p2p_node, + &self.storage, + &self.paid_list, + &self.payment_verifier, + &self.queues, + &self.config, + &self.is_bootstrapping, + &self.bootstrap_state, + &self.sync_history, + &self.sync_cycle_epoch, + &self.repair_proofs, + rr_message_id.as_deref(), + ) + .await + { + debug!("Replication message from {source} error: {e}"); + } + } + + async fn send_overloaded_response( + &self, + source: PeerId, + payload: Vec, + rr_message_id: String, + reason: &'static str, + ) { + let msg = match ReplicationMessage::decode(&payload) { + Ok(msg) => msg, + Err(e) => { + debug!("Failed to decode overloaded replication request from {source}: {e}"); + return; + } + }; + let Some(body) = overloaded_response_body(&msg.body, reason) else { + return; + }; + send_replication_response( + &source, + &self.p2p_node, + msg.request_id, + body, + Some(&rr_message_id), + ) + .await; + } +} + +fn overloaded_response_body( + request: &ReplicationMessageBody, + reason: &str, +) -> Option { + match request { + // Defensive: fresh offers are normally sent fire-and-forget over the + // gossip topic (no `rr_message_id`), so `spawn_inbound_overloaded_response` + // drops them before reaching here and this arm is not exercised in + // practice. It is kept so that an offer arriving over request-response + // would still receive a typed rejection rather than time out. + ReplicationMessageBody::FreshReplicationOffer(offer) => Some( + ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { + key: offer.key, + reason: reason.to_string(), + }), + ), + ReplicationMessageBody::FetchRequest(request) => Some( + ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error { + key: request.key, + reason: reason.to_string(), + }), + ), + ReplicationMessageBody::AuditChallenge(challenge) => Some( + ReplicationMessageBody::AuditResponse(protocol::AuditResponse::Rejected { + challenge_id: challenge.challenge_id, + reason: reason.to_string(), + }), + ), + // Verification and neighbor sync have no typed neutral rejection that + // carries overload. Empty success-shaped responses would be misread as + // real work, while no response would be charged as a timeout. A + // dedicated notice lets callers classify overload as a neutral deferral. + ReplicationMessageBody::VerificationRequest(_) + | ReplicationMessageBody::NeighborSyncRequest(_) => { + Some(ReplicationMessageBody::Overloaded(OverloadedNotice { + reason: reason.to_string(), + })) + } + ReplicationMessageBody::PaidNotify(_) + | ReplicationMessageBody::FreshReplicationResponse(_) + | ReplicationMessageBody::NeighborSyncResponse(_) + | ReplicationMessageBody::VerificationResponse(_) + | ReplicationMessageBody::FetchResponse(_) + | ReplicationMessageBody::AuditResponse(_) + | ReplicationMessageBody::Overloaded(_) => None, + } +} + +fn spawn_inbound_overloaded_response( + overload_responses: &mut JoinSet<()>, + overload_semaphore: &Arc, + context: &InboundReplicationHandlerContext, + source: PeerId, + payload: Vec, + rr_message_id: Option, + reason: &'static str, +) { + let Some(rr_message_id) = rr_message_id else { + // Gossip-topic requests (fresh offers, paid notifies) are + // fire-and-forget: there is no reply channel to carry an overload + // rejection, so the request is simply dropped under load. Log it so the + // drop is observable rather than silent; the sender re-offers later. + debug!( + "Dropping fire-and-forget replication request from {source} under overload ({reason})" + ); + return; + }; + // A semaphore (not `JoinSet::len()`) bounds the *in-flight* responses: + // `len()` also counts finished-but-unjoined tasks, which would otherwise + // refuse new responses while the actual in-flight count is low. + let Ok(permit) = Arc::clone(overload_semaphore).try_acquire_owned() else { + debug!("Dropping overload response to {source}: overload response task limit reached"); + return; + }; + + let context = context.clone(); + overload_responses.spawn(async move { + let _permit = permit; + context + .send_overloaded_response(source, payload, rr_message_id, reason) + .await; + }); +} + +fn prune_idle_inbound_peer_limiters(peer_limiters: &mut HashMap>) { + // An outstanding owned permit holds an `Arc` clone of its limiter, so + // `strong_count > 1` exactly means "this peer still has a handler in + // flight". Removing only when the count is back to 1 is therefore safe: an + // entry is never dropped while a permit is checked out. + peer_limiters.retain(|_, limiter| Arc::strong_count(limiter) > 1); +} + +fn has_priority_sync_work(state: &NeighborSyncState) -> bool { + !state.priority_order.is_empty() +} + +fn log_inbound_task_result(result: std::result::Result<(), JoinError>, task_label: &str) { + match result { + Ok(()) => {} + Err(e) if e.is_cancelled() => {} + Err(e) => warn!("{task_label} panicked: {e}"), + } +} + +async fn abort_and_drain_inbound_tasks(tasks: &mut JoinSet<()>, task_label: &str) { + tasks.abort_all(); + while let Some(result) = tasks.join_next().await { + log_inbound_task_result(result, task_label); + } +} + /// Fetch worker polling interval in milliseconds. const FETCH_WORKER_POLL_MS: u64 = 100; +/// Delay before retrying a fetch source that explicitly reported overload. +const FETCH_OVERLOAD_RETRY_DELAY_MS: u64 = 250; + /// Verification worker polling interval in milliseconds. const VERIFICATION_WORKER_POLL_MS: u64 = 250; @@ -155,6 +455,15 @@ pub struct ReplicationEngine { /// Limits concurrent outbound replication sends to prevent bandwidth /// saturation on home broadband connections. send_semaphore: Arc, + /// Bounds repeated "overloaded" responses from a peer on the audit path so + /// the unauthenticated overload reason cannot be used to indefinitely dodge + /// audit accountability. Kept separate from [`Self::fetch_overload_tracker`] + /// so an ordinary fetch response cannot reset an audit-overload streak. + audit_overload_tracker: Arc, + /// Bounds repeated "overloaded" responses from a peer on the fetch path so a + /// source cannot defer a fetch indefinitely. Separate from the audit tracker + /// (see [`OverloadClaimTracker`]). + fetch_overload_tracker: Arc, /// Receiver for fresh-write events from the chunk PUT handler. /// /// When present, `start()` spawns a drainer task that calls @@ -209,6 +518,8 @@ impl ReplicationEngine { sync_trigger: Arc::new(Notify::new()), bootstrap_complete_notify: Arc::new(Notify::new()), send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)), + audit_overload_tracker: Arc::new(OverloadClaimTracker::new()), + fetch_overload_tracker: Arc::new(OverloadClaimTracker::new()), fresh_write_rx: Some(fresh_write_rx), shutdown, task_handles: Vec::new(), @@ -365,25 +676,49 @@ impl ReplicationEngine { fn start_message_handler(&mut self) { let mut p2p_events = self.p2p_node.subscribe_events(); let mut dht_events = self.p2p_node.dht_manager().subscribe_events(); - let p2p = Arc::clone(&self.p2p_node); - let storage = Arc::clone(&self.storage); - let paid_list = Arc::clone(&self.paid_list); - let payment_verifier = Arc::clone(&self.payment_verifier); - let queues = Arc::clone(&self.queues); let config = Arc::clone(&self.config); let shutdown = self.shutdown.clone(); - let is_bootstrapping = Arc::clone(&self.is_bootstrapping); - let bootstrap_state = Arc::clone(&self.bootstrap_state); - let sync_history = Arc::clone(&self.sync_history); - let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); let repair_proofs = Arc::clone(&self.repair_proofs); let sync_trigger = Arc::clone(&self.sync_trigger); let sync_state = Arc::clone(&self.sync_state); + let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); + let inbound_context = InboundReplicationHandlerContext { + p2p_node: Arc::clone(&self.p2p_node), + storage: Arc::clone(&self.storage), + paid_list: Arc::clone(&self.paid_list), + payment_verifier: Arc::clone(&self.payment_verifier), + queues: Arc::clone(&self.queues), + config: Arc::clone(&self.config), + is_bootstrapping: Arc::clone(&self.is_bootstrapping), + bootstrap_state: Arc::clone(&self.bootstrap_state), + sync_history: Arc::clone(&self.sync_history), + sync_cycle_epoch: Arc::clone(&self.sync_cycle_epoch), + repair_proofs: Arc::clone(&self.repair_proofs), + }; + let inbound_request_semaphore = + Arc::new(Semaphore::new(MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS)); + let inbound_overload_semaphore = + Arc::new(Semaphore::new(MAX_CONCURRENT_INBOUND_OVERLOAD_RESPONSES)); let handle = tokio::spawn(async move { + let mut inbound_handlers = JoinSet::new(); + let mut overload_responses = JoinSet::new(); + let mut inbound_peer_limiters: HashMap> = HashMap::new(); + loop { tokio::select! { () = shutdown.cancelled() => break, + result = inbound_handlers.join_next(), if !inbound_handlers.is_empty() => { + if let Some(result) = result { + log_inbound_task_result(result, "Inbound replication handler"); + prune_idle_inbound_peer_limiters(&mut inbound_peer_limiters); + } + } + result = overload_responses.join_next(), if !overload_responses.is_empty() => { + if let Some(result) = result { + log_inbound_task_result(result, "Inbound replication overload response"); + } + } event = p2p_events.recv() => { let Ok(event) = event else { continue }; if let P2PEvent::Message { @@ -396,7 +731,12 @@ impl ReplicationEngine { // and whether it arrived via the /rr/ request-response // path (which wraps payloads in RequestResponseEnvelope). let rr_info = if topic == REPLICATION_PROTOCOL_ID { - Some((data.clone(), None)) + // Move the payload — direct (gossip-topic) + // replication messages can carry full chunk + // bytes in a fresh offer, so cloning before + // admission control would copy data we may + // immediately drop when overloaded. + Some((data, None)) } else if topic.starts_with(RR_PREFIX) && &topic[RR_PREFIX.len()..] == REPLICATION_PROTOCOL_ID { @@ -407,29 +747,67 @@ impl ReplicationEngine { None }; if let Some((payload, rr_message_id)) = rr_info { - match handle_replication_message( - &source, - &payload, - &p2p, - &storage, - &paid_list, - &payment_verifier, - &queues, - &config, - &is_bootstrapping, - &bootstrap_state, - &sync_history, - &sync_cycle_epoch, - &repair_proofs, - rr_message_id.as_deref(), - ).await { - Ok(()) => {} - Err(e) => { + let global_permit = + match Arc::clone(&inbound_request_semaphore).try_acquire_owned() + { + Ok(permit) => permit, + Err(TryAcquireError::NoPermits) => { + debug!( + "Rejecting inbound replication request from {source}: global handler capacity reached" + ); + spawn_inbound_overloaded_response( + &mut overload_responses, + &inbound_overload_semaphore, + &inbound_context, + source, + payload, + rr_message_id, + INBOUND_REPLICATION_OVERLOADED_REASON, + ); + continue; + } + Err(TryAcquireError::Closed) => break, + }; + + let peer_limiter = Arc::clone( + inbound_peer_limiters + .entry(source) + .or_insert_with(|| { + Arc::new(Semaphore::new( + MAX_CONCURRENT_INBOUND_REPLICATION_REQUESTS_PER_PEER, + )) + }), + ); + let peer_permit = match Arc::clone(&peer_limiter).try_acquire_owned() { + Ok(permit) => permit, + Err(TryAcquireError::NoPermits) => { debug!( - "Replication message from {source} error: {e}" + "Rejecting inbound replication request from {source}: per-peer handler capacity reached" + ); + drop(global_permit); + spawn_inbound_overloaded_response( + &mut overload_responses, + &inbound_overload_semaphore, + &inbound_context, + source, + payload, + rr_message_id, + INBOUND_REPLICATION_PEER_OVERLOADED_REASON, ); + continue; } - } + Err(TryAcquireError::Closed) => break, + }; + let context = inbound_context.clone(); + let handler_shutdown = shutdown.clone(); + inbound_handlers.spawn(async move { + let _global_permit = global_permit; + let _peer_permit = peer_permit; + tokio::select! { + () = handler_shutdown.cancelled() => {} + () = context.handle(source, payload, rr_message_id) => {} + } + }); } } } @@ -456,6 +834,10 @@ impl ReplicationEngine { .collect::>(); let new_peers = new_scoped.iter().copied().collect::>(); + let removed_peers = old_peers + .difference(&new_peers) + .copied() + .collect::>(); let entrants = new_scoped .iter() .copied() @@ -477,17 +859,64 @@ impl ReplicationEngine { "K-closest peers changed, no additional close peers queued, pruned {sync_removals} departed pending sync entries, triggering early neighbor sync" ); } + for peer in &removed_peers { + let cleared_capacity = bootstrap::clear_capacity_rejected( + &inbound_context.bootstrap_state, + peer, + ) + .await; + let cleared_deferred = bootstrap::clear_overload_deferred_sync( + &inbound_context.bootstrap_state, + peer, + ) + .await; + if cleared_capacity || cleared_deferred { + check_and_complete_bootstrap( + &inbound_context.bootstrap_state, + &inbound_context.queues, + &inbound_context.is_bootstrapping, + &bootstrap_complete_notify, + ) + .await; + } + } sync_trigger.notify_one(); } DhtNetworkEvent::PeerRemoved { peer_id } => { sync_state.write().await.remove_peer(&peer_id); repair_proofs.write().await.remove_peer(&peer_id); + let cleared_capacity = bootstrap::clear_capacity_rejected( + &inbound_context.bootstrap_state, + &peer_id, + ) + .await; + let cleared_deferred = bootstrap::clear_overload_deferred_sync( + &inbound_context.bootstrap_state, + &peer_id, + ) + .await; + if cleared_capacity || cleared_deferred { + check_and_complete_bootstrap( + &inbound_context.bootstrap_state, + &inbound_context.queues, + &inbound_context.is_bootstrapping, + &bootstrap_complete_notify, + ) + .await; + } } _ => {} } } } } + abort_and_drain_inbound_tasks(&mut inbound_handlers, "Inbound replication handler") + .await; + abort_and_drain_inbound_tasks( + &mut overload_responses, + "Inbound replication overload response", + ) + .await; debug!("Replication message handler shut down"); }); self.task_handles.push(handle); @@ -506,16 +935,25 @@ impl ReplicationEngine { let repair_proofs = Arc::clone(&self.repair_proofs); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_state = Arc::clone(&self.bootstrap_state); + let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); let sync_trigger = Arc::clone(&self.sync_trigger); let handle = tokio::spawn(async move { loop { - let interval = config.random_neighbor_sync_interval(); - tokio::select! { - () = shutdown.cancelled() => break, - () = tokio::time::sleep(interval) => {} - () = sync_trigger.notified() => { - debug!("Neighbor sync triggered by topology change"); + let has_priority_backlog = { + let state = sync_state.read().await; + has_priority_sync_work(&state) + }; + if has_priority_backlog { + debug!("Neighbor sync continuing queued priority sync backlog"); + } else { + let interval = config.random_neighbor_sync_interval(); + tokio::select! { + () = shutdown.cancelled() => break, + () = tokio::time::sleep(interval) => {} + () = sync_trigger.notified() => { + debug!("Neighbor sync triggered by topology change"); + } } } // Wrap the sync round in a select so shutdown cancels @@ -535,6 +973,7 @@ impl ReplicationEngine { &repair_proofs, &is_bootstrapping, &bootstrap_state, + &bootstrap_complete_notify, ) => {} } } @@ -576,6 +1015,7 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let sync_state = Arc::clone(&self.sync_state); + let overload_tracker = Arc::clone(&self.audit_overload_tracker); let handle = tokio::spawn(async move { // Invariant 19: wait for bootstrap to drain before starting audits. @@ -598,13 +1038,14 @@ impl ReplicationEngine { let result = { let history = sync_history.read().await; let current_sync_epoch = *sync_cycle_epoch.read().await; - audit::audit_tick_with_repair_proofs( + audit::audit_tick_with_repair_proofs_and_overload_tracker( &p2p, &storage, &config, &history, &repair_proofs, current_sync_epoch, + &overload_tracker, bootstrapping, ) .await @@ -622,13 +1063,14 @@ impl ReplicationEngine { let result = { let history = sync_history.read().await; let current_sync_epoch = *sync_cycle_epoch.read().await; - audit::audit_tick_with_repair_proofs( + audit::audit_tick_with_repair_proofs_and_overload_tracker( &p2p, &storage, &config, &history, &repair_proofs, current_sync_epoch, + &overload_tracker, bootstrapping, ) .await @@ -652,6 +1094,7 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); + let overload_tracker = Arc::clone(&self.fetch_overload_tracker); let concurrency = max_parallel_fetch(); info!("Fetch worker concurrency set to {concurrency} (hardware threads)"); @@ -660,6 +1103,13 @@ impl ReplicationEngine { // Each in-flight future yields (key, Option) so we // always recover the key — even if the inner task panics. let mut in_flight = FuturesUnordered::::new(); + let fetch_context = FetchAttemptContext { + p2p_node: Arc::clone(&p2p), + storage: Arc::clone(&storage), + config: Arc::clone(&config), + overload_tracker: Arc::clone(&overload_tracker), + shutdown: shutdown.clone(), + }; loop { // Fill up to `concurrency` slots from the queue. @@ -678,35 +1128,12 @@ impl ReplicationEngine { }; q.start_fetch(candidate.key, source, candidate.sources.clone()); - let p2p = Arc::clone(&p2p); - let storage = Arc::clone(&storage); - let config = Arc::clone(&config); - let token = shutdown.clone(); - let fetch_key = candidate.key; - in_flight.push(Box::pin(async move { - let handle = tokio::spawn(async move { - // Cancel-aware: abort when the engine shuts down. - tokio::select! { - () = token.cancelled() => FetchOutcome { - key: fetch_key, - result: FetchResult::SourceFailed, - }, - outcome = execute_single_fetch( - p2p, storage, config, fetch_key, source, - ) => outcome, - } - }); - match handle.await { - Ok(outcome) => (outcome.key, Some(outcome)), - Err(e) => { - error!( - "Fetch task for {} panicked: {e}", - hex::encode(fetch_key) - ); - (fetch_key, None) - } - } - })); + in_flight.push(spawn_fetch_attempt( + fetch_context.clone(), + candidate.key, + source, + None, + )); } } // release queues write lock @@ -731,37 +1158,31 @@ impl ReplicationEngine { q.complete_fetch(&key); true } + FetchResult::Deferred => { + if let Some(source) = q.current_fetch_source(&key) { + in_flight.push(spawn_fetch_attempt( + fetch_context.clone(), + key, + source, + Some(Duration::from_millis( + FETCH_OVERLOAD_RETRY_DELAY_MS, + )), + )); + false + } else { + q.complete_fetch(&key); + true + } + } FetchResult::IntegrityFailed | FetchResult::SourceFailed => { if let Some(next_peer) = q.retry_fetch(&key) { // Spawn a new fetch task for the next source. - let p2p = Arc::clone(&p2p); - let storage = Arc::clone(&storage); - let config = Arc::clone(&config); - let token = shutdown.clone(); - let fetch_key = key; - in_flight.push(Box::pin(async move { - let handle = tokio::spawn(async move { - tokio::select! { - () = token.cancelled() => FetchOutcome { - key: fetch_key, - result: FetchResult::SourceFailed, - }, - outcome = execute_single_fetch( - p2p, storage, config, fetch_key, next_peer, - ) => outcome, - } - }); - match handle.await { - Ok(outcome) => (outcome.key, Some(outcome)), - Err(e) => { - error!( - "Fetch task for {} panicked: {e}", - hex::encode(fetch_key) - ); - (fetch_key, None) - } - } - })); + in_flight.push(spawn_fetch_attempt( + fetch_context.clone(), + key, + next_peer, + None, + )); false } else { q.complete_fetch(&key); @@ -871,6 +1292,8 @@ impl ReplicationEngine { let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); let sync_cycle_epoch = Arc::clone(&self.sync_cycle_epoch); let repair_proofs = Arc::clone(&self.repair_proofs); + let sync_state = Arc::clone(&self.sync_state); + let sync_trigger = Arc::clone(&self.sync_trigger); let handle = tokio::spawn(async move { // Wait for DHT bootstrap to complete before snapshotting @@ -938,51 +1361,75 @@ impl ReplicationEngine { ) .await; - bootstrap::decrement_pending_requests(&bootstrap_state, 1).await; - - if let Some(outcome) = outcome { - if !outcome.response.bootstrapping { - record_sent_replica_hints( - peer, - &outcome.sent_replica_hints, - &repair_proofs, - &sync_cycle_epoch, - ) - .await; - // Admit hints into verification pipeline. - let outcome = admit_and_queue_hints( - &self_id, - peer, - &outcome.response.replica_hints, - &outcome.response.paid_hints, - &p2p, - &config, - &storage, - &paid_list, - &queues, - ) - .await; - - // Track discovered keys for drain detection. - if !outcome.discovered.is_empty() { - bootstrap::track_discovered_keys( - &bootstrap_state, - &outcome.discovered, + match outcome { + neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + if !outcome.response.bootstrapping { + record_sent_replica_hints( + peer, + &outcome.sent_replica_hints, + &repair_proofs, + &sync_cycle_epoch, + ) + .await; + // Admit hints into verification pipeline. + let outcome = admit_and_queue_hints( + &self_id, + peer, + &outcome.response.replica_hints, + &outcome.response.paid_hints, + &p2p, + &config, + &storage, + &paid_list, + &queues, ) .await; - } - // Record / retire capacity rejections so the - // drain check correctly reflects whether each - // source still owes us re-hinted work after - // queue overflow. - if outcome.capacity_rejected_count > 0 { - bootstrap::note_capacity_rejected(&bootstrap_state, *peer).await; - } else { - bootstrap::clear_capacity_rejected(&bootstrap_state, peer).await; + // Track discovered keys for drain detection. + if !outcome.discovered.is_empty() { + bootstrap::track_discovered_keys( + &bootstrap_state, + &outcome.discovered, + ) + .await; + } + + // Record / retire capacity rejections so the + // drain check correctly reflects whether each + // source still owes us re-hinted work after + // queue overflow. + if outcome.capacity_rejected_count > 0 { + bootstrap::note_capacity_rejected(&bootstrap_state, *peer) + .await; + } else { + bootstrap::clear_capacity_rejected(&bootstrap_state, peer) + .await; + } } } + neighbor_sync::NeighborSyncAttempt::Overloaded => { + debug!( + "Bootstrap sync with {peer} deferred because peer is overloaded" + ); + bootstrap::note_overload_deferred_sync(&bootstrap_state, *peer).await; + { + let mut state = sync_state.write().await; + let _ = state.queue_priority_peers(std::iter::once(*peer)); + } + sync_trigger.notify_one(); + } + neighbor_sync::NeighborSyncAttempt::Failed => {} } + + // Decrement only after the outcome is fully applied — + // hints admitted, discovered keys tracked, capacity + // rejections / overload deferrals recorded. Decrementing + // earlier opens a window where a concurrent drain check + // (verification cycle, sync round, or DHT-churn handler) + // sees zero pending requests while this peer's just- + // discovered work has not yet been registered, and could + // declare bootstrap drained prematurely. + bootstrap::decrement_pending_requests(&bootstrap_state, 1).await; } } @@ -1116,7 +1563,8 @@ async fn handle_replication_message( | ReplicationMessageBody::NeighborSyncResponse(_) | ReplicationMessageBody::VerificationResponse(_) | ReplicationMessageBody::FetchResponse(_) - | ReplicationMessageBody::AuditResponse(_) => Ok(()), + | ReplicationMessageBody::AuditResponse(_) + | ReplicationMessageBody::Overloaded(_) => Ok(()), } } @@ -1745,6 +2193,40 @@ async fn record_sent_replica_hints( } } +async fn check_and_complete_bootstrap( + bootstrap_state: &Arc>, + queues: &Arc>, + is_bootstrapping: &Arc>, + bootstrap_complete_notify: &Arc, +) { + let drained = bootstrap_state.read().await.is_drained(); + if drained { + return; + } + let q = queues.read().await; + if bootstrap::check_bootstrap_drained(bootstrap_state, &q).await { + complete_bootstrap(is_bootstrapping, bootstrap_complete_notify).await; + } +} + +async fn clear_overload_deferred_sync_and_check_drain( + peer: &PeerId, + bootstrap_state: &Arc>, + queues: &Arc>, + is_bootstrapping: &Arc>, + bootstrap_complete_notify: &Arc, +) { + if bootstrap::clear_overload_deferred_sync(bootstrap_state, peer).await { + check_and_complete_bootstrap( + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; + } +} + // --------------------------------------------------------------------------- // Neighbor sync round // --------------------------------------------------------------------------- @@ -1763,6 +2245,7 @@ async fn run_neighbor_sync_round( repair_proofs: &Arc>, is_bootstrapping: &Arc>, bootstrap_state: &Arc>, + bootstrap_complete_notify: &Arc, ) { let self_id = *p2p_node.peer_id(); let bootstrapping = *is_bootstrapping.read().await; @@ -1862,75 +2345,150 @@ async fn run_neighbor_sync_round( neighbor_sync::sync_with_peer_with_hints(peer, p2p_node, config, bootstrapping, hints) .await; - if let Some(outcome) = outcome { - handle_sync_response( - &self_id, - peer, - &outcome.response, - &outcome.sent_replica_hints, - p2p_node, - config, - bootstrapping, - bootstrap_state, - storage, - paid_list, - queues, - sync_state, - sync_history, - sync_cycle_epoch, - repair_proofs, - ) - .await; - } else { - // Sync failed -- remove peer and try to fill slot. - let replacement = { - let mut state = sync_state.write().await; - neighbor_sync::handle_sync_failure(&mut state, peer, config.neighbor_sync_cooldown) - }; - - // Attempt sync with the replacement peer (if one was found). - if let Some(replacement_peer) = replacement { - let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers( - std::slice::from_ref(&replacement_peer), + match outcome { + neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + handle_sync_response( + &self_id, + peer, + &outcome.response, + &outcome.sent_replica_hints, + p2p_node, + config, + bootstrapping, + bootstrap_state, storage, paid_list, - p2p_node, - config.close_group_size, - config.paid_list_close_group_size, + queues, + sync_state, + sync_history, + sync_cycle_epoch, + repair_proofs, ) .await; - let hints = replacement_hints - .remove(&replacement_peer) - .unwrap_or_default(); - let replacement_outcome = neighbor_sync::sync_with_peer_with_hints( - &replacement_peer, - p2p_node, - config, - bootstrapping, - hints, + clear_overload_deferred_sync_and_check_drain( + peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, ) .await; + } + neighbor_sync::NeighborSyncAttempt::Overloaded => { + // The peer answered the deferred bootstrap retry but is still + // overloaded. Stop blocking bootstrap drain on it rather than + // re-deferring: the neighbor-sync path has no overload-claim + // budget, so a reachable peer that always reports overload would + // otherwise suspend bootstrap (and therefore audits) + // indefinitely. This matches the documented contract on + // `note_overload_deferred_sync` ("until that retry completes, + // fails, or the peer leaves"). Regular periodic sync still + // retries this peer later and will pick up any repair work. + clear_overload_deferred_sync_and_check_drain( + peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; + } + neighbor_sync::NeighborSyncAttempt::Failed => { + // Sync failed -- remove peer and try to fill slot. + let replacement = { + let mut state = sync_state.write().await; + neighbor_sync::handle_sync_failure( + &mut state, + peer, + config.neighbor_sync_cooldown, + ) + }; - if let Some(outcome) = replacement_outcome { - handle_sync_response( - &self_id, + // Attempt sync with the replacement peer (if one was found). + if let Some(replacement_peer) = replacement { + let mut replacement_hints = neighbor_sync::build_sync_hints_for_peers( + std::slice::from_ref(&replacement_peer), + storage, + paid_list, + p2p_node, + config.close_group_size, + config.paid_list_close_group_size, + ) + .await; + let hints = replacement_hints + .remove(&replacement_peer) + .unwrap_or_default(); + let replacement_outcome = neighbor_sync::sync_with_peer_with_hints( &replacement_peer, - &outcome.response, - &outcome.sent_replica_hints, p2p_node, config, bootstrapping, - bootstrap_state, - storage, - paid_list, - queues, - sync_state, - sync_history, - sync_cycle_epoch, - repair_proofs, + hints, ) .await; + + match replacement_outcome { + neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + handle_sync_response( + &self_id, + &replacement_peer, + &outcome.response, + &outcome.sent_replica_hints, + p2p_node, + config, + bootstrapping, + bootstrap_state, + storage, + paid_list, + queues, + sync_state, + sync_history, + sync_cycle_epoch, + repair_proofs, + ) + .await; + clear_overload_deferred_sync_and_check_drain( + &replacement_peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; + } + neighbor_sync::NeighborSyncAttempt::Overloaded => { + // See the primary-peer arm above: give up rather + // than re-deferring indefinitely on a replacement + // peer that keeps reporting overload. + clear_overload_deferred_sync_and_check_drain( + &replacement_peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; + } + neighbor_sync::NeighborSyncAttempt::Failed => { + clear_overload_deferred_sync_and_check_drain( + &replacement_peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; + } + } } + clear_overload_deferred_sync_and_check_drain( + peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; } } } @@ -2108,6 +2666,7 @@ async fn admit_and_queue_hints( tried_sources: HashSet::new(), created_at: now, hint_sender: *source_peer, + inconclusive_rounds: 0, }, ); match result { @@ -2132,6 +2691,7 @@ async fn admit_and_queue_hints( tried_sources: HashSet::new(), created_at: now, hint_sender: *source_peer, + inconclusive_rounds: 0, }, ); match result { @@ -2179,9 +2739,19 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // Evict stale entries that have been pending too long (e.g. unreachable // verification targets during a network partition). - { + let evicted_keys = { let mut q = queues.write().await; - q.evict_stale(config::PENDING_VERIFY_MAX_AGE); + q.evict_stale(config::PENDING_VERIFY_MAX_AGE) + }; + if !evicted_keys.is_empty() { + update_bootstrap_after_verification( + &evicted_keys, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; } let pending_keys = { @@ -2322,7 +2892,8 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // Step 3: Evaluate results — collect outcomes without holding the write // lock across paid-list I/O. - let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline)> = Vec::new(); + let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline, Vec)> = + Vec::new(); { let q = queues.read().await; for key in &keys_needing_network { @@ -2333,13 +2904,23 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { continue; }; let outcome = quorum::evaluate_key_evidence(key, ev, &targets, config); - evaluated.push((*key, outcome, entry.pipeline)); + // `present_sources` is only consumed by the bootstrap + // `QuorumFailed` + replica-pipeline repair branch below, so only + // pay for the per-key allocation when that branch can fire. + let present_sources = if matches!(outcome, KeyVerificationOutcome::QuorumFailed) + && entry.pipeline == HintPipeline::Replica + { + quorum::present_sources_for_key(key, ev, &targets) + } else { + Vec::new() + }; + evaluated.push((*key, outcome, entry.pipeline, present_sources)); } } // read lock released // Step 4: Insert verified keys into PaidForList (no lock held). let mut paid_insert_keys: Vec = Vec::new(); - for (key, outcome, _) in &evaluated { + for (key, outcome, _, _) in &evaluated { if matches!( outcome, KeyVerificationOutcome::QuorumVerified { .. } @@ -2359,7 +2940,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // paid-only hint can safely repair a missing replica using sources // from the same verification round. let mut paid_only_fetch_keys: HashSet = HashSet::new(); - for (key, outcome, pipeline) in &evaluated { + for (key, outcome, pipeline, _) in &evaluated { if *pipeline == HintPipeline::PaidOnly && matches!( outcome, @@ -2380,8 +2961,9 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { } // Step 5: Update queues with the evaluated outcomes. + let bootstrapping = *is_bootstrapping.read().await; let mut q = queues.write().await; - for (key, outcome, pipeline) in evaluated { + for (key, outcome, pipeline, present_sources) in evaluated { match outcome { KeyVerificationOutcome::QuorumVerified { sources } | KeyVerificationOutcome::PaidListVerified { sources } => { @@ -2408,10 +2990,38 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { terminal_keys.push(key); } } - KeyVerificationOutcome::QuorumFailed - | KeyVerificationOutcome::QuorumInconclusive => { - q.remove_pending(&key); - terminal_keys.push(key); + KeyVerificationOutcome::QuorumFailed => { + if bootstrapping + && pipeline == HintPipeline::Replica + && !present_sources.is_empty() + { + // Bootstrap repair must converge even when the + // network is temporarily under-replicated. A present + // holder still has to serve content-address-valid + // bytes before the key leaves the pipeline. + let distance = + crate::client::xor_distance(&key, p2p_node.peer_id().as_bytes()); + let _ = q.promote_pending_to_fetch(key, distance, present_sources); + } else { + q.remove_pending(&key); + terminal_keys.push(key); + } + } + KeyVerificationOutcome::QuorumInconclusive => { + let rounds = q.record_inconclusive(&key); + if rounds >= config::MAX_INCONCLUSIVE_VERIFY_ROUNDS { + debug!( + "Verification for key {} still inconclusive after {rounds} rounds; abandoning", + hex::encode(key) + ); + q.remove_pending(&key); + terminal_keys.push(key); + } else { + debug!( + "Verification for key {} is inconclusive (round {rounds}); leaving pending for retry", + hex::encode(key) + ); + } } } } @@ -2491,9 +3101,14 @@ async fn complete_bootstrap( // --------------------------------------------------------------------------- /// Result classification for a single fetch attempt. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum FetchResult { /// Data fetched, integrity-checked, and stored successfully. Stored, + /// Source is temporarily overloaded within its overload-claim budget; keep + /// the fetch in flight and retry without exhausting sources or completing + /// bootstrap work. + Deferred, /// Content-address integrity check failed — do not retry. IntegrityFailed, /// Source failed (network error or non-success response) — retryable. @@ -2507,6 +3122,81 @@ struct FetchOutcome { result: FetchResult, } +#[derive(Clone)] +struct FetchAttemptContext { + p2p_node: Arc, + storage: Arc, + config: Arc, + overload_tracker: Arc, + shutdown: CancellationToken, +} + +fn classify_fetch_overload_response( + source: &PeerId, + key: &XorName, + reason: &str, + overload_tracker: &OverloadClaimTracker, +) -> FetchResult { + if overload_tracker.honor_overload_claim(source) { + debug!( + "Fetch: peer {source} is overloaded for {} (deferred): {reason}", + hex::encode(key) + ); + FetchResult::Deferred + } else { + warn!( + "Fetch: peer {source} exceeded overload-claim budget for {}; trying alternate source", + hex::encode(key) + ); + FetchResult::SourceFailed + } +} + +fn spawn_fetch_attempt( + context: FetchAttemptContext, + key: XorName, + source: PeerId, + retry_delay: Option, +) -> FetchFuture { + Box::pin(async move { + let fetch_key = key; + let handle = tokio::spawn(async move { + if let Some(delay) = retry_delay { + tokio::select! { + () = context.shutdown.cancelled() => { + return FetchOutcome { + key: fetch_key, + result: FetchResult::SourceFailed, + }; + } + () = tokio::time::sleep(delay) => {} + } + } + tokio::select! { + () = context.shutdown.cancelled() => FetchOutcome { + key: fetch_key, + result: FetchResult::SourceFailed, + }, + outcome = execute_single_fetch( + context.p2p_node, + context.storage, + context.config, + context.overload_tracker, + fetch_key, + source, + ) => outcome, + } + }); + match handle.await { + Ok(outcome) => (outcome.key, Some(outcome)), + Err(e) => { + error!("Fetch task for {} panicked: {e}", hex::encode(fetch_key)); + (fetch_key, None) + } + } + }) +} + #[allow(clippy::too_many_lines)] /// Execute a single fetch request against `source` for `key`. /// @@ -2517,6 +3207,7 @@ async fn execute_single_fetch( p2p_node: Arc, storage: Arc, config: Arc, + overload_tracker: Arc, key: XorName, source: PeerId, ) -> FetchOutcome { @@ -2561,6 +3252,28 @@ async fn execute_single_fetch( }; }; + if let ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Error { + reason, + .. + }) = &resp_msg.body + { + if is_inbound_replication_overloaded_reason(reason) { + let result = + classify_fetch_overload_response(&source, &key, reason, &overload_tracker); + if result == FetchResult::SourceFailed { + p2p_node + .report_trust_event( + &source, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } + return FetchOutcome { key, result }; + } + } + + overload_tracker.record_normal_response(&source); + match resp_msg.body { ReplicationMessageBody::FetchResponse(protocol::FetchResponse::Success { key: resp_key, @@ -2673,6 +3386,8 @@ async fn execute_single_fetch( reason, .. }) => { + // Overload errors were already classified and returned + // before this match, so this is a genuine error response. warn!( "Fetch: peer {source} returned error for {}: {reason}", hex::encode(key) @@ -2851,11 +3566,37 @@ fn audit_failure_clears_bootstrap_claim(reason: &AuditFailureReason) -> bool { #[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] mod tests { use super::{ - audit_failure_clears_bootstrap_claim, first_failed_key_label, fresh_offer_payment_context, - paid_notify_payment_context, + audit_failure_clears_bootstrap_claim, classify_fetch_overload_response, + first_failed_key_label, fresh_offer_payment_context, has_priority_sync_work, + is_inbound_replication_overloaded_reason, overloaded_response_body, + paid_notify_payment_context, FetchResult, OverloadClaimTracker, + INBOUND_REPLICATION_OVERLOADED_REASON, INBOUND_REPLICATION_PEER_OVERLOADED_REASON, + MAX_CONSECUTIVE_OVERLOAD_CLAIMS, }; use crate::payment::VerificationContext; - use crate::replication::types::AuditFailureReason; + use crate::replication::protocol::{ + AuditChallenge, AuditResponse, FetchRequest, FetchResponse, FreshReplicationOffer, + FreshReplicationResponse, NeighborSyncRequest, OverloadedNotice, PaidNotify, + ReplicationMessageBody, VerificationRequest, + }; + use crate::replication::types::{AuditFailureReason, NeighborSyncState}; + use saorsa_core::identity::PeerId; + + const TEST_KEY: [u8; 32] = [0x42; 32]; + const TEST_NONCE: [u8; 32] = [0; 32]; + const TEST_CHALLENGED_PEER_ID: [u8; 32] = [1; 32]; + const TEST_CHALLENGE_ID: u64 = 7; + + #[test] + fn priority_sync_backlog_detects_queued_peers() { + let mut state = NeighborSyncState::new_cycle(Vec::new()); + assert!(!has_priority_sync_work(&state)); + + let peer = PeerId::from_bytes(TEST_CHALLENGED_PEER_ID); + assert_eq!(state.queue_priority_peers(std::iter::once(peer)), 1); + + assert!(has_priority_sync_work(&state)); + } #[test] fn fresh_offer_runs_client_put_payment_checks() { @@ -2873,6 +3614,199 @@ mod tests { ); } + #[test] + fn inbound_overload_reason_classifier_matches_admission_reasons() { + assert!(is_inbound_replication_overloaded_reason( + INBOUND_REPLICATION_OVERLOADED_REASON + )); + assert!(is_inbound_replication_overloaded_reason( + INBOUND_REPLICATION_PEER_OVERLOADED_REASON + )); + assert!(!is_inbound_replication_overloaded_reason( + "ordinary replication failure" + )); + } + + #[test] + fn overloaded_response_body_rejects_fresh_offers() { + let request = ReplicationMessageBody::FreshReplicationOffer(FreshReplicationOffer { + key: TEST_KEY, + data: Vec::new(), + proof_of_payment: Vec::new(), + }); + + let response = overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON) + .expect("fresh offer should have a rejection response"); + + match response { + ReplicationMessageBody::FreshReplicationResponse( + FreshReplicationResponse::Rejected { key, reason }, + ) => { + assert_eq!(key, TEST_KEY); + assert_eq!(reason, INBOUND_REPLICATION_OVERLOADED_REASON); + } + other => panic!("unexpected overload response: {other:?}"), + } + } + + #[test] + fn overloaded_response_body_returns_fetch_error() { + let request = ReplicationMessageBody::FetchRequest(FetchRequest { key: TEST_KEY }); + + let response = overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON) + .expect("fetch request should have an error response"); + + match response { + ReplicationMessageBody::FetchResponse(FetchResponse::Error { key, reason }) => { + assert_eq!(key, TEST_KEY); + assert_eq!(reason, INBOUND_REPLICATION_OVERLOADED_REASON); + } + other => panic!("unexpected overload response: {other:?}"), + } + } + + #[test] + fn overloaded_response_body_returns_audit_rejection() { + let request = ReplicationMessageBody::AuditChallenge(AuditChallenge { + challenge_id: TEST_CHALLENGE_ID, + nonce: TEST_NONCE, + challenged_peer_id: TEST_CHALLENGED_PEER_ID, + keys: vec![TEST_KEY], + }); + + let response = overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON) + .expect("audit challenge should have a rejection response"); + + match response { + ReplicationMessageBody::AuditResponse(AuditResponse::Rejected { + challenge_id, + reason, + }) => { + assert_eq!(challenge_id, TEST_CHALLENGE_ID); + assert_eq!(reason, INBOUND_REPLICATION_OVERLOADED_REASON); + } + other => panic!("unexpected overload response: {other:?}"), + } + } + + #[test] + fn overloaded_response_body_returns_overloaded_notice_for_verification() { + let request = ReplicationMessageBody::VerificationRequest(VerificationRequest { + keys: vec![TEST_KEY], + paid_list_check_indices: Vec::new(), + }); + + let response = overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON) + .expect("verification request should get an overloaded notice"); + + match response { + ReplicationMessageBody::Overloaded(OverloadedNotice { reason }) => { + assert_eq!(reason, INBOUND_REPLICATION_OVERLOADED_REASON); + } + other => panic!("unexpected overload response: {other:?}"), + } + } + + #[test] + fn overloaded_response_body_returns_overloaded_notice_for_neighbor_sync() { + let request = ReplicationMessageBody::NeighborSyncRequest(NeighborSyncRequest { + replica_hints: vec![TEST_KEY], + paid_hints: Vec::new(), + bootstrapping: false, + }); + + let response = overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON) + .expect("neighbor sync should get an overloaded notice, not a timeout"); + + match response { + ReplicationMessageBody::Overloaded(OverloadedNotice { reason }) => { + assert_eq!(reason, INBOUND_REPLICATION_OVERLOADED_REASON); + } + other => panic!("unexpected overload response: {other:?}"), + } + } + + #[test] + fn overloaded_response_body_ignores_one_way_paid_notify() { + let request = ReplicationMessageBody::PaidNotify(PaidNotify { + key: TEST_KEY, + proof_of_payment: Vec::new(), + }); + + assert!( + overloaded_response_body(&request, INBOUND_REPLICATION_OVERLOADED_REASON).is_none() + ); + } + + #[test] + fn overload_tracker_honors_budget_then_penalizes() { + let tracker = OverloadClaimTracker::new(); + let peer = PeerId::from_bytes(TEST_CHALLENGED_PEER_ID); + + // A bounded run of consecutive claims is honored as a neutral deferral. + for _ in 0..MAX_CONSECUTIVE_OVERLOAD_CLAIMS { + assert!(tracker.honor_overload_claim(&peer)); + } + // The next claim exceeds the budget and must not be honored. + assert!(!tracker.honor_overload_claim(&peer)); + } + + #[test] + fn overload_tracker_resets_on_normal_response() { + let tracker = OverloadClaimTracker::new(); + let peer = PeerId::from_bytes(TEST_CHALLENGED_PEER_ID); + + for _ in 0..MAX_CONSECUTIVE_OVERLOAD_CLAIMS { + assert!(tracker.honor_overload_claim(&peer)); + } + // A genuine response clears the streak, restoring the full budget. + tracker.record_normal_response(&peer); + assert!(tracker.honor_overload_claim(&peer)); + } + + #[test] + fn overload_tracker_budget_is_per_peer() { + let tracker = OverloadClaimTracker::new(); + let peer_a = PeerId::from_bytes([1; 32]); + let peer_b = PeerId::from_bytes([2; 32]); + + // Exhaust peer_a's budget. + for _ in 0..=MAX_CONSECUTIVE_OVERLOAD_CLAIMS { + let _ = tracker.honor_overload_claim(&peer_a); + } + assert!(!tracker.honor_overload_claim(&peer_a)); + // peer_b is unaffected and keeps its own budget. + assert!(tracker.honor_overload_claim(&peer_b)); + } + + #[test] + fn fetch_overload_uses_tracker_budget_then_source_failed() { + let tracker = OverloadClaimTracker::new(); + let peer = PeerId::from_bytes(TEST_CHALLENGED_PEER_ID); + + for _ in 0..MAX_CONSECUTIVE_OVERLOAD_CLAIMS { + assert_eq!( + classify_fetch_overload_response( + &peer, + &TEST_KEY, + INBOUND_REPLICATION_OVERLOADED_REASON, + &tracker, + ), + FetchResult::Deferred + ); + } + + assert_eq!( + classify_fetch_overload_response( + &peer, + &TEST_KEY, + INBOUND_REPLICATION_OVERLOADED_REASON, + &tracker, + ), + FetchResult::SourceFailed + ); + } + #[test] fn audit_timeout_preserves_active_bootstrap_claim() { assert!(!audit_failure_clears_bootstrap_claim( diff --git a/src/replication/neighbor_sync.rs b/src/replication/neighbor_sync.rs index 0fadab01..8d1c8ad7 100644 --- a/src/replication/neighbor_sync.rs +++ b/src/replication/neighbor_sync.rs @@ -43,6 +43,19 @@ pub(crate) struct NeighborSyncOutcome { pub(crate) sent_replica_hints: Vec, } +/// Result of attempting one outbound neighbor-sync request. +#[derive(Debug)] +pub(crate) enum NeighborSyncAttempt { + /// Peer processed the sync and returned a typed response. + Completed(NeighborSyncOutcome), + /// Request failed, timed out, decoded incorrectly, or returned an + /// unexpected response type. + Failed, + /// Peer explicitly reported inbound replication overload before processing + /// the sync request. + Overloaded, +} + /// Prebuilt hint sets for one outbound neighbor-sync exchange. #[derive(Debug, Default)] pub(crate) struct PeerSyncHints { @@ -283,12 +296,14 @@ fn select_next_sync_peer( now: Instant, cooldown: Duration, ) -> Option { - while let Some(peer) = state.priority_order.pop_front() { - if peer_on_cooldown(state, &peer, now, cooldown) { - state.remove_peer(&peer); - continue; - } - + // Priority peers are explicitly queued for prompt attention — newly entered + // close-set members, or peers whose bootstrap sync was deferred because they + // reported overload. They bypass the periodic-sync cooldown: an + // overload-deferred peer blocks bootstrap drain (and hence audits) until it + // is re-synced, so skipping it on cooldown could strand drain until the next + // cycle. They are still de-duplicated, so a peer is synced at most once per + // queueing. + if let Some(peer) = state.priority_order.pop_front() { state.remove_peer(&peer); return Some(peer); } @@ -335,9 +350,12 @@ pub async fn sync_with_peer( config: &ReplicationConfig, is_bootstrapping: bool, ) -> Option { - sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping) + match sync_with_peer_with_outcome(peer, p2p_node, storage, paid_list, config, is_bootstrapping) .await - .map(|outcome| outcome.response) + { + NeighborSyncAttempt::Completed(outcome) => Some(outcome.response), + NeighborSyncAttempt::Failed | NeighborSyncAttempt::Overloaded => None, + } } pub(crate) async fn sync_with_peer_with_outcome( @@ -347,7 +365,7 @@ pub(crate) async fn sync_with_peer_with_outcome( paid_list: &Arc, config: &ReplicationConfig, is_bootstrapping: bool, -) -> Option { +) -> NeighborSyncAttempt { // Build peer-targeted hint sets (Rule 7). let mut hints_by_peer = build_sync_hints_for_peers( std::slice::from_ref(peer), @@ -368,7 +386,7 @@ pub(crate) async fn sync_with_peer_with_hints( config: &ReplicationConfig, is_bootstrapping: bool, hints: PeerSyncHints, -) -> Option { +) -> NeighborSyncAttempt { let replica_hints = hints .sent_replica_hints .iter() @@ -391,7 +409,7 @@ pub(crate) async fn sync_with_peer_with_hints( Ok(data) => data, Err(e) => { warn!("Failed to encode sync request for {peer}: {e}"); - return None; + return NeighborSyncAttempt::Failed; } }; @@ -407,25 +425,38 @@ pub(crate) async fn sync_with_peer_with_hints( Ok(resp) => resp, Err(e) => { debug!("Sync with {peer} failed: {e}"); - return None; + return NeighborSyncAttempt::Failed; } }; match ReplicationMessage::decode(&response.data) { - Ok(decoded) => { - if let ReplicationMessageBody::NeighborSyncResponse(resp) = decoded.body { - Some(NeighborSyncOutcome { + Ok(decoded) => match decoded.body { + ReplicationMessageBody::NeighborSyncResponse(resp) => { + NeighborSyncAttempt::Completed(NeighborSyncOutcome { response: resp, sent_replica_hints, }) - } else { + } + // The peer was too overloaded to process this sync. Treat it as a + // neutral non-result: the request-response call already succeeded, + // so there is no timeout trust penalty, and the explicit + // `Overloaded` outcome avoids recording a false successful sync. + // The peer is retried on a later cycle. + ReplicationMessageBody::Overloaded(notice) => { + debug!( + "Sync with {peer} deferred — peer overloaded: {}", + notice.reason + ); + NeighborSyncAttempt::Overloaded + } + _ => { warn!("Unexpected response type from {peer} during sync"); - None + NeighborSyncAttempt::Failed } - } + }, Err(e) => { warn!("Failed to decode sync response from {peer}: {e}"); - None + NeighborSyncAttempt::Failed } } } @@ -1037,7 +1068,12 @@ mod tests { } #[test] - fn priority_peer_on_cooldown_is_skipped_and_removed_from_snapshot() { + fn priority_peer_on_cooldown_is_still_selected_and_removed_from_snapshot() { + // Priority peers bypass the periodic-sync cooldown: a peer queued for + // priority sync (e.g. an overload-deferred bootstrap retry) is selected + // even within its cooldown window so it cannot strand bootstrap drain + // until a later cycle. It is still removed from the snapshot once + // selected, exactly like any synced peer. let peers = vec![peer_id_from_byte(1), peer_id_from_byte(2)]; let mut state = NeighborSyncState::new_cycle(peers); let cooldown = Duration::from_secs(1); @@ -1047,7 +1083,9 @@ mod tests { let batch = select_sync_batch(&mut state, 2, cooldown); - assert_eq!(batch, vec![peer_id_from_byte(1)]); + // Priority peer comes first despite cooldown, then the regular-order + // peer; the priority peer is consumed from both queues. + assert_eq!(batch, vec![priority_peer, peer_id_from_byte(1)]); assert!(state.priority_order.is_empty()); assert!(!state.order.contains(&priority_peer)); } diff --git a/src/replication/protocol.rs b/src/replication/protocol.rs index a5151a33..612225ee 100644 --- a/src/replication/protocol.rs +++ b/src/replication/protocol.rs @@ -114,6 +114,26 @@ pub enum ReplicationMessageBody { AuditChallenge(AuditChallenge), /// Response to audit challenge. AuditResponse(AuditResponse), + + // === Admission control === + /// Sent in place of a typed response when the receiver's inbound handler + /// capacity is exhausted and a request type has no natural rejection + /// response to carry the overload reason (currently `NeighborSyncRequest`). + /// + /// Added as the final variant so postcard keeps the discriminants of all + /// existing variants byte-identical: a node that predates this variant + /// simply fails to decode it and falls back to its neutral + /// no-response-received path, rather than misreading another message. + Overloaded(OverloadedNotice), +} + +/// Notice that the receiver dropped a request because its inbound replication +/// handler capacity was exhausted. Carries the human-readable reason for +/// logging parity with the typed overload rejections. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OverloadedNotice { + /// Human-readable overload reason (for logging only). + pub reason: String, } // --------------------------------------------------------------------------- @@ -601,6 +621,28 @@ mod tests { } } + #[test] + fn overloaded_notice_roundtrip() { + let msg = ReplicationMessage { + request_id: 42, + body: ReplicationMessageBody::Overloaded(OverloadedNotice { + reason: "inbound replication handler capacity exceeded".to_string(), + }), + }; + let encoded = msg.encode().expect("encode should succeed"); + let decoded = ReplicationMessage::decode(&encoded).expect("decode should succeed"); + + assert_eq!(decoded.request_id, 42); + if let ReplicationMessageBody::Overloaded(notice) = decoded.body { + assert_eq!( + notice.reason, + "inbound replication handler capacity exceeded" + ); + } else { + panic!("expected Overloaded"); + } + } + // === Fetch roundtrips === #[test] diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 68ebb8cc..3ec76905 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -21,6 +21,7 @@ use crate::replication::config::{ storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID, }; +use crate::replication::is_inbound_replication_overloaded_reason; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, @@ -95,6 +96,7 @@ enum PruneAuditStatus { Proven, Failed, Bootstrapping, + Overloaded, } #[derive(Debug, Default)] @@ -1122,6 +1124,7 @@ async fn peer_proves_record( report_prune_audit_failure_once(&peer, &key, p2p_node, config, report_state).await; None } + PruneAuditStatus::Overloaded => None, } } @@ -1255,6 +1258,13 @@ fn prune_audit_response_status( reason, }) => { if resp_id == challenge_id { + if is_inbound_replication_overloaded_reason(&reason) { + debug!( + "Prune audit proof for {} deferred by overloaded peer {peer}: {reason}", + hex::encode(key) + ); + return PruneAuditStatus::Overloaded; + } warn!( "Prune audit proof for {} rejected by {peer}: {reason}", hex::encode(key) @@ -1764,6 +1774,31 @@ mod tests { assert!(!prune_audit_response_clears_bootstrap_claim( PruneAuditStatus::Bootstrapping )); + assert!(!prune_audit_response_clears_bootstrap_claim( + PruneAuditStatus::Overloaded + )); + } + + #[test] + fn prune_audit_overload_rejection_is_neutral() { + const TEST_CHALLENGE_ID: u64 = 99; + const TEST_KEY_BYTE: u8 = 0xab; + const TEST_NONCE: [u8; 32] = [0; 32]; + + let peer = peer_id_from_byte(1); + let key = key_from_byte(TEST_KEY_BYTE); + let decoded = ReplicationMessage { + request_id: TEST_CHALLENGE_ID, + body: ReplicationMessageBody::AuditResponse(AuditResponse::Rejected { + challenge_id: TEST_CHALLENGE_ID, + reason: crate::replication::INBOUND_REPLICATION_OVERLOADED_REASON.to_string(), + }), + }; + + let status = + prune_audit_response_status(decoded, TEST_CHALLENGE_ID, &peer, &key, &TEST_NONCE, &[]); + + assert_eq!(status, PruneAuditStatus::Overloaded); } #[tokio::test] diff --git a/src/replication/quorum.rs b/src/replication/quorum.rs index 5cfddff7..ab8bd04d 100644 --- a/src/replication/quorum.rs +++ b/src/replication/quorum.rs @@ -427,9 +427,7 @@ pub async fn run_verification_round( continue; }; - if let ReplicationMessageBody::VerificationResponse(resp) = msg.body { - process_verification_response(&peer, &resp, targets, &mut evidence); - } + process_peer_verification_message(&peer, msg, targets, &mut evidence); } let elapsed_ms = started.elapsed().as_millis(); @@ -450,6 +448,30 @@ pub async fn run_verification_round( evidence } +fn process_peer_verification_message( + peer: &PeerId, + msg: ReplicationMessage, + targets: &VerificationTargets, + evidence: &mut HashMap, +) { + match msg.body { + ReplicationMessageBody::VerificationResponse(resp) => { + process_verification_response(peer, &resp, targets, evidence); + } + ReplicationMessageBody::Overloaded(notice) => { + debug!( + "Verification request to {peer} deferred because peer is overloaded: {}", + notice.reason + ); + mark_peer_unresolved(peer, targets, evidence); + } + other => { + debug!("Unexpected verification response from {peer}: {other:?}"); + mark_peer_unresolved(peer, targets, evidence); + } + } +} + /// Mark all keys for a peer as unresolved (timeout / decode failure). fn mark_peer_unresolved( peer: &PeerId, diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index ce02386c..b79fa955 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -266,6 +266,22 @@ impl ReplicationQueues { Some(entry.pipeline) } + /// Record that `key` resolved as `QuorumInconclusive` this round and return + /// the running count of consecutive inconclusive rounds. + /// + /// Returns `0` if the key is no longer pending (already terminal). The + /// caller abandons the key once the count reaches its bound so a key whose + /// targets never produce a conclusive answer cannot be re-verified forever. + pub fn record_inconclusive(&mut self, key: &XorName) -> u32 { + match self.pending_verify.get_mut(key) { + Some(entry) => { + entry.inconclusive_rounds = entry.inconclusive_rounds.saturating_add(1); + entry.inconclusive_rounds + } + None => 0, + } + } + /// Remove a key from pending verification. pub fn remove_pending(&mut self, key: &XorName) -> Option { let removed = self.pending_verify.remove(key); @@ -348,6 +364,15 @@ impl ReplicationQueues { for retry next cycle", hex::encode(key) ); + // This key reached a conclusive verification outcome (every caller + // promotes only verified/failed keys) but cannot move to fetch yet. + // Clear any inconclusive streak so a prior run of inconclusive rounds + // does not count toward inconclusive abandonment once the entry is + // retained — the streak only ever reflects *unbroken* inconclusive + // rounds. + if let Some(entry) = self.pending_verify.get_mut(&key) { + entry.inconclusive_rounds = 0; + } return false; } // Capacity confirmed; safe to release the pending slot and enqueue. @@ -405,6 +430,12 @@ impl ReplicationQueues { self.in_flight_fetch.remove(key) } + /// Return the peer currently serving an in-flight fetch. + #[must_use] + pub fn current_fetch_source(&self, key: &XorName) -> Option { + self.in_flight_fetch.get(key).map(|entry| entry.source) + } + /// Mark the current fetch attempt as failed and try the next untried source. /// /// Returns the next source peer if one is available, or `None` if all @@ -455,21 +486,27 @@ impl ReplicationQueues { } /// Evict stale pending-verification entries older than `max_age`. - pub fn evict_stale(&mut self, max_age: Duration) { + /// + /// Returns the evicted keys so callers that maintain external bookkeeping + /// (for example bootstrap drain tracking) can remove those keys from their + /// own pending sets. + pub fn evict_stale(&mut self, max_age: Duration) -> Vec { let now = Instant::now(); - let before = self.pending_verify.len(); + let mut evicted_keys = Vec::new(); let pending_per_sender = &mut self.pending_per_sender; - self.pending_verify.retain(|_, entry| { + self.pending_verify.retain(|key, entry| { let fresh = now.duration_since(entry.created_at) < max_age; if !fresh { Self::release_sender_slot(pending_per_sender, &entry.hint_sender); + evicted_keys.push(*key); } fresh }); - let evicted = before.saturating_sub(self.pending_verify.len()); + let evicted = evicted_keys.len(); if evicted > 0 { debug!("Evicted {evicted} stale pending-verification entries"); } + evicted_keys } /// Number of `pending_verify` entries currently attributed to `sender`. @@ -513,6 +550,7 @@ mod tests { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender: peer_id_from_byte(sender_byte), + inconclusive_rounds: 0, } } @@ -535,6 +573,22 @@ mod tests { assert_eq!(queues.pending_count(), 1); } + #[test] + fn record_inconclusive_counts_streak_and_reports_zero_when_absent() { + let mut queues = ReplicationQueues::new(); + let key = xor_name_from_byte(0x55); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); + + assert_eq!(queues.record_inconclusive(&key), 1); + assert_eq!(queues.record_inconclusive(&key), 2); + assert_eq!(queues.record_inconclusive(&key), 3); + + // Once the key leaves pending, further calls report 0 (no entry to + // count) rather than resurrecting state. + queues.remove_pending(&key); + assert_eq!(queues.record_inconclusive(&key), 0); + } + #[test] fn add_pending_verify_rejected_if_in_fetch_queue() { let mut queues = ReplicationQueues::new(); @@ -856,6 +910,7 @@ mod tests { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender: peer_id_from_byte(1), + inconclusive_rounds: 0, }; assert!(queues.add_pending_verify(key, entry).admitted()); @@ -875,6 +930,7 @@ mod tests { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender: peer_id_from_byte(2), + inconclusive_rounds: 0, }; assert!( @@ -915,6 +971,7 @@ mod tests { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender, + inconclusive_rounds: 0, }; assert!( queues.add_pending_verify(key, entry).admitted(), diff --git a/src/replication/types.rs b/src/replication/types.rs index 59e6b417..d55a5702 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -94,6 +94,18 @@ pub struct VerificationEntry { pub created_at: Instant, /// The peer that originally hinted this key (for source tracking). pub hint_sender: PeerId, + /// Consecutive verification rounds this key has resolved as + /// `QuorumInconclusive`. + /// + /// Bounds re-verification of keys that never converge — e.g. targets that + /// are persistently unreachable or that keep replying `Overloaded` (the + /// network-verify path has no per-peer overload-claim budget). The counter + /// only ever reflects an *unbroken* inconclusive streak: a conclusive + /// outcome normally removes the entry, and on the one path where a verified + /// key is retained instead of removed (fetch queue at capacity) + /// [`promote_pending_to_fetch`](crate::replication::scheduling::ReplicationQueues::promote_pending_to_fetch) + /// resets this field to zero. + pub inconclusive_rounds: u32, } // --------------------------------------------------------------------------- @@ -654,6 +666,14 @@ pub struct BootstrapState { /// of a global counter prevents one peer's rejection from being /// "cleared" by an unrelated peer's clean cycle. pub capacity_rejected_sources: HashSet, + /// Bootstrap neighbors whose initial sync was explicitly deferred because + /// the peer reported inbound overload. + /// + /// These peers have been queued for a later priority neighbor-sync retry. + /// Bootstrap must not drain while this set is non-empty, because otherwise + /// an overload response can make the initial bootstrap sync appear complete + /// before the retry has a chance to discover repair work. + pub overload_deferred_sync_peers: HashSet, } impl BootstrapState { @@ -665,6 +685,7 @@ impl BootstrapState { pending_peer_requests: 0, pending_keys: HashSet::new(), capacity_rejected_sources: HashSet::new(), + overload_deferred_sync_peers: HashSet::new(), } } @@ -1388,6 +1409,14 @@ mod tests { from_default.pending_peer_requests ); assert_eq!(from_new.pending_keys, from_default.pending_keys); + assert_eq!( + from_new.capacity_rejected_sources, + from_default.capacity_rejected_sources + ); + assert_eq!( + from_new.overload_deferred_sync_peers, + from_default.overload_deferred_sync_peers + ); } // -- Scenario tests ------------------------------------------------------- diff --git a/tests/poc_d1_bounded_queues.rs b/tests/poc_d1_bounded_queues.rs index 79465f08..65c1a291 100644 --- a/tests/poc_d1_bounded_queues.rs +++ b/tests/poc_d1_bounded_queues.rs @@ -66,6 +66,7 @@ fn entry_from(sender: PeerId) -> VerificationEntry { tried_sources: HashSet::new(), created_at: Instant::now(), hint_sender: sender, + inconclusive_rounds: 0, } } From 215cb7d5aacb1a1787b8ea3f162a77cd92f1f96e Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Mon, 22 Jun 2026 20:32:02 +0100 Subject: [PATCH 2/4] fix(replication): bound overload notices --- src/replication/mod.rs | 140 ++++++++++++++++++++++++++++--------- src/replication/pruning.rs | 7 +- src/replication/quorum.rs | 53 ++++++++++++-- 3 files changed, 159 insertions(+), 41 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index c90e2cea..fde556e6 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -130,13 +130,12 @@ fn is_inbound_replication_overloaded_reason(reason: &str) -> bool { /// peer is held accountable. The counter resets on any non-overload response on /// the same path. /// -/// The audit and fetch paths each keep their own [`OverloadClaimTracker`], so -/// this budget is enforced independently per path. The trackers are -/// deliberately *not* shared: `record_normal_response` clears a peer's entire -/// streak, so a shared tracker would let a peer keep its audit-overload streak -/// permanently fresh simply by returning ordinary (non-overload) responses on -/// the fetch path — defeating the audit-dodge bound this constant exists to -/// enforce. +/// Each request/response path keeps its own [`OverloadClaimTracker`], so this +/// budget is enforced independently per path. The trackers are deliberately +/// *not* shared: `record_normal_response` clears a peer's entire streak, so a +/// shared tracker would let a peer keep its audit-overload streak permanently +/// fresh simply by returning ordinary (non-overload) responses on the fetch path +/// — defeating the audit-dodge bound this constant exists to enforce. const MAX_CONSECUTIVE_OVERLOAD_CLAIMS: u32 = 5; /// Capacity of the per-peer overload-claim tracker. Bounds memory under peer @@ -209,6 +208,7 @@ struct VerificationCycleContext<'a> { storage: &'a Arc, queues: &'a Arc>, config: &'a ReplicationConfig, + overload_tracker: &'a Arc, bootstrap_state: &'a Arc>, is_bootstrapping: &'a Arc>, bootstrap_complete_notify: &'a Arc, @@ -464,6 +464,14 @@ pub struct ReplicationEngine { /// source cannot defer a fetch indefinitely. Separate from the audit tracker /// (see [`OverloadClaimTracker`]). fetch_overload_tracker: Arc, + /// Bounds repeated `Overloaded` notices from verification targets. Without + /// this, a peer could indefinitely avoid producing presence / paid-list + /// evidence while each individual key is merely abandoned after its + /// inconclusive-round budget. + verification_overload_tracker: Arc, + /// Bounds repeated `Overloaded` notices from neighbor-sync peers. This keeps + /// bootstrap retry deferrals from becoming a silent accountability bypass. + neighbor_sync_overload_tracker: Arc, /// Receiver for fresh-write events from the chunk PUT handler. /// /// When present, `start()` spawns a drainer task that calls @@ -520,6 +528,8 @@ impl ReplicationEngine { send_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REPLICATION_SENDS)), audit_overload_tracker: Arc::new(OverloadClaimTracker::new()), fetch_overload_tracker: Arc::new(OverloadClaimTracker::new()), + verification_overload_tracker: Arc::new(OverloadClaimTracker::new()), + neighbor_sync_overload_tracker: Arc::new(OverloadClaimTracker::new()), fresh_write_rx: Some(fresh_write_rx), shutdown, task_handles: Vec::new(), @@ -937,6 +947,7 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); let sync_trigger = Arc::clone(&self.sync_trigger); + let overload_tracker = Arc::clone(&self.neighbor_sync_overload_tracker); let handle = tokio::spawn(async move { loop { @@ -974,6 +985,8 @@ impl ReplicationEngine { &is_bootstrapping, &bootstrap_state, &bootstrap_complete_notify, + &sync_trigger, + &overload_tracker, ) => {} } } @@ -1238,6 +1251,7 @@ impl ReplicationEngine { let bootstrap_state = Arc::clone(&self.bootstrap_state); let is_bootstrapping = Arc::clone(&self.is_bootstrapping); let bootstrap_complete_notify = Arc::clone(&self.bootstrap_complete_notify); + let overload_tracker = Arc::clone(&self.verification_overload_tracker); let handle = tokio::spawn(async move { loop { @@ -1252,6 +1266,7 @@ impl ReplicationEngine { storage: &storage, queues: &queues, config: &config, + overload_tracker: &overload_tracker, bootstrap_state: &bootstrap_state, is_bootstrapping: &is_bootstrapping, bootstrap_complete_notify: &bootstrap_complete_notify, @@ -1294,6 +1309,7 @@ impl ReplicationEngine { let repair_proofs = Arc::clone(&self.repair_proofs); let sync_state = Arc::clone(&self.sync_state); let sync_trigger = Arc::clone(&self.sync_trigger); + let overload_tracker = Arc::clone(&self.neighbor_sync_overload_tracker); let handle = tokio::spawn(async move { // Wait for DHT bootstrap to complete before snapshotting @@ -1363,6 +1379,7 @@ impl ReplicationEngine { match outcome { neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + overload_tracker.record_normal_response(peer); if !outcome.response.bootstrapping { record_sent_replica_hints( peer, @@ -1408,15 +1425,19 @@ impl ReplicationEngine { } } neighbor_sync::NeighborSyncAttempt::Overloaded => { - debug!( - "Bootstrap sync with {peer} deferred because peer is overloaded" - ); - bootstrap::note_overload_deferred_sync(&bootstrap_state, *peer).await; - { - let mut state = sync_state.write().await; - let _ = state.queue_priority_peers(std::iter::once(*peer)); - } - sync_trigger.notify_one(); + handle_neighbor_sync_overload( + peer, + &p2p, + &sync_state, + &bootstrap_state, + &queues, + &is_bootstrapping, + &bootstrap_complete_notify, + &sync_trigger, + &overload_tracker, + true, + ) + .await; } neighbor_sync::NeighborSyncAttempt::Failed => {} } @@ -2227,6 +2248,51 @@ async fn clear_overload_deferred_sync_and_check_drain( } } +#[allow(clippy::too_many_arguments)] +async fn handle_neighbor_sync_overload( + peer: &PeerId, + p2p_node: &Arc, + sync_state: &Arc>, + bootstrap_state: &Arc>, + queues: &Arc>, + is_bootstrapping: &Arc>, + bootstrap_complete_notify: &Arc, + sync_trigger: &Arc, + overload_tracker: &Arc, + block_bootstrap_drain: bool, +) { + if overload_tracker.honor_overload_claim(peer) { + debug!("Neighbor sync with {peer} deferred because peer is overloaded"); + if block_bootstrap_drain { + bootstrap::note_overload_deferred_sync(bootstrap_state, *peer).await; + { + let mut state = sync_state.write().await; + let _ = state.queue_priority_peers(std::iter::once(*peer)); + } + sync_trigger.notify_one(); + } + return; + } + + warn!( + "Neighbor sync peer {peer} exceeded overload-claim budget; treating overload as sync failure" + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + clear_overload_deferred_sync_and_check_drain( + peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; +} + // --------------------------------------------------------------------------- // Neighbor sync round // --------------------------------------------------------------------------- @@ -2246,6 +2312,8 @@ async fn run_neighbor_sync_round( is_bootstrapping: &Arc>, bootstrap_state: &Arc>, bootstrap_complete_notify: &Arc, + sync_trigger: &Arc, + overload_tracker: &Arc, ) { let self_id = *p2p_node.peer_id(); let bootstrapping = *is_bootstrapping.read().await; @@ -2347,6 +2415,7 @@ async fn run_neighbor_sync_round( match outcome { neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + overload_tracker.record_normal_response(peer); handle_sync_response( &self_id, peer, @@ -2375,21 +2444,17 @@ async fn run_neighbor_sync_round( .await; } neighbor_sync::NeighborSyncAttempt::Overloaded => { - // The peer answered the deferred bootstrap retry but is still - // overloaded. Stop blocking bootstrap drain on it rather than - // re-deferring: the neighbor-sync path has no overload-claim - // budget, so a reachable peer that always reports overload would - // otherwise suspend bootstrap (and therefore audits) - // indefinitely. This matches the documented contract on - // `note_overload_deferred_sync` ("until that retry completes, - // fails, or the peer leaves"). Regular periodic sync still - // retries this peer later and will pick up any repair work. - clear_overload_deferred_sync_and_check_drain( + handle_neighbor_sync_overload( peer, + p2p_node, + sync_state, bootstrap_state, queues, is_bootstrapping, bootstrap_complete_notify, + sync_trigger, + overload_tracker, + bootstrapping, ) .await; } @@ -2429,6 +2494,7 @@ async fn run_neighbor_sync_round( match replacement_outcome { neighbor_sync::NeighborSyncAttempt::Completed(outcome) => { + overload_tracker.record_normal_response(&replacement_peer); handle_sync_response( &self_id, &replacement_peer, @@ -2457,15 +2523,17 @@ async fn run_neighbor_sync_round( .await; } neighbor_sync::NeighborSyncAttempt::Overloaded => { - // See the primary-peer arm above: give up rather - // than re-deferring indefinitely on a replacement - // peer that keeps reporting overload. - clear_overload_deferred_sync_and_check_drain( + handle_neighbor_sync_overload( &replacement_peer, + p2p_node, + sync_state, bootstrap_state, queues, is_bootstrapping, bootstrap_complete_notify, + sync_trigger, + overload_tracker, + bootstrapping, ) .await; } @@ -2732,6 +2800,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { storage, queues, config, + overload_tracker, bootstrap_state, is_bootstrapping, bootstrap_complete_notify, @@ -2849,6 +2918,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { &targets, p2p_node, config, + Some(overload_tracker), ) .await; @@ -2887,8 +2957,14 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { quorum::compute_verification_targets(&keys_needing_network, p2p_node, config, &self_id) .await; - let evidence = - quorum::run_verification_round(&keys_needing_network, &targets, p2p_node, config).await; + let evidence = quorum::run_verification_round( + &keys_needing_network, + &targets, + p2p_node, + config, + Some(overload_tracker), + ) + .await; // Step 3: Evaluate results — collect outcomes without holding the write // lock across paid-list I/O. diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 3ec76905..19502e5f 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -21,7 +21,6 @@ use crate::replication::config::{ storage_admission_width, ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, MAX_PRUNE_AUDIT_CHALLENGES_PER_PASS, REPLICATION_PROTOCOL_ID, }; -use crate::replication::is_inbound_replication_overloaded_reason; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage, @@ -32,6 +31,7 @@ use crate::replication::types::{ BootstrapClaimObservation, KeyVerificationEvidence, NeighborSyncState, PaidListEvidence, RepairProofs, }; +use crate::replication::{is_inbound_replication_overloaded_reason, OverloadClaimTracker}; use crate::storage::LmdbStorage; use super::REPLICATION_TRUST_WEIGHT; @@ -787,7 +787,10 @@ async fn collect_paid_prune_confirmations( keys_list.dedup(); } - let evidence = quorum::run_verification_round(&keys, &targets, p2p_node, config).await; + let overload_tracker = Arc::new(OverloadClaimTracker::new()); + let evidence = + quorum::run_verification_round(&keys, &targets, p2p_node, config, Some(&overload_tracker)) + .await; paid_confirmations_by_key(expired_candidates, &evidence) } diff --git a/src/replication/quorum.rs b/src/replication/quorum.rs index ab8bd04d..4952d4a0 100644 --- a/src/replication/quorum.rs +++ b/src/replication/quorum.rs @@ -17,6 +17,10 @@ use crate::replication::protocol::{ ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse, }; use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence}; +use crate::replication::{ + is_inbound_replication_overloaded_reason, OverloadClaimTracker, REPLICATION_TRUST_WEIGHT, +}; +use saorsa_core::TrustEvent; /// Verification round duration that is worth surfacing at info level. const VERIFICATION_ROUND_SLOW_LOG_MS: u128 = 500; @@ -326,11 +330,13 @@ fn collect_present_sources( /// /// Implements Section 9 requirement: one request per peer carrying many keys. /// Returns per-key evidence aggregated from all peer responses. -pub async fn run_verification_round( +#[allow(clippy::too_many_lines)] +pub(crate) async fn run_verification_round( keys: &[XorName], targets: &VerificationTargets, p2p_node: &Arc, config: &ReplicationConfig, + overload_tracker: Option<&Arc>, ) -> HashMap { let started = Instant::now(); let peer_count = targets.peer_to_keys.len(); @@ -427,7 +433,15 @@ pub async fn run_verification_round( continue; }; - process_peer_verification_message(&peer, msg, targets, &mut evidence); + process_peer_verification_message( + &peer, + msg, + targets, + &mut evidence, + p2p_node, + overload_tracker, + ) + .await; } let elapsed_ms = started.elapsed().as_millis(); @@ -448,21 +462,46 @@ pub async fn run_verification_round( evidence } -fn process_peer_verification_message( +async fn process_peer_verification_message( peer: &PeerId, msg: ReplicationMessage, targets: &VerificationTargets, evidence: &mut HashMap, + p2p_node: &Arc, + overload_tracker: Option<&Arc>, ) { match msg.body { ReplicationMessageBody::VerificationResponse(resp) => { + if let Some(tracker) = overload_tracker { + tracker.record_normal_response(peer); + } process_verification_response(peer, &resp, targets, evidence); } ReplicationMessageBody::Overloaded(notice) => { - debug!( - "Verification request to {peer} deferred because peer is overloaded: {}", - notice.reason - ); + let honor = match overload_tracker { + Some(tracker) => { + is_inbound_replication_overloaded_reason(¬ice.reason) + && tracker.honor_overload_claim(peer) + } + None => true, + }; + if honor { + debug!( + "Verification request to {peer} deferred because peer is overloaded: {}", + notice.reason + ); + } else { + warn!( + "Verification peer {peer} sent invalid or over-budget overload notice; treating as non-cooperation: {}", + notice.reason + ); + p2p_node + .report_trust_event( + peer, + TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT), + ) + .await; + } mark_peer_unresolved(peer, targets, evidence); } other => { From 54652dfa3932ab1b6fe8d23f76db9a49a4c5b221 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Mon, 22 Jun 2026 21:42:53 +0100 Subject: [PATCH 3/4] fix(replication): defer overload verification without abandoning repair --- src/replication/config.rs | 3 +- src/replication/mod.rs | 76 ++++++++++++++++++++++++++++------- src/replication/pruning.rs | 1 + src/replication/quorum.rs | 55 +++++++++++++++++++++++++ src/replication/scheduling.rs | 25 ++++++++++++ src/replication/types.rs | 6 +++ 6 files changed, 149 insertions(+), 17 deletions(-) diff --git a/src/replication/config.rs b/src/replication/config.rs index 4d5f7cbe..707ecc31 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -172,8 +172,7 @@ pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_ /// A backstop complementing the age-based [`PENDING_VERIFY_MAX_AGE`] eviction: /// it bounds wasted re-verification (and per-source pending-slot occupancy) for /// keys whose targets never converge — persistently unreachable peers, or peers -/// that keep replying `Overloaded`, which the network-verify path treats as -/// neutral and otherwise has no per-peer overload-claim budget to bound. +/// that keep replying `Overloaded` beyond the per-peer overload budget. pub const MAX_INCONCLUSIVE_VERIFY_ROUNDS: u32 = 10; /// Trust event weight for confirmed audit failures. diff --git a/src/replication/mod.rs b/src/replication/mod.rs index fde556e6..8ea47632 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -146,14 +146,14 @@ const OVERLOAD_CLAIM_TRACKER_CAPACITY: usize = 1024; /// Per-peer budget that bounds how often a peer's "overloaded" response is /// honored before it is treated as a normal failure. /// -/// The audit and fetch paths each own a separate instance (behind an `Arc`): -/// they are the request initiators that would otherwise grant an -/// unauthenticated, indefinite accountability exemption to any peer that simply +/// Long-lived request initiators each own a separate instance (behind an +/// `Arc`): audit, fetch, verification, and neighbour-sync can all otherwise +/// grant an unauthenticated accountability exemption to any peer that simply /// echoes the overload reason. They are kept separate rather than shared so a /// non-overload response on one path cannot reset the streak that bounds claims -/// on the other (see [`MAX_CONSECUTIVE_OVERLOAD_CLAIMS`]). The prune-audit path -/// does not use this tracker: an overload there only makes us keep our own -/// replica, which has no evasion value. +/// on another (see [`MAX_CONSECUTIVE_OVERLOAD_CLAIMS`]). Short one-shot prune +/// verification uses a local tracker because overload there only makes us keep +/// our own replica. pub(crate) struct OverloadClaimTracker { /// Consecutive overload claims per peer (reset on any normal response). consecutive: Mutex>, @@ -2270,6 +2270,19 @@ async fn handle_neighbor_sync_overload( let _ = state.queue_priority_peers(std::iter::once(*peer)); } sync_trigger.notify_one(); + } else { + // A later/priority retry completed with an overload notice. Do not + // keep bootstrap drain blocked on this peer; normal sync cycles can + // try it again later, while the overload budget still bounds + // repeated claims. + clear_overload_deferred_sync_and_check_drain( + peer, + bootstrap_state, + queues, + is_bootstrapping, + bootstrap_complete_notify, + ) + .await; } return; } @@ -2454,7 +2467,7 @@ async fn run_neighbor_sync_round( bootstrap_complete_notify, sync_trigger, overload_tracker, - bootstrapping, + false, ) .await; } @@ -2533,7 +2546,7 @@ async fn run_neighbor_sync_round( bootstrap_complete_notify, sync_trigger, overload_tracker, - bootstrapping, + false, ) .await; } @@ -2932,7 +2945,14 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let sources = evidence.get(&key).map_or_else(Vec::new, |ev| { quorum::present_sources_for_key(&key, ev, &targets) }); - if sources.is_empty() { + let overload_deferred = evidence.get(&key).is_some_and(|ev| ev.overload_deferred); + if sources.is_empty() && overload_deferred { + q.reset_inconclusive(&key); + debug!( + "Locally paid key {} has no responding holders because verification was overload-deferred; leaving pending", + hex::encode(key) + ); + } else if sources.is_empty() { // Terminal failure: remove pending and report. No fetch path. q.remove_pending(&key); warn!( @@ -2968,8 +2988,13 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // Step 3: Evaluate results — collect outcomes without holding the write // lock across paid-list I/O. - let mut evaluated: Vec<(XorName, KeyVerificationOutcome, HintPipeline, Vec)> = - Vec::new(); + let mut evaluated: Vec<( + XorName, + KeyVerificationOutcome, + HintPipeline, + Vec, + bool, + )> = Vec::new(); { let q = queues.read().await; for key in &keys_needing_network { @@ -2990,13 +3015,20 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { } else { Vec::new() }; - evaluated.push((*key, outcome, entry.pipeline, present_sources)); + let overload_deferred = ev.overload_deferred; + evaluated.push(( + *key, + outcome, + entry.pipeline, + present_sources, + overload_deferred, + )); } } // read lock released // Step 4: Insert verified keys into PaidForList (no lock held). let mut paid_insert_keys: Vec = Vec::new(); - for (key, outcome, _, _) in &evaluated { + for (key, outcome, _, _, _) in &evaluated { if matches!( outcome, KeyVerificationOutcome::QuorumVerified { .. } @@ -3016,7 +3048,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // paid-only hint can safely repair a missing replica using sources // from the same verification round. let mut paid_only_fetch_keys: HashSet = HashSet::new(); - for (key, outcome, pipeline, _) in &evaluated { + for (key, outcome, pipeline, _, _) in &evaluated { if *pipeline == HintPipeline::PaidOnly && matches!( outcome, @@ -3039,7 +3071,7 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { // Step 5: Update queues with the evaluated outcomes. let bootstrapping = *is_bootstrapping.read().await; let mut q = queues.write().await; - for (key, outcome, pipeline, present_sources) in evaluated { + for (key, outcome, pipeline, present_sources, overload_deferred) in evaluated { match outcome { KeyVerificationOutcome::QuorumVerified { sources } | KeyVerificationOutcome::PaidListVerified { sources } => { @@ -3054,6 +3086,12 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { let _ = q.promote_pending_to_fetch(key, distance, sources); // Not terminal — either moved to fetch queue, or // retained as pending until queue drains. + } else if fetch_eligible && sources.is_empty() && overload_deferred { + q.reset_inconclusive(&key); + debug!( + "Verified storage-admitted key {} has no holders because verification was overload-deferred; leaving pending", + hex::encode(key) + ); } else if fetch_eligible && sources.is_empty() { warn!( "Verified storage-admitted key {} has no holders (possible data loss)", @@ -3084,6 +3122,14 @@ async fn run_verification_cycle(ctx: VerificationCycleContext<'_>) { } } KeyVerificationOutcome::QuorumInconclusive => { + if overload_deferred { + q.reset_inconclusive(&key); + debug!( + "Verification for key {} is overload-deferred; leaving pending without burning inconclusive budget", + hex::encode(key) + ); + continue; + } let rounds = q.record_inconclusive(&key); if rounds >= config::MAX_INCONCLUSIVE_VERIFY_ROUNDS { debug!( diff --git a/src/replication/pruning.rs b/src/replication/pruning.rs index 19502e5f..dd16ac2a 100644 --- a/src/replication/pruning.rs +++ b/src/replication/pruning.rs @@ -1607,6 +1607,7 @@ mod tests { // Confirmation from a peer outside the target set. (outsider, PaidListEvidence::Confirmed), ]), + overload_deferred: false, }, ); diff --git a/src/replication/quorum.rs b/src/replication/quorum.rs index 4952d4a0..afd3572e 100644 --- a/src/replication/quorum.rs +++ b/src/replication/quorum.rs @@ -351,6 +351,7 @@ pub(crate) async fn run_verification_round( KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ) }) @@ -503,6 +504,9 @@ async fn process_peer_verification_message( .await; } mark_peer_unresolved(peer, targets, evidence); + if honor { + mark_peer_overload_deferred(peer, targets, evidence); + } } other => { debug!("Unexpected verification response from {peer}: {other:?}"); @@ -511,6 +515,20 @@ async fn process_peer_verification_message( } } +fn mark_peer_overload_deferred( + peer: &PeerId, + targets: &VerificationTargets, + evidence: &mut HashMap, +) { + if let Some(peer_keys) = targets.peer_to_keys.get(peer) { + for key in peer_keys { + if let Some(ev) = evidence.get_mut(key) { + ev.overload_deferred = true; + } + } + } +} + /// Mark all keys for a peer as unresolved (timeout / decode failure). fn mark_peer_unresolved( peer: &PeerId, @@ -671,6 +689,7 @@ mod tests { KeyVerificationEvidence { presence: presence.into_iter().collect(), paid_list: paid_list.into_iter().collect(), + overload_deferred: false, } } @@ -1045,6 +1064,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, )) .collect(); @@ -1084,6 +1104,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, )) .collect(); @@ -1106,6 +1127,33 @@ mod tests { ); } + #[test] + fn mark_overload_deferred_sets_flag_without_changing_evidence_values() { + let key = xor_name_from_byte(0xA1); + let peer = peer_id_from_byte(2); + + let targets = single_key_targets(&key, vec![peer], vec![peer]); + let mut evidence: HashMap = std::iter::once(( + key, + KeyVerificationEvidence { + presence: HashMap::from([(peer, PresenceEvidence::Unresolved)]), + paid_list: HashMap::from([(peer, PaidListEvidence::Unresolved)]), + overload_deferred: false, + }, + )) + .collect(); + + mark_peer_overload_deferred(&peer, &targets, &mut evidence); + + let ev = evidence.get(&key).expect("evidence for key"); + assert!( + ev.overload_deferred, + "overload should be tracked separately" + ); + assert_eq!(ev.presence.get(&peer), Some(&PresenceEvidence::Unresolved)); + assert_eq!(ev.paid_list.get(&peer), Some(&PaidListEvidence::Unresolved)); + } + #[test] fn process_response_ignores_unsolicited_keys() { let key = xor_name_from_byte(0xB0); @@ -1119,6 +1167,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, )) .collect(); @@ -1181,6 +1230,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ( @@ -1188,6 +1238,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ] @@ -1439,6 +1490,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ( @@ -1446,6 +1498,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ] @@ -1501,6 +1554,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ( @@ -1508,6 +1562,7 @@ mod tests { KeyVerificationEvidence { presence: HashMap::new(), paid_list: HashMap::new(), + overload_deferred: false, }, ), ] diff --git a/src/replication/scheduling.rs b/src/replication/scheduling.rs index b79fa955..da540de1 100644 --- a/src/replication/scheduling.rs +++ b/src/replication/scheduling.rs @@ -282,6 +282,15 @@ impl ReplicationQueues { } } + /// Clear a pending key's inconclusive streak after an explicit overload + /// deferral. Overload is a receiver-side backpressure signal, not evidence + /// that the key cannot converge. + pub fn reset_inconclusive(&mut self, key: &XorName) { + if let Some(entry) = self.pending_verify.get_mut(key) { + entry.inconclusive_rounds = 0; + } + } + /// Remove a key from pending verification. pub fn remove_pending(&mut self, key: &XorName) -> Option { let removed = self.pending_verify.remove(key); @@ -589,6 +598,22 @@ mod tests { assert_eq!(queues.record_inconclusive(&key), 0); } + #[test] + fn reset_inconclusive_clears_streak_for_pending_key() { + let mut queues = ReplicationQueues::new(); + let key = xor_name_from_byte(0x56); + assert!(queues.add_pending_verify(key, test_entry(1)).admitted()); + + assert_eq!(queues.record_inconclusive(&key), 1); + assert_eq!(queues.record_inconclusive(&key), 2); + queues.reset_inconclusive(&key); + assert_eq!(queues.record_inconclusive(&key), 1); + + queues.remove_pending(&key); + queues.reset_inconclusive(&key); + assert_eq!(queues.record_inconclusive(&key), 0); + } + #[test] fn add_pending_verify_rejected_if_in_fetch_queue() { let mut queues = ReplicationQueues::new(); diff --git a/src/replication/types.rs b/src/replication/types.rs index d55a5702..8343ec13 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -186,6 +186,12 @@ pub struct KeyVerificationEvidence { pub presence: HashMap, /// Paid-list evidence per peer (from `PaidTargets`). pub paid_list: HashMap, + /// At least one target explicitly reported a valid, in-budget overload. + /// + /// This is distinct from an ordinary timeout/unresolved peer: the caller + /// should defer/retry without burning the normal inconclusive-round budget + /// or terminally deciding that no holders exist. + pub overload_deferred: bool, } // --------------------------------------------------------------------------- From fc9844bc2e551bfa507768a9c357b9e7cc46d98d Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Mon, 22 Jun 2026 21:52:32 +0100 Subject: [PATCH 4/4] fix(replication): preserve bootstrap blocker on sync overload --- src/replication/mod.rs | 19 +++++++------------ src/replication/types.rs | 13 ++++++------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 8ea47632..54268269 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -2271,18 +2271,13 @@ async fn handle_neighbor_sync_overload( } sync_trigger.notify_one(); } else { - // A later/priority retry completed with an overload notice. Do not - // keep bootstrap drain blocked on this peer; normal sync cycles can - // try it again later, while the overload budget still bounds - // repeated claims. - clear_overload_deferred_sync_and_check_drain( - peer, - bootstrap_state, - queues, - is_bootstrapping, - bootstrap_complete_notify, - ) - .await; + // A later/priority retry completed with another in-budget overload + // notice. Do not immediately requeue the same peer again — priority + // peers bypass cooldown — but also do not clear any existing + // bootstrap-drain blocker because this peer still has not processed + // the bootstrap sync or returned its hints. Normal neighbour-sync + // cycles will retry later; success/failure or over-budget overload + // clears the blocker. } return; } diff --git a/src/replication/types.rs b/src/replication/types.rs index 8343ec13..14edaf76 100644 --- a/src/replication/types.rs +++ b/src/replication/types.rs @@ -98,13 +98,12 @@ pub struct VerificationEntry { /// `QuorumInconclusive`. /// /// Bounds re-verification of keys that never converge — e.g. targets that - /// are persistently unreachable or that keep replying `Overloaded` (the - /// network-verify path has no per-peer overload-claim budget). The counter - /// only ever reflects an *unbroken* inconclusive streak: a conclusive - /// outcome normally removes the entry, and on the one path where a verified - /// key is retained instead of removed (fetch queue at capacity) - /// [`promote_pending_to_fetch`](crate::replication::scheduling::ReplicationQueues::promote_pending_to_fetch) - /// resets this field to zero. + /// are persistently unreachable, or peers whose `Overloaded` replies have + /// exceeded the per-peer overload-claim budget. The counter only ever + /// reflects an *unbroken* inconclusive streak: a conclusive outcome normally + /// removes the entry. When a pending entry is intentionally retained after a + /// conclusive/deferred outcome (fetch queue at capacity or honored overload + /// deferral), the queue resets this field to zero. pub inconclusive_rounds: u32, }