diff --git a/src/replication/config.rs b/src/replication/config.rs index e2aec73..571c934 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -103,17 +103,18 @@ pub const MAX_CONCURRENT_REPLICATION_SENDS: usize = 3; /// Maximum number of concurrent in-flight audit-responder tasks. /// -/// Subtree (round 1) and byte (round 2) challenge handlers are spawned off the -/// serial replication message loop so their disk reads don't stall replication. -/// This caps how many run at once across the engine, restoring backpressure: a -/// peer flooding audit challenges cannot fan out unbounded `get_raw` reads or -/// multi-MiB byte serves. When the cap is hit, the challenge is dropped — the -/// auditor graces a non-response as a timeout, so honest auditors are -/// unaffected and only a flooder is throttled. Sized to cover a handful of -/// concurrent honest auditors (the per-peer gossip-audit cooldown is 30 min, so -/// genuine concurrent audits are few) while bounding the byte round's worst-case -/// resident bytes (`N × MAX_BYTE_CHALLENGE_KEYS × MAX_CHUNK_SIZE`). -pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 8; +/// The responsible-chunk (audit #2), subtree (round 1), and byte (round 2) +/// challenge handlers are all spawned off the serial replication message loop so +/// their disk reads don't stall replication. This caps how many run at once +/// across the engine, restoring backpressure: a peer flooding audit challenges +/// cannot fan out unbounded `get_raw` reads or multi-MiB byte serves. When the +/// cap is hit, the challenge is dropped — the auditor graces a non-response as a +/// timeout, so honest auditors are unaffected and only a flooder is throttled. +/// Sized to cover a handful of concurrent honest auditors (the per-peer +/// gossip-audit cooldown is 30 min, so genuine concurrent audits are few) while +/// bounding the byte round's worst-case resident bytes +/// (`N × MAX_BYTE_CHALLENGE_KEYS × MAX_CHUNK_SIZE`). +pub const MAX_CONCURRENT_AUDIT_RESPONSES: usize = 16; /// Maximum concurrent in-flight audit-responder tasks from any SINGLE peer. /// diff --git a/src/replication/mod.rs b/src/replication/mod.rs index b3d827d..0f6394a 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -1691,22 +1691,53 @@ async fn handle_replication_message( ) .await } - ReplicationMessageBody::AuditChallenge(ref challenge) => { + ReplicationMessageBody::AuditChallenge(challenge) => { // Responsible-chunk audit (audit #2) responder: answer with per-key // possession digests. This same handler also answers the // prune-confirmation audit, which sends the same `AuditChallenge` // wire message. + // + // Answering digests the stored bytes of every challenged key, so — + // like the subtree/byte audits below — run it on a detached task off + // this serial message loop. Handling it inline lets one challenge + // block all other replication traffic until its digests complete + // (head-of-line blocking). The same flood-fair admission applies: a + // global ceiling AND a per-peer cap, dropping the challenge if either + // is hit (an honest auditor graces a non-response as a timeout, while + // a flooder is held to its per-peer share and cannot starve others). + let Some(guard) = + admit_audit_responder(audit_responder_semaphore, audit_responder_inflight, source) + .await + else { + warn!( + "Audit challenge reply not sent: kind=responsible response=dropped \ + source={source} (audit-responder capacity reached)" + ); + return Ok(()); + }; let bootstrapping = *is_bootstrapping.read().await; - handle_audit_challenge_msg( - source, - challenge, - storage, - p2p_node, - bootstrapping, - msg.request_id, - rr_message_id, - ) - .await + let storage = Arc::clone(storage); + let p2p_node = Arc::clone(p2p_node); + let source = *source; + let request_id = msg.request_id; + let rr_message_id = rr_message_id.map(ToOwned::to_owned); + tokio::spawn(async move { + let _guard = guard; // global permit + per-peer slot, held until done + if let Err(e) = handle_audit_challenge_msg( + &source, + &challenge, + &storage, + &p2p_node, + bootstrapping, + request_id, + rr_message_id.as_deref(), + ) + .await + { + debug!("Audit challenge from {source} error: {e}"); + } + }); + Ok(()) } ReplicationMessageBody::SubtreeAuditChallenge(challenge) => { // Gossip-triggered storage-bound subtree audit (ADR-0002). The