From bc07c03dccc5663ee514e9ce60dfab54ffd651fc Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 14:18:23 +0200 Subject: [PATCH 1/9] feat(replication): retry fresh-replication delivery up to 2x per peer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First slice of ADR-0003. replicate_fresh now retries each per-peer push up to FRESH_REPLICATION_DELIVERY_MAX_RETRIES on a transport failure, so a transient hiccup doesn't silently drop the offer. The encoded offer is shared via Arc so the common single-attempt path keeps one clone per peer. Delivery assurance only — possession scoring (the delayed 5-15 min check and AuditChallenge-severity penalty) is the next stage. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/replication/config.rs | 9 ++++++++ src/replication/fresh.rs | 46 +++++++++++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/replication/config.rs b/src/replication/config.rs index 571c934f..3bfaf3c5 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -47,6 +47,15 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20; /// round. pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4; +/// Best-effort delivery retries for a fresh-replication push, per peer. +/// +/// ADR-0003: on a transport/send failure the offer is retried up to this many +/// times so a transient hiccup does not silently drop it. This is delivery +/// assurance only — possession is judged separately by the delayed possession +/// check, which still penalises a close peer that lacks the chunk even if the +/// push never reached it. +pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2; + /// Width used when deciding whether this node may locally store or retain a /// chunk. #[must_use] diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs index af3a93a9..7b17bbd3 100644 --- a/src/replication/fresh.rs +++ b/src/replication/fresh.rs @@ -14,7 +14,9 @@ use saorsa_core::P2PNode; use tokio::sync::Semaphore; use crate::ant_protocol::XorName; -use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID}; +use crate::replication::config::{ + ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID, +}; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody, @@ -90,9 +92,13 @@ pub async fn replicate_fresh( ); return; }; + // Share one encoded copy across the per-peer send tasks so a retry only + // re-materialises the buffer for the (consuming) send call, keeping the + // common single-attempt path at one clone per peer. + let encoded = Arc::new(encoded); for peer in &target_peers { let p2p = Arc::clone(p2p_node); - let data = encoded.clone(); + let data = Arc::clone(&encoded); let peer_id = *peer; let sem = Arc::clone(send_semaphore); tokio::spawn(async move { @@ -103,11 +109,37 @@ pub async fn replicate_fresh( "Replication send permit acquired for peer {peer_id} ({} available)", sem.available_permits() ); - if let Err(e) = p2p - .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[]) - .await - { - debug!("Failed to send fresh offer to {peer_id}: {e}"); + // ADR-0003: best-effort delivery. Retry the push up to + // FRESH_REPLICATION_DELIVERY_MAX_RETRIES times on a transport + // failure so a transient hiccup doesn't silently drop the offer. + // Possession is judged separately by the delayed possession check. + let mut attempt = 0u32; + loop { + match p2p + .send_message( + &peer_id, + REPLICATION_PROTOCOL_ID, + data.as_ref().clone(), + &[], + ) + .await + { + Ok(()) => break, + Err(e) => { + if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES { + debug!( + "Failed to send fresh offer to {peer_id} after {} attempts: {e}", + attempt + 1 + ); + break; + } + attempt += 1; + debug!( + "Retrying fresh offer to {peer_id} (attempt {}): {e}", + attempt + 1 + ); + } + } } }); } From 9d43c2708d4c9bf5a9a69b77a72abf0e8f944644 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 15:04:48 +0200 Subject: [PATCH 2/9] build(deps): pin saorsa-core to PR #119 git branch to fix version skew ant-protocol 2.2.0 pulled saorsa-core 0.26.0 from crates.io while the node used a local path of the same version, producing two copies that collided on shared types (e.g. saorsa_core::address::MultiAddr) and broke the ant-devnet binary. Point the direct dependency at WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds) and add a matching [patch.crates-io] so the transitive copy unifies onto one source. Full workspace now builds. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 3 +-- Cargo.toml | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e212fa4b..91545e1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4867,8 +4867,7 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8cc1b7f59f97d018760ff150bbb4f217197c41622b83f7085c9cf0424b736e" +source = "git+https://github.com/WithAutonomi/saorsa-core?branch=feat/trust-quarantine-thresholds#3afe290442df42a7b6ca8989f860d185d1f6f9b4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7d08918a..88f570a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,11 @@ mimalloc = "0.1" # with ant-protocol's re-exports. ant-protocol = "2.2.0" -# Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.26.0" +# Core (provides EVERYTHING: networking, DHT, security, trust, storage). +# Pinned to WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds); +# the matching `[patch.crates-io]` below redirects ant-protocol's transitive +# saorsa-core to the same source so Cargo unifies on one copy. +saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } saorsa-pqc = "0.5" # Payment verification - autonomi network lookup + EVM payment @@ -196,3 +199,11 @@ unused_async = "allow" cognitive_complexity = "allow" # Allow non-const functions during initial development (may need runtime features later) missing_const_for_fn = "allow" + +[patch.crates-io] +# Redirect the saorsa-core that ant-protocol (and other crates) pull from +# crates.io onto the same git source as the node's direct dependency, +# eliminating the duplicate 0.26.0 copies (crates.io vs git) that otherwise +# collide on shared types such as `saorsa_core::address::MultiAddr`. +# Tracks WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds). +saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } From 260e31c717df5a55f65d1275097005564bca2606 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 15:13:55 +0200 Subject: [PATCH 3/9] feat(storage): add self-closeness gate on client PUTs Implements the ADR-0003 gate. A client PUT is now accepted only when this node is within its own local SELF_CLOSENESS_GATE_WIDTH (= K_BUCKET_SIZE) closest peers to the address, so the fresh replication it triggers is legitimate and cannot mis-penalise honest peers. The width equals the client's PUT fallback ceiling (ADR-0002), so a client routing past full close-group members onto further peers is still accepted; a genuinely far node is turned away. The P2P handle is bound out of the lock before the await, and the gate no-ops when no handle is attached (unit tests). Fresh-replication receives keep their own narrower admission check. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/storage/handler.rs | 45 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 586b5178..13c52d08 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -38,13 +38,27 @@ use crate::client::compute_address; use crate::error::{Error, Result}; use crate::logging::{debug, info, warn}; use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext}; +use crate::replication::admission; +use crate::replication::config::K_BUCKET_SIZE; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use parking_lot::RwLock; use saorsa_core::P2PNode; use std::sync::Arc; use tokio::sync::mpsc; +/// Width of the self-closeness gate on client PUTs (ADR-0003): a node accepts +/// a PUT only when it is within its own local `SELF_CLOSENESS_GATE_WIDTH` +/// closest peers to the address. +/// +/// Set to the client's PUT fallback ceiling (`K_BUCKET_SIZE`), wider than the +/// storage-admission width, so a client routing past full close-group members +/// onto further peers (ADR-0002) is still accepted here, while a genuinely far +/// node — which could only mis-attribute fresh-replication failures — is +/// turned away. +const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE; + /// ANT protocol handler. /// /// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence @@ -59,6 +73,10 @@ pub struct AntProtocol { quote_generator: Arc, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, + /// The node's P2P handle, attached post-construction via + /// `attach_p2p_node`. Drives the self-closeness gate on client PUTs; + /// `None` in unit tests that never attach a node. + p2p_node: RwLock>>, } impl AntProtocol { @@ -90,6 +108,7 @@ impl AntProtocol { payment_verifier, quote_generator, fresh_write_tx: None, + p2p_node: RwLock::new(None), } } @@ -99,8 +118,9 @@ impl AntProtocol { /// checks can use the live routing view. Idempotent: calling twice /// replaces the verifier handle. pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(Arc::clone(&node)); self.payment_verifier.attach_p2p_node(node); - debug!("AntProtocol: P2PNode attached for payment live-DHT checks"); + debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate"); } /// Set the channel sender for fresh-write replication events. @@ -280,9 +300,28 @@ impl AntProtocol { return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string())); } + // Self-closeness gate (ADR-0003): accept a client PUT only when this + // node is within its own local closest view of the address, so the + // fresh replication it triggers is legitimate and cannot mis-penalise + // honest peers. The width is the client's PUT fallback ceiling + // (`SELF_CLOSENESS_GATE_WIDTH`), so a client routing past full + // close-group members onto further peers is still accepted here. + // Skipped when no P2P handle is attached (unit tests). Bind the handle + // out of the lock first so no guard is held across the `.await`. + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + if let Some(p2p) = attached { + let self_id = *p2p.peer_id(); + if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await + { + debug!("Rejecting PUT for {addr_hex}: not within local closest peers"); + return ChunkPutResponse::Error(ProtocolError::StorageFailed( + "node is not within its local closest peers for this address".to_string(), + )); + } + } + // 5. Verify payment. The ClientPut context applies the store-strength - // payment cache and verifies live proofs. Direct client PUT does not - // reject based on this node's local storage-responsibility view. + // payment cache and verifies live proofs. let payment_result = self .payment_verifier .verify_payment( From 50b472a3335b3d96a38bf90b473b3da4f43dd56c Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 15:29:05 +0200 Subject: [PATCH 4/9] feat(replication): add delayed possession-check scheduler Implements the detection+penalty core of ADR-0003. After fresh replication, replicate_fresh now returns the responsible close-group peers; the fresh-write drainer enqueues a possession-check event, and a new scheduler task waits a randomised 5-15 minute settle delay (POSSESSION_CHECK_DELAY_MIN/MAX) before probing every responsible peer for actual possession via a presence-only VerificationRequest. A peer confirmed absent is penalised at AuditChallenge severity (report_trust_event(ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT))); a peer that holds it earns nothing; an unreachable peer yields no verdict and is re-probed under a bounded grace, never penalised. Every responsible peer is tested regardless of whether the original push reached it. New possession module, config tunables, and a delay-bounds unit test. 376 replication tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/replication/config.rs | 32 ++++++ src/replication/fresh.rs | 13 ++- src/replication/mod.rs | 71 ++++++++++++- src/replication/possession.rs | 192 ++++++++++++++++++++++++++++++++++ 4 files changed, 302 insertions(+), 6 deletions(-) create mode 100644 src/replication/possession.rs diff --git a/src/replication/config.rs b/src/replication/config.rs index 3bfaf3c5..7f988be0 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -56,6 +56,38 @@ pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4; /// push never reached it. pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2; +const POSSESSION_CHECK_DELAY_MIN_SECS: u64 = 5 * 60; +const POSSESSION_CHECK_DELAY_MAX_SECS: u64 = 15 * 60; +const POSSESSION_CHECK_TIMEOUT_SECS: u64 = 15; +const POSSESSION_CHECK_RETRY_BACKOFF_SECS: u64 = 30; + +/// Lower bound of the delay before a fresh-replication possession check runs +/// (ADR-0003). +/// +/// The delay lets replication settle so an honest peer still mid-store is not +/// judged prematurely, and makes the check unpredictable to the peer. +pub const POSSESSION_CHECK_DELAY_MIN: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MIN_SECS); + +/// Upper bound of the possession-check delay (ADR-0003). +pub const POSSESSION_CHECK_DELAY_MAX: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MAX_SECS); + +/// Per-peer request timeout for a single possession probe (ADR-0003). +pub const POSSESSION_CHECK_TIMEOUT: Duration = Duration::from_secs(POSSESSION_CHECK_TIMEOUT_SECS); + +/// Maximum possession-probe attempts per peer before giving up without a +/// verdict (ADR-0003). +/// +/// A peer unreachable at check time is re-attempted under this bounded grace +/// and is never penalised as absent. +pub const POSSESSION_CHECK_MAX_ATTEMPTS: u32 = 3; + +/// Backoff between possession-probe attempts when a peer yields no verdict +/// (ADR-0003). +pub const POSSESSION_CHECK_RETRY_BACKOFF: Duration = + Duration::from_secs(POSSESSION_CHECK_RETRY_BACKOFF_SECS); + /// Width used when deciding whether this node may locally store or retain a /// chunk. #[must_use] diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs index 7b17bbd3..80e9766e 100644 --- a/src/replication/fresh.rs +++ b/src/replication/fresh.rs @@ -38,9 +38,10 @@ pub struct FreshWriteEvent { /// Execute fresh replication for a newly accepted record. /// -/// Sends fresh offers to close group members and `PaidNotify` to -/// `PaidCloseGroup`. Both are fire-and-forget (no ack tracking or retry per -/// Section 6.1, rule 8). +/// Sends fresh offers to close group members (with bounded delivery retries, +/// ADR-0003) and `PaidNotify` to `PaidCloseGroup`. Returns the close-group +/// peers responsible for the key (excluding self) so the caller can schedule +/// the delayed possession check; `PaidNotify` remains fire-and-forget. /// /// The `send_semaphore` limits how many outbound chunk transfers can be /// in-flight concurrently across the entire replication engine, preventing @@ -53,7 +54,7 @@ pub async fn replicate_fresh( paid_list: &Arc, config: &ReplicationConfig, send_semaphore: &Arc, -) { +) -> Vec { let self_id = *p2p_node.peer_id(); // Rule 6: Node that validates PoP adds K to PaidForList(self). @@ -90,7 +91,7 @@ pub async fn replicate_fresh( "Failed to encode FreshReplicationOffer for {}", hex::encode(key), ); - return; + return Vec::new(); }; // Share one encoded copy across the per-peer send tasks so a retry only // re-materialises the buffer for the (consuming) send call, keeping the @@ -154,6 +155,8 @@ pub async fn replicate_fresh( hex::encode(key), target_peers.len() ); + + target_peers } /// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget). diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 0f6394a4..d0e6e817 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -23,6 +23,7 @@ pub mod config; pub mod fresh; pub mod neighbor_sync; pub mod paid_list; +pub mod possession; pub mod protocol; pub mod pruning; pub mod quorum; @@ -311,6 +312,12 @@ pub struct ReplicationEngine { /// When present, `start()` spawns a drainer task that calls /// `replicate_fresh` for each event. fresh_write_rx: Option>, + /// Sender for delayed possession-check events (ADR-0003). The fresh-write + /// drainer pushes the responsible close-group peers here after each fresh + /// replication; the possession-check scheduler drains the paired receiver. + possession_check_tx: mpsc::UnboundedSender, + /// Receiver paired with `possession_check_tx`; taken by the scheduler task. + possession_check_rx: Option>, /// Shutdown token. shutdown: CancellationToken, /// Background task handles. @@ -345,6 +352,7 @@ impl ReplicationEngine { let initial_neighbors = NeighborSyncState::new_cycle(Vec::new()); let config = Arc::new(config); + let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel(); Ok(Self { config: Arc::clone(&config), @@ -373,6 +381,8 @@ impl ReplicationEngine { audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)), audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())), fresh_write_rx: Some(fresh_write_rx), + possession_check_tx, + possession_check_rx: Some(possession_check_rx), shutdown, task_handles: Vec::new(), }) @@ -500,6 +510,7 @@ impl ReplicationEngine { self.start_verification_worker(); self.start_bootstrap_sync(dht_events); self.start_fresh_write_drainer(); + self.start_possession_check_scheduler(); info!( "Replication engine started with {} background tasks", @@ -594,6 +605,7 @@ impl ReplicationEngine { let paid_list = Arc::clone(&self.paid_list); let config = Arc::clone(&self.config); let send_semaphore = Arc::clone(&self.send_semaphore); + let possession_tx = self.possession_check_tx.clone(); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -602,7 +614,7 @@ impl ReplicationEngine { () = shutdown.cancelled() => break, event = rx.recv() => { let Some(event) = event else { break }; - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( &event.key, &event.data, &event.payment_proof, @@ -612,6 +624,15 @@ impl ReplicationEngine { &send_semaphore, ) .await; + // Schedule the delayed possession check (ADR-0003) for + // the responsible close-group peers. A closed receiver + // (engine shutting down) is ignored. + if !peers.is_empty() { + let _ = possession_tx.send(possession::PossessionCheckEvent { + key: event.key, + peers, + }); + } } } } @@ -620,6 +641,54 @@ impl ReplicationEngine { self.task_handles.push(handle); } + /// Spawn the possession-check scheduler (ADR-0003). + /// + /// Drains scheduled possession-check events and, for each, waits a + /// randomised 5-15 minute settle delay before probing every responsible + /// peer for actual possession. A peer that lacks the chunk is penalised at + /// `AuditChallenge` severity; an unreachable peer is re-probed under a + /// bounded grace and never penalised. + fn start_possession_check_scheduler(&mut self) { + let Some(mut rx) = self.possession_check_rx.take() else { + return; + }; + let p2p = Arc::clone(&self.p2p_node); + let shutdown = self.shutdown.clone(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + () = shutdown.cancelled() => break, + event = rx.recv() => { + let Some(event) = event else { break }; + // Spawn a per-chunk delayed check so the drain loop + // keeps pace with the write rate. Each check sleeps the + // randomised settle delay, then probes every peer. + let p2p = Arc::clone(&p2p); + let shutdown = shutdown.clone(); + tokio::spawn(async move { + let delay = possession::random_delay(); + tokio::select! { + () = shutdown.cancelled() => {} + () = tokio::time::sleep(delay) => { + possession::run_possession_check( + event.key, + event.peers, + &p2p, + &shutdown, + ) + .await; + } + } + }); + } + } + } + debug!("Possession-check scheduler shut down"); + }); + self.task_handles.push(handle); + } + #[allow(clippy::too_many_lines)] fn start_message_handler(&mut self) { let mut p2p_events = self.p2p_node.subscribe_events(); diff --git a/src/replication/possession.rs b/src/replication/possession.rs new file mode 100644 index 00000000..5ccefb77 --- /dev/null +++ b/src/replication/possession.rs @@ -0,0 +1,192 @@ +//! Delayed possession verification for fresh replication (ADR-0003). +//! +//! After a node fresh-replicates a chunk, every close-group peer responsible +//! for it is checked 5-15 minutes later for actual possession. A peer that +//! holds the chunk earns nothing — storing what it was paid to store is the +//! baseline expectation, not meritorious; a peer confirmed *not* to hold it is +//! penalised at `AuditChallenge` severity. Delivery of the original push is +//! irrelevant: a peer the push never reached is still checked and penalised if +//! it lacks the chunk. A peer merely unreachable at check time yields no +//! verdict — it is re-attempted under a bounded grace and never penalised as +//! absent. + +use std::sync::Arc; +use std::time::Duration; + +use rand::Rng; +use saorsa_core::identity::PeerId; +use saorsa_core::{P2PNode, TrustEvent}; +use tokio_util::sync::CancellationToken; + +use crate::ant_protocol::XorName; +use crate::logging::{debug, warn}; +use crate::replication::config::{ + self, POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_MAX_ATTEMPTS, + POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, REPLICATION_PROTOCOL_ID, +}; +use crate::replication::protocol::{ + ReplicationMessage, ReplicationMessageBody, VerificationRequest, +}; + +/// A scheduled possession check for one freshly-replicated chunk. +pub struct PossessionCheckEvent { + /// Content-address of the chunk. + pub key: XorName, + /// Close-group peers responsible for holding it (excludes self). + pub peers: Vec, +} + +/// Verdict of probing a single peer for possession of a chunk. +enum ProbeOutcome { + /// Peer confirmed it holds the chunk. + Present, + /// Peer confirmed it does not hold the chunk. + Absent, + /// No verdict obtained (timeout / transport error / malformed response). + NoVerdict, +} + +/// Pick a randomised delay in `[POSSESSION_CHECK_DELAY_MIN, +/// POSSESSION_CHECK_DELAY_MAX]` to wait before a possession check runs. +#[must_use] +pub fn random_delay() -> Duration { + let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX); + let min = to_millis(POSSESSION_CHECK_DELAY_MIN); + let max = to_millis(POSSESSION_CHECK_DELAY_MAX); + if min >= max { + return POSSESSION_CHECK_DELAY_MIN; + } + Duration::from_millis(rand::thread_rng().gen_range(min..=max)) +} + +/// Run the possession check for one chunk against every responsible peer. +/// +/// Penalises each peer confirmed absent at `AuditChallenge` severity, leaves +/// present peers unrewarded, and never penalises a peer that only failed to +/// yield a verdict. +pub async fn run_possession_check( + key: XorName, + peers: Vec, + p2p_node: &Arc, + shutdown: &CancellationToken, +) { + let key_hex = hex::encode(key); + for peer in peers { + if shutdown.is_cancelled() { + return; + } + match probe_with_grace(&key, &peer, p2p_node, shutdown).await { + ProbeOutcome::Present => { + debug!("Possession check: {peer} holds {key_hex}"); + } + ProbeOutcome::Absent => { + warn!( + "Possession check: {peer} is missing {key_hex}; penalising at audit severity" + ); + p2p_node + .report_trust_event( + &peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; + } + ProbeOutcome::NoVerdict => { + debug!( + "Possession check: no verdict from {peer} for {key_hex} after grace; \ + not penalised" + ); + } + } + } +} + +/// Probe a peer for possession, re-attempting on no-verdict up to the grace +/// bound. A definite Present/Absent verdict short-circuits immediately. +async fn probe_with_grace( + key: &XorName, + peer: &PeerId, + p2p_node: &Arc, + shutdown: &CancellationToken, +) -> ProbeOutcome { + for attempt in 1..=POSSESSION_CHECK_MAX_ATTEMPTS { + match probe_once(key, peer, p2p_node).await { + ProbeOutcome::NoVerdict if attempt < POSSESSION_CHECK_MAX_ATTEMPTS => { + tokio::select! { + () = shutdown.cancelled() => return ProbeOutcome::NoVerdict, + () = tokio::time::sleep(POSSESSION_CHECK_RETRY_BACKOFF) => {} + } + } + outcome => return outcome, + } + } + ProbeOutcome::NoVerdict +} + +/// Send one presence-only `VerificationRequest` and interpret the response. +async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc) -> ProbeOutcome { + let request = VerificationRequest { + keys: vec![*key], + // Presence-only: no paid-list status is needed to judge possession. + paid_list_check_indices: Vec::new(), + }; + let msg = ReplicationMessage { + request_id: rand::random(), + body: ReplicationMessageBody::VerificationRequest(request), + }; + let Ok(encoded) = msg.encode() else { + warn!( + "Failed to encode possession request for {}", + hex::encode(key) + ); + return ProbeOutcome::NoVerdict; + }; + + let response = match p2p_node + .send_request( + peer, + REPLICATION_PROTOCOL_ID, + encoded, + POSSESSION_CHECK_TIMEOUT, + ) + .await + { + Ok(response) => response, + Err(e) => { + debug!("Possession probe to {peer} failed: {e}"); + return ProbeOutcome::NoVerdict; + } + }; + + let decoded = match ReplicationMessage::decode(&response.data) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode possession response from {peer}: {e}"); + return ProbeOutcome::NoVerdict; + } + }; + + let ReplicationMessageBody::VerificationResponse(resp) = decoded.body else { + debug!("Unexpected possession response type from {peer}"); + return ProbeOutcome::NoVerdict; + }; + + match resp.results.iter().find(|r| r.key == *key) { + Some(result) if result.present => ProbeOutcome::Present, + Some(_) => ProbeOutcome::Absent, + None => ProbeOutcome::NoVerdict, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn random_delay_is_within_bounds() { + for _ in 0..100 { + let d = random_delay(); + assert!(d >= POSSESSION_CHECK_DELAY_MIN); + assert!(d <= POSSESSION_CHECK_DELAY_MAX); + } + } +} From 04341849c8d3a685f29f77877f8d416ed4b3d27d Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 16:06:26 +0200 Subject: [PATCH 5/9] test(replication): e2e proof of possession-check penalty Adds a test-only ReplicationEngine::run_possession_check_now that drives the ADR-0003 possession check without the 5-15 min scheduler delay, plus an e2e test on a 10-node testnet: node A probes an absent peer B and a present peer C over real transport; B's trust score drops (penalised, the signal saorsa-core eviction acts on) while C's is untouched. Proves the detection+penalty core deterministically. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/replication/mod.rs | 11 ++++++ tests/e2e/replication.rs | 80 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/src/replication/mod.rs b/src/replication/mod.rs index d0e6e817..64afd604 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -485,6 +485,17 @@ impl ReplicationEngine { .await } + /// Test-only: run the possession check immediately for `key` against + /// `peers`, bypassing the scheduler's randomised 5-15 minute settle delay. + /// + /// Penalises any peer that does not hold `key` at `AuditChallenge` + /// severity (ADR-0003). Lets e2e tests assert the detection+penalty path + /// deterministically without waiting for the scheduled check. + #[cfg(any(test, feature = "test-utils"))] + pub async fn run_possession_check_now(&self, key: XorName, peers: Vec) { + possession::run_possession_check(key, peers, &self.p2p_node, &self.shutdown).await; + } + /// Start all background tasks. /// /// `dht_events` must be subscribed **before** `P2PNode::start()` so that diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index eb8ee49f..11342a4f 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -252,6 +252,86 @@ async fn test_fresh_replication_propagates_to_close_group() { harness.teardown().await.expect("teardown"); } +/// ADR-0003: the delayed possession check penalises a responsible peer that +/// does NOT hold the chunk, and leaves a peer that DOES hold it unpenalised. +/// +/// Drives the check directly (`run_possession_check_now`, bypassing the 5-15 +/// minute settle delay) so the detection+penalty path is asserted +/// deterministically over real transport. The penalty is observed as a drop in +/// the checker's trust score for the absent peer (the same signal saorsa-core +/// eviction acts on), via `P2PNode::peer_trust`. +#[tokio::test] +#[serial] +async fn possession_check_penalises_absent_peer_only() { + let harness = TestHarness::setup_small().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + // A is the checker; B will be absent, C will hold the chunk. All three are + // regular nodes (idx >= 3) with running replication engines and storage. + let a = harness.test_node(3).expect("node a"); + let b = harness.test_node(4).expect("node b"); + let c = harness.test_node(5).expect("node c"); + + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let peer_b = *b.p2p_node.as_ref().expect("p2p b").peer_id(); + let peer_c = *c.p2p_node.as_ref().expect("p2p c").peer_id(); + + let content = b"adr-0003 possession-check payload"; + let address = compute_address(content); + + // C holds the chunk; B never stores it. + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .put(&address, content) + .await + .expect("put on c"); + + assert!( + !b.ant_protocol + .as_ref() + .expect("proto b") + .storage() + .exists(&address) + .expect("exists b"), + "precondition: B must not hold the chunk" + ); + assert!( + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .exists(&address) + .expect("exists c"), + "precondition: C must hold the chunk" + ); + + let trust_b_before = p2p_a.peer_trust(&peer_b); + let trust_c_before = p2p_a.peer_trust(&peer_c); + + // Probe both peers now (no scheduler delay). B is absent -> penalised; C is + // present -> untouched. + engine_a + .run_possession_check_now(address, vec![peer_b, peer_c]) + .await; + + let trust_b_after = p2p_a.peer_trust(&peer_b); + let trust_c_after = p2p_a.peer_trust(&peer_c); + + assert!( + trust_b_after < trust_b_before, + "absent peer B must be penalised: {trust_b_before} -> {trust_b_after}" + ); + assert!( + trust_c_after >= trust_c_before - f64::EPSILON, + "present peer C must not be penalised: {trust_c_before} -> {trust_c_after}" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the From fe9e4447a9fe28bbbd8cf30f946b2cf5737a6d1f Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 16:32:49 +0200 Subject: [PATCH 6/9] test(replication): e2e proof of possession-check scheduler wiring Proves the full replicate_fresh -> enqueue -> delayed scheduler -> penalty chain. Makes the 5-15 min possession delay a ReplicationConfig field (POSSESSION_CHECK_DELAY_MIN/MAX as defaults) so tests can shorten it, threads an optional replication_config override through TestNetworkConfig, and has the public replicate_fresh entry point schedule the possession check (the production PUT path already does via the drainer). The e2e test runs a 10-node testnet with a ~200-500ms delay, triggers fresh replication with no payment cache so close peers reject and are absent, and asserts the scheduled check penalises an absent close peer. 675 lib tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/replication/config.rs | 9 +++++ src/replication/mod.rs | 18 +++++++-- src/replication/possession.rs | 24 ++++++------ tests/e2e/replication.rs | 74 +++++++++++++++++++++++++++++++++++ tests/e2e/testnet.rs | 8 +++- 5 files changed, 118 insertions(+), 15 deletions(-) diff --git a/src/replication/config.rs b/src/replication/config.rs index 7f988be0..98d92594 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -469,6 +469,13 @@ pub struct ReplicationConfig { /// Seconds to wait for `DhtNetworkEvent::BootstrapComplete` before /// proceeding with bootstrap sync (covers bootstrap nodes with no peers). pub bootstrap_complete_timeout_secs: u64, + /// Lower bound of the delay before a fresh-replication possession check + /// runs (ADR-0003). Defaults to [`POSSESSION_CHECK_DELAY_MIN`]; tests + /// shorten it so the scheduled check fires quickly. + pub possession_check_delay_min: Duration, + /// Upper bound of the possession-check delay window (ADR-0003). Defaults + /// to [`POSSESSION_CHECK_DELAY_MAX`]. + pub possession_check_delay_max: Duration, } impl Default for ReplicationConfig { @@ -495,6 +502,8 @@ impl Default for ReplicationConfig { verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT, fetch_request_timeout: FETCH_REQUEST_TIMEOUT, bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS, + possession_check_delay_min: POSSESSION_CHECK_DELAY_MIN, + possession_check_delay_max: POSSESSION_CHECK_DELAY_MAX, } } } diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 64afd604..3f15d4a9 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -588,9 +588,13 @@ impl ReplicationEngine { self.sync_trigger.notify_one(); } - /// Execute fresh replication for a newly stored record. + /// Execute fresh replication for a newly stored record, then schedule the + /// delayed possession check for the responsible close-group peers + /// (ADR-0003). The production PUT path schedules via the fresh-write + /// drainer; this direct entry point schedules here so callers (and tests) + /// that drive replication directly still get the possession check. pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) { - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( key, data, proof_of_payment, @@ -600,6 +604,11 @@ impl ReplicationEngine { &self.send_semaphore, ) .await; + if !peers.is_empty() { + let _ = self + .possession_check_tx + .send(possession::PossessionCheckEvent { key: *key, peers }); + } } // ======================================================================= @@ -664,6 +673,7 @@ impl ReplicationEngine { return; }; let p2p = Arc::clone(&self.p2p_node); + let config = Arc::clone(&self.config); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -677,8 +687,10 @@ impl ReplicationEngine { // randomised settle delay, then probes every peer. let p2p = Arc::clone(&p2p); let shutdown = shutdown.clone(); + let delay_min = config.possession_check_delay_min; + let delay_max = config.possession_check_delay_max; tokio::spawn(async move { - let delay = possession::random_delay(); + let delay = possession::random_delay(delay_min, delay_max); tokio::select! { () = shutdown.cancelled() => {} () = tokio::time::sleep(delay) => { diff --git a/src/replication/possession.rs b/src/replication/possession.rs index 5ccefb77..79b3b6a2 100644 --- a/src/replication/possession.rs +++ b/src/replication/possession.rs @@ -21,8 +21,8 @@ use tokio_util::sync::CancellationToken; use crate::ant_protocol::XorName; use crate::logging::{debug, warn}; use crate::replication::config::{ - self, POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_MAX_ATTEMPTS, - POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, REPLICATION_PROTOCOL_ID, + self, POSSESSION_CHECK_MAX_ATTEMPTS, POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, + REPLICATION_PROTOCOL_ID, }; use crate::replication::protocol::{ ReplicationMessage, ReplicationMessageBody, VerificationRequest, @@ -46,17 +46,18 @@ enum ProbeOutcome { NoVerdict, } -/// Pick a randomised delay in `[POSSESSION_CHECK_DELAY_MIN, -/// POSSESSION_CHECK_DELAY_MAX]` to wait before a possession check runs. +/// Pick a randomised delay in `[min, max]` to wait before a possession check +/// runs. The bounds come from `ReplicationConfig` (defaulting to +/// `POSSESSION_CHECK_DELAY_MIN`/`MAX`) so tests can shorten them. #[must_use] -pub fn random_delay() -> Duration { +pub fn random_delay(min: Duration, max: Duration) -> Duration { let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX); - let min = to_millis(POSSESSION_CHECK_DELAY_MIN); - let max = to_millis(POSSESSION_CHECK_DELAY_MAX); - if min >= max { - return POSSESSION_CHECK_DELAY_MIN; + let min_ms = to_millis(min); + let max_ms = to_millis(max); + if min_ms >= max_ms { + return min; } - Duration::from_millis(rand::thread_rng().gen_range(min..=max)) + Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms)) } /// Run the possession check for one chunk against every responsible peer. @@ -180,11 +181,12 @@ async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc) -> Pr #[cfg(test)] mod tests { use super::*; + use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN}; #[test] fn random_delay_is_within_bounds() { for _ in 0..100 { - let d = random_delay(); + let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX); assert!(d >= POSSESSION_CHECK_DELAY_MIN); assert!(d <= POSSESSION_CHECK_DELAY_MAX); } diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 11342a4f..ca62c959 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -5,6 +5,7 @@ #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +use super::testnet::TestNetworkConfig; use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; @@ -332,6 +333,79 @@ async fn possession_check_penalises_absent_peer_only() { harness.teardown().await.expect("teardown"); } +/// ADR-0003: the possession-check *scheduler* (not the direct-drive path) +/// fires after the configured delay and penalises an absent close peer. +/// +/// Uses a shortened possession delay so the scheduled check runs in well under +/// a second. No payment cache is pre-populated, so the close-group peers reject +/// the fresh offer and are absent when the scheduled check probes them. Proves +/// the `replicate_fresh` -> enqueue -> delayed scheduler -> penalty wiring. +#[tokio::test] +#[serial] +async fn possession_scheduler_penalises_absent_close_peer_after_delay() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: Duration::from_millis(200), + possession_check_delay_max: Duration::from_millis(500), + ..ReplicationConfig::default() + }); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let a = harness.test_node(3).expect("node a"); + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let self_a = *p2p_a.peer_id(); + + let content = b"adr-0003 scheduler-wiring payload"; + let address = compute_address(content); + + // A's close group for this key = exactly the peers the scheduled possession + // check targets. With no payment cache anywhere, they reject the fresh offer + // and are absent when probed. + let close_group_size = ReplicationConfig::default().close_group_size; + let close_group: Vec = p2p_a + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .filter(|n| n.peer_id != self_a) + .map(|n| n.peer_id) + .collect(); + assert!(!close_group.is_empty(), "expected a non-empty close group"); + + let trust_before: Vec = close_group.iter().map(|p| p2p_a.peer_trust(p)).collect(); + + // Trigger fresh replication; the engine enqueues the possession check, which + // fires ~200-500 ms later and penalises the absent close peers. + let dummy_pop = [0x01u8; 64]; + engine_a + .replicate_fresh(&address, content, &dummy_pop) + .await; + + // Poll until at least one absent close peer is penalised (trust drops). + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut penalised = false; + while tokio::time::Instant::now() < deadline { + penalised = close_group + .iter() + .zip(trust_before.iter()) + .any(|(peer, &before)| p2p_a.peer_trust(peer) < before - f64::EPSILON); + if penalised { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + penalised, + "the scheduled possession check should have penalised an absent close peer" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 6b5ae84b..949aa4d6 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -222,6 +222,11 @@ pub struct TestNetworkConfig { /// this network (e.g. Anvil testnet) for on-chain verification. /// When `None`, defaults to `ArbitrumOne`. pub evm_network: Option, + + /// Optional replication-config override applied to every node's + /// replication engine. `None` uses `ReplicationConfig::default()`. Tests + /// use this to shorten timers — e.g. the ADR-0003 possession-check delay. + pub replication_config: Option, } impl Default for TestNetworkConfig { @@ -255,6 +260,7 @@ impl Default for TestNetworkConfig { enable_node_logging: false, payment_enforcement: false, evm_network: None, + replication_config: None, } } } @@ -1284,7 +1290,7 @@ impl TestNetwork { (&node.p2p_node, &node.ant_protocol, &node.node_identity) { let shutdown = CancellationToken::new(); - let repl_config = ReplicationConfig::default(); + let repl_config = self.config.replication_config.clone().unwrap_or_default(); let (_fresh_tx, fresh_rx) = tokio::sync::mpsc::unbounded_channel(); let node_identity = Arc::clone(id); match ReplicationEngine::new( From fa0048318f6ee4a8abbdbe32f6c1d61819fe4fb6 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 16:39:37 +0200 Subject: [PATCH 7/9] test(storage): e2e proof of self-closeness gate accept/reject Adds an e2e test on a 25-node testnet: the closest (responsible) node accepts a client PUT (gate must not break normal uploads), while the farthest node (rank >= K_BUCKET_SIZE, outside its own closest view) rejects before payment with a closeness error. Proves both the regression guard and the intended reject behaviour of the ADR-0003 self-closeness gate. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/e2e/replication.rs | 97 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 1 deletion(-) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index ca62c959..6acc10c8 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -10,7 +10,7 @@ use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; use ant_node::replication::config::{ - storage_admission_width, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, + storage_admission_width, K_BUCKET_SIZE, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, }; use ant_node::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse, @@ -406,6 +406,101 @@ async fn possession_scheduler_penalises_absent_close_peer_after_delay() { harness.teardown().await.expect("teardown"); } +/// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is +/// within its own local `K_BUCKET_SIZE`-closest to the address. +/// +/// Needs more than `K_BUCKET_SIZE` nodes so a far node falls outside its own +/// closest view. The responsible (closest) node accepts and stores; the +/// non-responsible (farthest) node rejects before payment with a closeness +/// error. Guards both the regression risk (gate must not reject responsible +/// puts) and the intended reject behaviour. +#[tokio::test] +#[serial] +async fn self_closeness_gate_accepts_responsible_rejects_far_node() { + let harness = TestHarness::setup().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let content = b"adr-0003 self-closeness gate payload"; + let address = compute_address(content); + + // Rank all peers by XOR distance to the address (identical from any node's + // view once routing tables are warm). + let ranker_p2p = harness + .test_node(3) + .expect("ranker") + .p2p_node + .as_ref() + .expect("p2p"); + let ranked: Vec = ranker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, harness.node_count()) + .await + .iter() + .map(|n| n.peer_id) + .collect(); + assert!( + ranked.len() > K_BUCKET_SIZE, + "need > K_BUCKET_SIZE nodes to exercise the reject path; got {}", + ranked.len() + ); + + // Only nodes that actually run a protocol handler can serve a client PUT. + let has_protocol = |peer: &PeerId| { + node_index_for_peer(&harness, peer) + .and_then(|idx| harness.test_node(idx)) + .is_some_and(|n| n.ant_protocol.is_some()) + }; + + // Closest node with a handler -> within its own K-closest -> gate accepts. + let close_peer = ranked + .iter() + .copied() + .find(|p| has_protocol(p)) + .expect("a close node with a handler"); + // Farthest node beyond the gate width with a handler -> gate rejects. + let far_peer = ranked + .iter() + .copied() + .enumerate() + .rev() + .find(|(rank, p)| *rank >= K_BUCKET_SIZE && has_protocol(p)) + .map(|(_, p)| p) + .expect("a far node (rank >= K_BUCKET_SIZE) with a handler"); + + let close_idx = node_index_for_peer(&harness, &close_peer).expect("close idx"); + let far_idx = node_index_for_peer(&harness, &far_peer).expect("far idx"); + + // Accept path: a responsible node stores the chunk (gate passes). + let close_result = harness + .test_node(close_idx) + .expect("close node") + .store_chunk(content) + .await; + assert!( + close_result.is_ok(), + "responsible (closest) node must accept the PUT, got {close_result:?}" + ); + + // Reject path: a non-responsible node rejects before payment with a + // closeness error. + let far_result = harness + .test_node(far_idx) + .expect("far node") + .store_chunk(content) + .await; + assert!( + far_result.is_err(), + "non-responsible (farthest) node must reject the PUT" + ); + let err = format!("{}", far_result.expect_err("far rejection")); + assert!( + err.contains("closest"), + "rejection should cite closeness, got: {err}" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the From 52b6f49fa22c630ffa0b7502ddf5a3e3771314e4 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 18:18:20 +0200 Subject: [PATCH 8/9] test(replication): prove full close-group nodes are shunned --- tests/e2e/replication.rs | 153 +++++++++++++++++++++++++++++++++++++++ tests/e2e/testnet.rs | 39 +++++++++- 2 files changed, 189 insertions(+), 3 deletions(-) diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index 6acc10c8..ab3dc010 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -33,6 +33,20 @@ use tokio::sync::RwLock; const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(15); /// Interval between propagation poll checks. const PROPAGATION_POLL_INTERVAL: Duration = Duration::from_millis(200); +/// Checker node used by the full-node shunning regression. +const FULL_NODE_SHUN_CHECKER_INDEX: usize = 3; +/// Target node made disk-full by the full-node shunning regression. +const FULL_NODE_SHUN_TARGET_INDEX: usize = 4; +/// Search budget for finding a key whose close group contains the full node. +const FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS: usize = 10_000; +/// Fast lower bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MIN: Duration = Duration::from_millis(200); +/// Fast upper bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MAX: Duration = Duration::from_millis(500); +/// Dummy proof length used when a test only needs to reach pre-payment gates. +const DUMMY_PAYMENT_PROOF_LEN: usize = 64; +/// Dummy proof byte used when a test only needs to reach pre-payment gates. +const DUMMY_PAYMENT_PROOF_BYTE: u8 = 0x01; /// Send a replication request via saorsa-core's request-response mechanism /// and decode the response. @@ -406,6 +420,145 @@ async fn possession_scheduler_penalises_absent_close_peer_after_delay() { harness.teardown().await.expect("teardown"); } +/// ADR-0003 full-node shunning: a close-group peer that is disk-full rejects a +/// fresh-replication offer before payment verification, remains absent for the +/// key, and is penalised when the checker probes possession. +/// +/// This bridges the two protections that make a full node get shunned by close +/// groups: capacity rejection creates a missing replica, and the delayed +/// possession-check verdict turns that absence into the trust signal that +/// saorsa-core eviction acts on. +#[tokio::test] +#[serial] +async fn full_close_group_node_rejects_replica_and_is_penalised_as_absent() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: FULL_NODE_SHUN_POSSESSION_DELAY_MIN, + possession_check_delay_max: FULL_NODE_SHUN_POSSESSION_DELAY_MAX, + ..ReplicationConfig::default() + }); + net_config + .storage_disk_reserve_overrides + .insert(FULL_NODE_SHUN_TARGET_INDEX, u64::MAX); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let checker = harness + .test_node(FULL_NODE_SHUN_CHECKER_INDEX) + .expect("checker node"); + let full_node = harness + .test_node(FULL_NODE_SHUN_TARGET_INDEX) + .expect("full node"); + let checker_p2p = checker.p2p_node.as_ref().expect("checker p2p"); + let checker_engine = checker.replication_engine.as_ref().expect("checker engine"); + let full_p2p = full_node.p2p_node.as_ref().expect("full node p2p"); + let full_peer = *full_p2p.peer_id(); + + let close_group_size = ReplicationConfig::default().close_group_size; + let admission_width = storage_admission_width(close_group_size); + let mut candidate = None; + for attempt in 0..FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS { + let content = format!("adr-0003 full-node shunning payload {attempt}").into_bytes(); + let address = compute_address(&content); + let full_node_in_checker_close_group = checker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if !full_node_in_checker_close_group { + continue; + } + + let full_node_admits_self = full_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, admission_width) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if full_node_admits_self { + candidate = Some((content, address)); + break; + } + } + let (content, address) = + candidate.expect("find key where full node is a responsible close-group peer"); + + for idx in 0..harness.node_count() { + if let Some(protocol) = harness + .test_node(idx) + .and_then(|node| node.ant_protocol.as_ref()) + { + protocol.payment_verifier().cache_insert(address); + } + } + + let dummy_payment_proof = vec![DUMMY_PAYMENT_PROOF_BYTE; DUMMY_PAYMENT_PROOF_LEN]; + let offer = FreshReplicationOffer { + key: address, + data: content.clone(), + proof_of_payment: dummy_payment_proof.clone(), + }; + let response = send_replication_request( + checker_p2p, + &full_peer, + ReplicationMessage { + request_id: rand::random(), + body: ReplicationMessageBody::FreshReplicationOffer(offer), + }, + PROPAGATION_TIMEOUT, + ) + .await; + + match response.body { + ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { + key, + reason, + }) => { + assert_eq!(key, address); + assert!( + reason.contains("Insufficient disk space"), + "expected disk-full rejection, got: {reason}" + ); + } + other => panic!("expected disk-full rejection, got: {other:?}"), + } + + let full_storage = full_node + .ant_protocol + .as_ref() + .expect("full node protocol") + .storage(); + assert!( + !full_storage.exists(&address).expect("exists on full node"), + "full node must not store the rejected replica" + ); + + let trust_before = checker_p2p.peer_trust(&full_peer); + checker_engine + .replicate_fresh(&address, &content, &dummy_payment_proof) + .await; + + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut trust_after = trust_before; + while tokio::time::Instant::now() < deadline { + trust_after = checker_p2p.peer_trust(&full_peer); + if trust_after < trust_before - f64::EPSILON { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + trust_after < trust_before - f64::EPSILON, + "full close-group peer should be shunned by the scheduled possession check: \ + {trust_before} -> {trust_after}" + ); + + harness.teardown().await.expect("teardown"); +} + /// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is /// within its own local `K_BUCKET_SIZE`-closest to the address. /// diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 949aa4d6..3eb19a8c 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -35,6 +35,7 @@ use saorsa_core::{ identity::NodeIdentity, IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, }; +use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; @@ -227,6 +228,12 @@ pub struct TestNetworkConfig { /// replication engine. `None` uses `ReplicationConfig::default()`. Tests /// use this to shorten timers — e.g. the ADR-0003 possession-check delay. pub replication_config: Option, + + /// Optional per-node storage disk-reserve overrides. + /// + /// Tests can set a node's reserve above available disk space to make its + /// capacity pre-check fail deterministically without filling the host disk. + pub storage_disk_reserve_overrides: HashMap, } impl Default for TestNetworkConfig { @@ -261,6 +268,7 @@ impl Default for TestNetworkConfig { payment_enforcement: false, evm_network: None, replication_config: None, + storage_disk_reserve_overrides: HashMap::new(), } } } @@ -1046,9 +1054,19 @@ impl TestNetwork { })?); // Initialize AntProtocol for this node with payment enforcement setting - let ant_protocol = - Self::create_ant_protocol(&data_dir, self.config.evm_network.clone(), &identity) - .await?; + let storage_disk_reserve = self + .config + .storage_disk_reserve_overrides + .get(&index) + .copied() + .unwrap_or_default(); + let ant_protocol = Self::create_ant_protocol_with_disk_reserve( + &data_dir, + self.config.evm_network.clone(), + storage_disk_reserve, + &identity, + ) + .await?; Ok(TestNode { index, @@ -1086,10 +1104,25 @@ impl TestNetwork { data_dir: &std::path::Path, evm_network: Option, identity: &saorsa_core::identity::NodeIdentity, + ) -> Result { + Self::create_ant_protocol_with_disk_reserve(data_dir, evm_network, 0, identity).await + } + + /// Create an `AntProtocol` handler with an explicit storage disk reserve. + /// + /// # Errors + /// + /// Returns an error if LMDB storage initialisation fails. + pub async fn create_ant_protocol_with_disk_reserve( + data_dir: &std::path::Path, + evm_network: Option, + disk_reserve: u64, + identity: &saorsa_core::identity::NodeIdentity, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { root_dir: data_dir.to_path_buf(), + disk_reserve, ..LmdbStorageConfig::test_default() }; let storage = LmdbStorage::new(storage_config) From 8639fdabacfb1350ecf25ee7ca43c5cd3a2e7cfd Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Fri, 26 Jun 2026 20:07:49 +0200 Subject: [PATCH 9/9] feat(replication): widen fresh-replication accept admission during convergence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Amends ADR-0003. A healthy replica whose routing table still lists closer full nodes it hasn't evicted yet ranks itself outside the narrow storage_admission_width window and would wrongly reject a fresh-replication offer it should accept — stalling convergence, and inconsistent with the wider K-window it already accepts client PUTs within. Widen the fresh-offer accept gate to config.paid_list_close_group_size (= K_BUCKET_SIZE = 20). Safe: the sender still fans out only to close_group_size, so this adds no stores; retention/pruning stays at storage_admission_width, so steady-state replication is unchanged and any transient over-coverage is reclaimed once the close group converges (the multi-day prune hysteresis spans the minutes-long eviction window). Sender fan-out deliberately not widened — the repair path heals that case. 675 lib tests + fresh-replication & full-close-group-shunning e2e tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...R-0003-full-node-detection-and-eviction.md | 46 ++++++++++++++++++- src/replication/mod.rs | 12 +++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/docs/adr/ADR-0003-full-node-detection-and-eviction.md b/docs/adr/ADR-0003-full-node-detection-and-eviction.md index 9a8aa55b..2da2a5db 100644 --- a/docs/adr/ADR-0003-full-node-detection-and-eviction.md +++ b/docs/adr/ADR-0003-full-node-detection-and-eviction.md @@ -141,6 +141,46 @@ mis-attributing the resulting replication failures to honest nodes. strangles the fallback it is meant to coexist with. Express both against the same width and choose them together. +### Fresh-replication admission during convergence + +A healthy node that *should* hold a chunk can transiently fail to recognise its own +responsibility: while full nodes closer to the key still sit in its routing table +(not yet detected and evicted), they push it past the narrow +`storage_admission_width` (close group + small margin) in its *own* view. With that +narrow window it would then **reject** a fresh-replication offer it ought to accept — +so the replication path that is meant to heal coverage instead refuses it, stalling +convergence. This is also inconsistent: the same node already accepts a **client +PUT** for that key within the wider `K_BUCKET_SIZE` window (both the payment +issuer-closeness check and the self-closeness gate above use it), so a key could land +on the node directly from a client but not via replication. + +**Decision:** widen the **fresh-replication accept** admission to the K-wide +paid-close-group neighbourhood (`paid_list_close_group_size`, equal to +`K_BUCKET_SIZE` = 20) — the same window client PUTs use — so transient view-skew +from un-evicted full nodes no longer causes spurious rejection. A node accepts a +fresh offer when it is within its own local `paid_list_close_group_size`-closest to +the key. Two properties keep this safe: + +- **No extra storage from the accept side.** The sender still fans out only to its + `close_group_size` targets, so widening the *receiver's* accept window adds no + stores — it only stops those targets from rejecting due to view-skew. +- **Retention stays narrow.** Long-term retention/pruning keeps using + `storage_admission_width`, so steady-state replication is still ≈ K. Any transient + over-coverage is reclaimed once the close group converges (full nodes evicted → the + node's view tightens → it sits correctly inside or outside the narrow window), and + the multi-day prune hysteresis comfortably spans the minutes-long eviction window, + so a legitimate replica is never pruned mid-convergence. + +**Sender side (deliberately not widened).** The mirror case — the *sender's* own +close group being mostly full, so its `close_group_size` offers never reach the +healthy nodes ranked beyond the full ones — is left to the existing convergence loop: +the possession check evicts the full nodes, the close group tightens to healthy +members, and the responsible-chunk repair (neighbour sync) fetches the chunk to the +nodes that become responsible. Widening the sender fan-out to `K_BUCKET_SIZE` was +considered and rejected — it would triple fresh-replication fan-out on *every* write +and hold large transient over-replication for the full prune-hysteresis window, for a +case the repair path already heals. + ## Consequences ### Positive @@ -205,7 +245,11 @@ How we will know this decision remains correct: present/absent; the adaptive timeout grace tracks widespread timeouts but never deterministic failures; the node emits into saorsa-core such that an evicted-for-fullness node can re-enter after it frees capacity (integration); the - self-closeness gate width is ≥ the fallback ceiling. + self-closeness gate width is ≥ the fallback ceiling; a healthy node with un-evicted + full nodes ahead of it in its routing table (so it ranks outside + `storage_admission_width` but within `K_BUCKET_SIZE`) still **accepts** a + fresh-replication offer for the key, while retention/pruning stays scoped to + `storage_admission_width`. - **Re-open triggers:** revisit thresholds if false positives appear; revisit the near-capacity degradation if the network approaches global capacity. diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 3f15d4a9..fb9f6e11 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -2074,13 +2074,19 @@ async fn handle_fresh_offer( return Ok(()); } - // Rule 7: check storage admission. Fresh chunk receivers accept the close - // group plus a small margin to absorb local routing-table disagreement. + // Rule 7: check storage admission. Fresh chunk receivers accept across the + // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE, + // the same width client PUTs use), not just the close group plus a small + // margin (ADR-0003). During full-node shunning a healthy replica's routing + // table may still list closer full nodes it hasn't evicted yet, ranking it + // outside the narrow window in its own view; the wider accept window absorbs + // that transient skew so the chunk still lands. Retention (pruning) stays at + // `storage_admission_width`, so steady-state replication is unchanged. if !admission::is_responsible( &self_id, &offer.key, p2p_node, - storage_admission_width(config.close_group_size), + config.paid_list_close_group_size, ) .await {