diff --git a/Cargo.lock b/Cargo.lock index e212fa4..91545e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4867,8 +4867,7 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8cc1b7f59f97d018760ff150bbb4f217197c41622b83f7085c9cf0424b736e" +source = "git+https://github.com/WithAutonomi/saorsa-core?branch=feat/trust-quarantine-thresholds#3afe290442df42a7b6ca8989f860d185d1f6f9b4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7d08918..88f570a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,11 @@ mimalloc = "0.1" # with ant-protocol's re-exports. ant-protocol = "2.2.0" -# Core (provides EVERYTHING: networking, DHT, security, trust, storage) -saorsa-core = "0.26.0" +# Core (provides EVERYTHING: networking, DHT, security, trust, storage). +# Pinned to WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds); +# the matching `[patch.crates-io]` below redirects ant-protocol's transitive +# saorsa-core to the same source so Cargo unifies on one copy. +saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } saorsa-pqc = "0.5" # Payment verification - autonomi network lookup + EVM payment @@ -196,3 +199,11 @@ unused_async = "allow" cognitive_complexity = "allow" # Allow non-const functions during initial development (may need runtime features later) missing_const_for_fn = "allow" + +[patch.crates-io] +# Redirect the saorsa-core that ant-protocol (and other crates) pull from +# crates.io onto the same git source as the node's direct dependency, +# eliminating the duplicate 0.26.0 copies (crates.io vs git) that otherwise +# collide on shared types such as `saorsa_core::address::MultiAddr`. +# Tracks WithAutonomi/saorsa-core PR #119 (trust quarantine thresholds). +saorsa-core = { git = "https://github.com/WithAutonomi/saorsa-core", branch = "feat/trust-quarantine-thresholds" } diff --git a/docs/adr/ADR-0003-full-node-detection-and-eviction.md b/docs/adr/ADR-0003-full-node-detection-and-eviction.md index 9a8aa55..2da2a5d 100644 --- a/docs/adr/ADR-0003-full-node-detection-and-eviction.md +++ b/docs/adr/ADR-0003-full-node-detection-and-eviction.md @@ -141,6 +141,46 @@ mis-attributing the resulting replication failures to honest nodes. strangles the fallback it is meant to coexist with. Express both against the same width and choose them together. +### Fresh-replication admission during convergence + +A healthy node that *should* hold a chunk can transiently fail to recognise its own +responsibility: while full nodes closer to the key still sit in its routing table +(not yet detected and evicted), they push it past the narrow +`storage_admission_width` (close group + small margin) in its *own* view. With that +narrow window it would then **reject** a fresh-replication offer it ought to accept — +so the replication path that is meant to heal coverage instead refuses it, stalling +convergence. This is also inconsistent: the same node already accepts a **client +PUT** for that key within the wider `K_BUCKET_SIZE` window (both the payment +issuer-closeness check and the self-closeness gate above use it), so a key could land +on the node directly from a client but not via replication. + +**Decision:** widen the **fresh-replication accept** admission to the K-wide +paid-close-group neighbourhood (`paid_list_close_group_size`, equal to +`K_BUCKET_SIZE` = 20) — the same window client PUTs use — so transient view-skew +from un-evicted full nodes no longer causes spurious rejection. A node accepts a +fresh offer when it is within its own local `paid_list_close_group_size`-closest to +the key. Two properties keep this safe: + +- **No extra storage from the accept side.** The sender still fans out only to its + `close_group_size` targets, so widening the *receiver's* accept window adds no + stores — it only stops those targets from rejecting due to view-skew. +- **Retention stays narrow.** Long-term retention/pruning keeps using + `storage_admission_width`, so steady-state replication is still ≈ K. Any transient + over-coverage is reclaimed once the close group converges (full nodes evicted → the + node's view tightens → it sits correctly inside or outside the narrow window), and + the multi-day prune hysteresis comfortably spans the minutes-long eviction window, + so a legitimate replica is never pruned mid-convergence. + +**Sender side (deliberately not widened).** The mirror case — the *sender's* own +close group being mostly full, so its `close_group_size` offers never reach the +healthy nodes ranked beyond the full ones — is left to the existing convergence loop: +the possession check evicts the full nodes, the close group tightens to healthy +members, and the responsible-chunk repair (neighbour sync) fetches the chunk to the +nodes that become responsible. Widening the sender fan-out to `K_BUCKET_SIZE` was +considered and rejected — it would triple fresh-replication fan-out on *every* write +and hold large transient over-replication for the full prune-hysteresis window, for a +case the repair path already heals. + ## Consequences ### Positive @@ -205,7 +245,11 @@ How we will know this decision remains correct: present/absent; the adaptive timeout grace tracks widespread timeouts but never deterministic failures; the node emits into saorsa-core such that an evicted-for-fullness node can re-enter after it frees capacity (integration); the - self-closeness gate width is ≥ the fallback ceiling. + self-closeness gate width is ≥ the fallback ceiling; a healthy node with un-evicted + full nodes ahead of it in its routing table (so it ranks outside + `storage_admission_width` but within `K_BUCKET_SIZE`) still **accepts** a + fresh-replication offer for the key, while retention/pruning stays scoped to + `storage_admission_width`. - **Re-open triggers:** revisit thresholds if false positives appear; revisit the near-capacity degradation if the network approaches global capacity. diff --git a/src/replication/config.rs b/src/replication/config.rs index 571c934..98d9259 100644 --- a/src/replication/config.rs +++ b/src/replication/config.rs @@ -47,6 +47,47 @@ pub const NEIGHBOR_SYNC_SCOPE: usize = 20; /// round. pub const NEIGHBOR_SYNC_PEER_COUNT: usize = 4; +/// Best-effort delivery retries for a fresh-replication push, per peer. +/// +/// ADR-0003: on a transport/send failure the offer is retried up to this many +/// times so a transient hiccup does not silently drop it. This is delivery +/// assurance only — possession is judged separately by the delayed possession +/// check, which still penalises a close peer that lacks the chunk even if the +/// push never reached it. +pub const FRESH_REPLICATION_DELIVERY_MAX_RETRIES: u32 = 2; + +const POSSESSION_CHECK_DELAY_MIN_SECS: u64 = 5 * 60; +const POSSESSION_CHECK_DELAY_MAX_SECS: u64 = 15 * 60; +const POSSESSION_CHECK_TIMEOUT_SECS: u64 = 15; +const POSSESSION_CHECK_RETRY_BACKOFF_SECS: u64 = 30; + +/// Lower bound of the delay before a fresh-replication possession check runs +/// (ADR-0003). +/// +/// The delay lets replication settle so an honest peer still mid-store is not +/// judged prematurely, and makes the check unpredictable to the peer. +pub const POSSESSION_CHECK_DELAY_MIN: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MIN_SECS); + +/// Upper bound of the possession-check delay (ADR-0003). +pub const POSSESSION_CHECK_DELAY_MAX: Duration = + Duration::from_secs(POSSESSION_CHECK_DELAY_MAX_SECS); + +/// Per-peer request timeout for a single possession probe (ADR-0003). +pub const POSSESSION_CHECK_TIMEOUT: Duration = Duration::from_secs(POSSESSION_CHECK_TIMEOUT_SECS); + +/// Maximum possession-probe attempts per peer before giving up without a +/// verdict (ADR-0003). +/// +/// A peer unreachable at check time is re-attempted under this bounded grace +/// and is never penalised as absent. +pub const POSSESSION_CHECK_MAX_ATTEMPTS: u32 = 3; + +/// Backoff between possession-probe attempts when a peer yields no verdict +/// (ADR-0003). +pub const POSSESSION_CHECK_RETRY_BACKOFF: Duration = + Duration::from_secs(POSSESSION_CHECK_RETRY_BACKOFF_SECS); + /// Width used when deciding whether this node may locally store or retain a /// chunk. #[must_use] @@ -428,6 +469,13 @@ pub struct ReplicationConfig { /// Seconds to wait for `DhtNetworkEvent::BootstrapComplete` before /// proceeding with bootstrap sync (covers bootstrap nodes with no peers). pub bootstrap_complete_timeout_secs: u64, + /// Lower bound of the delay before a fresh-replication possession check + /// runs (ADR-0003). Defaults to [`POSSESSION_CHECK_DELAY_MIN`]; tests + /// shorten it so the scheduled check fires quickly. + pub possession_check_delay_min: Duration, + /// Upper bound of the possession-check delay window (ADR-0003). Defaults + /// to [`POSSESSION_CHECK_DELAY_MAX`]. + pub possession_check_delay_max: Duration, } impl Default for ReplicationConfig { @@ -454,6 +502,8 @@ impl Default for ReplicationConfig { verification_request_timeout: VERIFICATION_REQUEST_TIMEOUT, fetch_request_timeout: FETCH_REQUEST_TIMEOUT, bootstrap_complete_timeout_secs: BOOTSTRAP_COMPLETE_TIMEOUT_SECS, + possession_check_delay_min: POSSESSION_CHECK_DELAY_MIN, + possession_check_delay_max: POSSESSION_CHECK_DELAY_MAX, } } } diff --git a/src/replication/fresh.rs b/src/replication/fresh.rs index af3a93a..80e9766 100644 --- a/src/replication/fresh.rs +++ b/src/replication/fresh.rs @@ -14,7 +14,9 @@ use saorsa_core::P2PNode; use tokio::sync::Semaphore; use crate::ant_protocol::XorName; -use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID}; +use crate::replication::config::{ + ReplicationConfig, FRESH_REPLICATION_DELIVERY_MAX_RETRIES, REPLICATION_PROTOCOL_ID, +}; use crate::replication::paid_list::PaidList; use crate::replication::protocol::{ FreshReplicationOffer, PaidNotify, ReplicationMessage, ReplicationMessageBody, @@ -36,9 +38,10 @@ pub struct FreshWriteEvent { /// Execute fresh replication for a newly accepted record. /// -/// Sends fresh offers to close group members and `PaidNotify` to -/// `PaidCloseGroup`. Both are fire-and-forget (no ack tracking or retry per -/// Section 6.1, rule 8). +/// Sends fresh offers to close group members (with bounded delivery retries, +/// ADR-0003) and `PaidNotify` to `PaidCloseGroup`. Returns the close-group +/// peers responsible for the key (excluding self) so the caller can schedule +/// the delayed possession check; `PaidNotify` remains fire-and-forget. /// /// The `send_semaphore` limits how many outbound chunk transfers can be /// in-flight concurrently across the entire replication engine, preventing @@ -51,7 +54,7 @@ pub async fn replicate_fresh( paid_list: &Arc, config: &ReplicationConfig, send_semaphore: &Arc, -) { +) -> Vec { let self_id = *p2p_node.peer_id(); // Rule 6: Node that validates PoP adds K to PaidForList(self). @@ -88,11 +91,15 @@ pub async fn replicate_fresh( "Failed to encode FreshReplicationOffer for {}", hex::encode(key), ); - return; + return Vec::new(); }; + // Share one encoded copy across the per-peer send tasks so a retry only + // re-materialises the buffer for the (consuming) send call, keeping the + // common single-attempt path at one clone per peer. + let encoded = Arc::new(encoded); for peer in &target_peers { let p2p = Arc::clone(p2p_node); - let data = encoded.clone(); + let data = Arc::clone(&encoded); let peer_id = *peer; let sem = Arc::clone(send_semaphore); tokio::spawn(async move { @@ -103,11 +110,37 @@ pub async fn replicate_fresh( "Replication send permit acquired for peer {peer_id} ({} available)", sem.available_permits() ); - if let Err(e) = p2p - .send_message(&peer_id, REPLICATION_PROTOCOL_ID, data, &[]) - .await - { - debug!("Failed to send fresh offer to {peer_id}: {e}"); + // ADR-0003: best-effort delivery. Retry the push up to + // FRESH_REPLICATION_DELIVERY_MAX_RETRIES times on a transport + // failure so a transient hiccup doesn't silently drop the offer. + // Possession is judged separately by the delayed possession check. + let mut attempt = 0u32; + loop { + match p2p + .send_message( + &peer_id, + REPLICATION_PROTOCOL_ID, + data.as_ref().clone(), + &[], + ) + .await + { + Ok(()) => break, + Err(e) => { + if attempt >= FRESH_REPLICATION_DELIVERY_MAX_RETRIES { + debug!( + "Failed to send fresh offer to {peer_id} after {} attempts: {e}", + attempt + 1 + ); + break; + } + attempt += 1; + debug!( + "Retrying fresh offer to {peer_id} (attempt {}): {e}", + attempt + 1 + ); + } + } } }); } @@ -122,6 +155,8 @@ pub async fn replicate_fresh( hex::encode(key), target_peers.len() ); + + target_peers } /// Send `PaidNotify(K)` to every peer in `PaidCloseGroup(K)` (fire-and-forget). diff --git a/src/replication/mod.rs b/src/replication/mod.rs index 0f6394a..fb9f6e1 100644 --- a/src/replication/mod.rs +++ b/src/replication/mod.rs @@ -23,6 +23,7 @@ pub mod config; pub mod fresh; pub mod neighbor_sync; pub mod paid_list; +pub mod possession; pub mod protocol; pub mod pruning; pub mod quorum; @@ -311,6 +312,12 @@ pub struct ReplicationEngine { /// When present, `start()` spawns a drainer task that calls /// `replicate_fresh` for each event. fresh_write_rx: Option>, + /// Sender for delayed possession-check events (ADR-0003). The fresh-write + /// drainer pushes the responsible close-group peers here after each fresh + /// replication; the possession-check scheduler drains the paired receiver. + possession_check_tx: mpsc::UnboundedSender, + /// Receiver paired with `possession_check_tx`; taken by the scheduler task. + possession_check_rx: Option>, /// Shutdown token. shutdown: CancellationToken, /// Background task handles. @@ -345,6 +352,7 @@ impl ReplicationEngine { let initial_neighbors = NeighborSyncState::new_cycle(Vec::new()); let config = Arc::new(config); + let (possession_check_tx, possession_check_rx) = mpsc::unbounded_channel(); Ok(Self { config: Arc::clone(&config), @@ -373,6 +381,8 @@ impl ReplicationEngine { audit_responder_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_AUDIT_RESPONSES)), audit_responder_inflight: Arc::new(RwLock::new(HashMap::new())), fresh_write_rx: Some(fresh_write_rx), + possession_check_tx, + possession_check_rx: Some(possession_check_rx), shutdown, task_handles: Vec::new(), }) @@ -475,6 +485,17 @@ impl ReplicationEngine { .await } + /// Test-only: run the possession check immediately for `key` against + /// `peers`, bypassing the scheduler's randomised 5-15 minute settle delay. + /// + /// Penalises any peer that does not hold `key` at `AuditChallenge` + /// severity (ADR-0003). Lets e2e tests assert the detection+penalty path + /// deterministically without waiting for the scheduled check. + #[cfg(any(test, feature = "test-utils"))] + pub async fn run_possession_check_now(&self, key: XorName, peers: Vec) { + possession::run_possession_check(key, peers, &self.p2p_node, &self.shutdown).await; + } + /// Start all background tasks. /// /// `dht_events` must be subscribed **before** `P2PNode::start()` so that @@ -500,6 +521,7 @@ impl ReplicationEngine { self.start_verification_worker(); self.start_bootstrap_sync(dht_events); self.start_fresh_write_drainer(); + self.start_possession_check_scheduler(); info!( "Replication engine started with {} background tasks", @@ -566,9 +588,13 @@ impl ReplicationEngine { self.sync_trigger.notify_one(); } - /// Execute fresh replication for a newly stored record. + /// Execute fresh replication for a newly stored record, then schedule the + /// delayed possession check for the responsible close-group peers + /// (ADR-0003). The production PUT path schedules via the fresh-write + /// drainer; this direct entry point schedules here so callers (and tests) + /// that drive replication directly still get the possession check. pub async fn replicate_fresh(&self, key: &XorName, data: &[u8], proof_of_payment: &[u8]) { - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( key, data, proof_of_payment, @@ -578,6 +604,11 @@ impl ReplicationEngine { &self.send_semaphore, ) .await; + if !peers.is_empty() { + let _ = self + .possession_check_tx + .send(possession::PossessionCheckEvent { key: *key, peers }); + } } // ======================================================================= @@ -594,6 +625,7 @@ impl ReplicationEngine { let paid_list = Arc::clone(&self.paid_list); let config = Arc::clone(&self.config); let send_semaphore = Arc::clone(&self.send_semaphore); + let possession_tx = self.possession_check_tx.clone(); let shutdown = self.shutdown.clone(); let handle = tokio::spawn(async move { @@ -602,7 +634,7 @@ impl ReplicationEngine { () = shutdown.cancelled() => break, event = rx.recv() => { let Some(event) = event else { break }; - fresh::replicate_fresh( + let peers = fresh::replicate_fresh( &event.key, &event.data, &event.payment_proof, @@ -612,6 +644,15 @@ impl ReplicationEngine { &send_semaphore, ) .await; + // Schedule the delayed possession check (ADR-0003) for + // the responsible close-group peers. A closed receiver + // (engine shutting down) is ignored. + if !peers.is_empty() { + let _ = possession_tx.send(possession::PossessionCheckEvent { + key: event.key, + peers, + }); + } } } } @@ -620,6 +661,57 @@ impl ReplicationEngine { self.task_handles.push(handle); } + /// Spawn the possession-check scheduler (ADR-0003). + /// + /// Drains scheduled possession-check events and, for each, waits a + /// randomised 5-15 minute settle delay before probing every responsible + /// peer for actual possession. A peer that lacks the chunk is penalised at + /// `AuditChallenge` severity; an unreachable peer is re-probed under a + /// bounded grace and never penalised. + fn start_possession_check_scheduler(&mut self) { + let Some(mut rx) = self.possession_check_rx.take() else { + return; + }; + let p2p = Arc::clone(&self.p2p_node); + let config = Arc::clone(&self.config); + let shutdown = self.shutdown.clone(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + () = shutdown.cancelled() => break, + event = rx.recv() => { + let Some(event) = event else { break }; + // Spawn a per-chunk delayed check so the drain loop + // keeps pace with the write rate. Each check sleeps the + // randomised settle delay, then probes every peer. + let p2p = Arc::clone(&p2p); + let shutdown = shutdown.clone(); + let delay_min = config.possession_check_delay_min; + let delay_max = config.possession_check_delay_max; + tokio::spawn(async move { + let delay = possession::random_delay(delay_min, delay_max); + tokio::select! { + () = shutdown.cancelled() => {} + () = tokio::time::sleep(delay) => { + possession::run_possession_check( + event.key, + event.peers, + &p2p, + &shutdown, + ) + .await; + } + } + }); + } + } + } + debug!("Possession-check scheduler shut down"); + }); + self.task_handles.push(handle); + } + #[allow(clippy::too_many_lines)] fn start_message_handler(&mut self) { let mut p2p_events = self.p2p_node.subscribe_events(); @@ -1982,13 +2074,19 @@ async fn handle_fresh_offer( return Ok(()); } - // Rule 7: check storage admission. Fresh chunk receivers accept the close - // group plus a small margin to absorb local routing-table disagreement. + // Rule 7: check storage admission. Fresh chunk receivers accept across the + // paid-close-group neighbourhood (`paid_list_close_group_size`, = K_BUCKET_SIZE, + // the same width client PUTs use), not just the close group plus a small + // margin (ADR-0003). During full-node shunning a healthy replica's routing + // table may still list closer full nodes it hasn't evicted yet, ranking it + // outside the narrow window in its own view; the wider accept window absorbs + // that transient skew so the chunk still lands. Retention (pruning) stays at + // `storage_admission_width`, so steady-state replication is unchanged. if !admission::is_responsible( &self_id, &offer.key, p2p_node, - storage_admission_width(config.close_group_size), + config.paid_list_close_group_size, ) .await { diff --git a/src/replication/possession.rs b/src/replication/possession.rs new file mode 100644 index 0000000..79b3b6a --- /dev/null +++ b/src/replication/possession.rs @@ -0,0 +1,194 @@ +//! Delayed possession verification for fresh replication (ADR-0003). +//! +//! After a node fresh-replicates a chunk, every close-group peer responsible +//! for it is checked 5-15 minutes later for actual possession. A peer that +//! holds the chunk earns nothing — storing what it was paid to store is the +//! baseline expectation, not meritorious; a peer confirmed *not* to hold it is +//! penalised at `AuditChallenge` severity. Delivery of the original push is +//! irrelevant: a peer the push never reached is still checked and penalised if +//! it lacks the chunk. A peer merely unreachable at check time yields no +//! verdict — it is re-attempted under a bounded grace and never penalised as +//! absent. + +use std::sync::Arc; +use std::time::Duration; + +use rand::Rng; +use saorsa_core::identity::PeerId; +use saorsa_core::{P2PNode, TrustEvent}; +use tokio_util::sync::CancellationToken; + +use crate::ant_protocol::XorName; +use crate::logging::{debug, warn}; +use crate::replication::config::{ + self, POSSESSION_CHECK_MAX_ATTEMPTS, POSSESSION_CHECK_RETRY_BACKOFF, POSSESSION_CHECK_TIMEOUT, + REPLICATION_PROTOCOL_ID, +}; +use crate::replication::protocol::{ + ReplicationMessage, ReplicationMessageBody, VerificationRequest, +}; + +/// A scheduled possession check for one freshly-replicated chunk. +pub struct PossessionCheckEvent { + /// Content-address of the chunk. + pub key: XorName, + /// Close-group peers responsible for holding it (excludes self). + pub peers: Vec, +} + +/// Verdict of probing a single peer for possession of a chunk. +enum ProbeOutcome { + /// Peer confirmed it holds the chunk. + Present, + /// Peer confirmed it does not hold the chunk. + Absent, + /// No verdict obtained (timeout / transport error / malformed response). + NoVerdict, +} + +/// Pick a randomised delay in `[min, max]` to wait before a possession check +/// runs. The bounds come from `ReplicationConfig` (defaulting to +/// `POSSESSION_CHECK_DELAY_MIN`/`MAX`) so tests can shorten them. +#[must_use] +pub fn random_delay(min: Duration, max: Duration) -> Duration { + let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX); + let min_ms = to_millis(min); + let max_ms = to_millis(max); + if min_ms >= max_ms { + return min; + } + Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms)) +} + +/// Run the possession check for one chunk against every responsible peer. +/// +/// Penalises each peer confirmed absent at `AuditChallenge` severity, leaves +/// present peers unrewarded, and never penalises a peer that only failed to +/// yield a verdict. +pub async fn run_possession_check( + key: XorName, + peers: Vec, + p2p_node: &Arc, + shutdown: &CancellationToken, +) { + let key_hex = hex::encode(key); + for peer in peers { + if shutdown.is_cancelled() { + return; + } + match probe_with_grace(&key, &peer, p2p_node, shutdown).await { + ProbeOutcome::Present => { + debug!("Possession check: {peer} holds {key_hex}"); + } + ProbeOutcome::Absent => { + warn!( + "Possession check: {peer} is missing {key_hex}; penalising at audit severity" + ); + p2p_node + .report_trust_event( + &peer, + TrustEvent::ApplicationFailure(config::AUDIT_FAILURE_TRUST_WEIGHT), + ) + .await; + } + ProbeOutcome::NoVerdict => { + debug!( + "Possession check: no verdict from {peer} for {key_hex} after grace; \ + not penalised" + ); + } + } + } +} + +/// Probe a peer for possession, re-attempting on no-verdict up to the grace +/// bound. A definite Present/Absent verdict short-circuits immediately. +async fn probe_with_grace( + key: &XorName, + peer: &PeerId, + p2p_node: &Arc, + shutdown: &CancellationToken, +) -> ProbeOutcome { + for attempt in 1..=POSSESSION_CHECK_MAX_ATTEMPTS { + match probe_once(key, peer, p2p_node).await { + ProbeOutcome::NoVerdict if attempt < POSSESSION_CHECK_MAX_ATTEMPTS => { + tokio::select! { + () = shutdown.cancelled() => return ProbeOutcome::NoVerdict, + () = tokio::time::sleep(POSSESSION_CHECK_RETRY_BACKOFF) => {} + } + } + outcome => return outcome, + } + } + ProbeOutcome::NoVerdict +} + +/// Send one presence-only `VerificationRequest` and interpret the response. +async fn probe_once(key: &XorName, peer: &PeerId, p2p_node: &Arc) -> ProbeOutcome { + let request = VerificationRequest { + keys: vec![*key], + // Presence-only: no paid-list status is needed to judge possession. + paid_list_check_indices: Vec::new(), + }; + let msg = ReplicationMessage { + request_id: rand::random(), + body: ReplicationMessageBody::VerificationRequest(request), + }; + let Ok(encoded) = msg.encode() else { + warn!( + "Failed to encode possession request for {}", + hex::encode(key) + ); + return ProbeOutcome::NoVerdict; + }; + + let response = match p2p_node + .send_request( + peer, + REPLICATION_PROTOCOL_ID, + encoded, + POSSESSION_CHECK_TIMEOUT, + ) + .await + { + Ok(response) => response, + Err(e) => { + debug!("Possession probe to {peer} failed: {e}"); + return ProbeOutcome::NoVerdict; + } + }; + + let decoded = match ReplicationMessage::decode(&response.data) { + Ok(decoded) => decoded, + Err(e) => { + debug!("Failed to decode possession response from {peer}: {e}"); + return ProbeOutcome::NoVerdict; + } + }; + + let ReplicationMessageBody::VerificationResponse(resp) = decoded.body else { + debug!("Unexpected possession response type from {peer}"); + return ProbeOutcome::NoVerdict; + }; + + match resp.results.iter().find(|r| r.key == *key) { + Some(result) if result.present => ProbeOutcome::Present, + Some(_) => ProbeOutcome::Absent, + None => ProbeOutcome::NoVerdict, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN}; + + #[test] + fn random_delay_is_within_bounds() { + for _ in 0..100 { + let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX); + assert!(d >= POSSESSION_CHECK_DELAY_MIN); + assert!(d <= POSSESSION_CHECK_DELAY_MAX); + } + } +} diff --git a/src/storage/handler.rs b/src/storage/handler.rs index 586b517..13c52d0 100644 --- a/src/storage/handler.rs +++ b/src/storage/handler.rs @@ -38,13 +38,27 @@ use crate::client::compute_address; use crate::error::{Error, Result}; use crate::logging::{debug, info, warn}; use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext}; +use crate::replication::admission; +use crate::replication::config::K_BUCKET_SIZE; use crate::replication::fresh::FreshWriteEvent; use crate::storage::lmdb::LmdbStorage; use bytes::Bytes; +use parking_lot::RwLock; use saorsa_core::P2PNode; use std::sync::Arc; use tokio::sync::mpsc; +/// Width of the self-closeness gate on client PUTs (ADR-0003): a node accepts +/// a PUT only when it is within its own local `SELF_CLOSENESS_GATE_WIDTH` +/// closest peers to the address. +/// +/// Set to the client's PUT fallback ceiling (`K_BUCKET_SIZE`), wider than the +/// storage-admission width, so a client routing past full close-group members +/// onto further peers (ADR-0002) is still accepted here, while a genuinely far +/// node — which could only mis-attribute fresh-replication failures — is +/// turned away. +const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE; + /// ANT protocol handler. /// /// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence @@ -59,6 +73,10 @@ pub struct AntProtocol { quote_generator: Arc, /// Channel for notifying the replication engine about newly-stored chunks. fresh_write_tx: Option>, + /// The node's P2P handle, attached post-construction via + /// `attach_p2p_node`. Drives the self-closeness gate on client PUTs; + /// `None` in unit tests that never attach a node. + p2p_node: RwLock>>, } impl AntProtocol { @@ -90,6 +108,7 @@ impl AntProtocol { payment_verifier, quote_generator, fresh_write_tx: None, + p2p_node: RwLock::new(None), } } @@ -99,8 +118,9 @@ impl AntProtocol { /// checks can use the live routing view. Idempotent: calling twice /// replaces the verifier handle. pub fn attach_p2p_node(&self, node: Arc) { + *self.p2p_node.write() = Some(Arc::clone(&node)); self.payment_verifier.attach_p2p_node(node); - debug!("AntProtocol: P2PNode attached for payment live-DHT checks"); + debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate"); } /// Set the channel sender for fresh-write replication events. @@ -280,9 +300,28 @@ impl AntProtocol { return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string())); } + // Self-closeness gate (ADR-0003): accept a client PUT only when this + // node is within its own local closest view of the address, so the + // fresh replication it triggers is legitimate and cannot mis-penalise + // honest peers. The width is the client's PUT fallback ceiling + // (`SELF_CLOSENESS_GATE_WIDTH`), so a client routing past full + // close-group members onto further peers is still accepted here. + // Skipped when no P2P handle is attached (unit tests). Bind the handle + // out of the lock first so no guard is held across the `.await`. + let attached = self.p2p_node.read().as_ref().map(Arc::clone); + if let Some(p2p) = attached { + let self_id = *p2p.peer_id(); + if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await + { + debug!("Rejecting PUT for {addr_hex}: not within local closest peers"); + return ChunkPutResponse::Error(ProtocolError::StorageFailed( + "node is not within its local closest peers for this address".to_string(), + )); + } + } + // 5. Verify payment. The ClientPut context applies the store-strength - // payment cache and verifies live proofs. Direct client PUT does not - // reject based on this node's local storage-responsibility view. + // payment cache and verifies live proofs. let payment_result = self .payment_verifier .verify_payment( diff --git a/tests/e2e/replication.rs b/tests/e2e/replication.rs index eb8ee49..ab3dc01 100644 --- a/tests/e2e/replication.rs +++ b/tests/e2e/replication.rs @@ -5,11 +5,12 @@ #![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] +use super::testnet::TestNetworkConfig; use super::TestHarness; use ant_node::client::compute_address; use ant_node::replication::commitment_state::{BuiltCommitment, ResponderCommitmentState}; use ant_node::replication::config::{ - storage_admission_width, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, + storage_admission_width, K_BUCKET_SIZE, REPAIR_HINT_MIN_AGE, REPLICATION_PROTOCOL_ID, }; use ant_node::replication::protocol::{ compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse, @@ -32,6 +33,20 @@ use tokio::sync::RwLock; const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(15); /// Interval between propagation poll checks. const PROPAGATION_POLL_INTERVAL: Duration = Duration::from_millis(200); +/// Checker node used by the full-node shunning regression. +const FULL_NODE_SHUN_CHECKER_INDEX: usize = 3; +/// Target node made disk-full by the full-node shunning regression. +const FULL_NODE_SHUN_TARGET_INDEX: usize = 4; +/// Search budget for finding a key whose close group contains the full node. +const FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS: usize = 10_000; +/// Fast lower bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MIN: Duration = Duration::from_millis(200); +/// Fast upper bound for the full-node shunning scheduler check. +const FULL_NODE_SHUN_POSSESSION_DELAY_MAX: Duration = Duration::from_millis(500); +/// Dummy proof length used when a test only needs to reach pre-payment gates. +const DUMMY_PAYMENT_PROOF_LEN: usize = 64; +/// Dummy proof byte used when a test only needs to reach pre-payment gates. +const DUMMY_PAYMENT_PROOF_BYTE: u8 = 0x01; /// Send a replication request via saorsa-core's request-response mechanism /// and decode the response. @@ -252,6 +267,393 @@ async fn test_fresh_replication_propagates_to_close_group() { harness.teardown().await.expect("teardown"); } +/// ADR-0003: the delayed possession check penalises a responsible peer that +/// does NOT hold the chunk, and leaves a peer that DOES hold it unpenalised. +/// +/// Drives the check directly (`run_possession_check_now`, bypassing the 5-15 +/// minute settle delay) so the detection+penalty path is asserted +/// deterministically over real transport. The penalty is observed as a drop in +/// the checker's trust score for the absent peer (the same signal saorsa-core +/// eviction acts on), via `P2PNode::peer_trust`. +#[tokio::test] +#[serial] +async fn possession_check_penalises_absent_peer_only() { + let harness = TestHarness::setup_small().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + // A is the checker; B will be absent, C will hold the chunk. All three are + // regular nodes (idx >= 3) with running replication engines and storage. + let a = harness.test_node(3).expect("node a"); + let b = harness.test_node(4).expect("node b"); + let c = harness.test_node(5).expect("node c"); + + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let peer_b = *b.p2p_node.as_ref().expect("p2p b").peer_id(); + let peer_c = *c.p2p_node.as_ref().expect("p2p c").peer_id(); + + let content = b"adr-0003 possession-check payload"; + let address = compute_address(content); + + // C holds the chunk; B never stores it. + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .put(&address, content) + .await + .expect("put on c"); + + assert!( + !b.ant_protocol + .as_ref() + .expect("proto b") + .storage() + .exists(&address) + .expect("exists b"), + "precondition: B must not hold the chunk" + ); + assert!( + c.ant_protocol + .as_ref() + .expect("proto c") + .storage() + .exists(&address) + .expect("exists c"), + "precondition: C must hold the chunk" + ); + + let trust_b_before = p2p_a.peer_trust(&peer_b); + let trust_c_before = p2p_a.peer_trust(&peer_c); + + // Probe both peers now (no scheduler delay). B is absent -> penalised; C is + // present -> untouched. + engine_a + .run_possession_check_now(address, vec![peer_b, peer_c]) + .await; + + let trust_b_after = p2p_a.peer_trust(&peer_b); + let trust_c_after = p2p_a.peer_trust(&peer_c); + + assert!( + trust_b_after < trust_b_before, + "absent peer B must be penalised: {trust_b_before} -> {trust_b_after}" + ); + assert!( + trust_c_after >= trust_c_before - f64::EPSILON, + "present peer C must not be penalised: {trust_c_before} -> {trust_c_after}" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003: the possession-check *scheduler* (not the direct-drive path) +/// fires after the configured delay and penalises an absent close peer. +/// +/// Uses a shortened possession delay so the scheduled check runs in well under +/// a second. No payment cache is pre-populated, so the close-group peers reject +/// the fresh offer and are absent when the scheduled check probes them. Proves +/// the `replicate_fresh` -> enqueue -> delayed scheduler -> penalty wiring. +#[tokio::test] +#[serial] +async fn possession_scheduler_penalises_absent_close_peer_after_delay() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: Duration::from_millis(200), + possession_check_delay_max: Duration::from_millis(500), + ..ReplicationConfig::default() + }); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let a = harness.test_node(3).expect("node a"); + let p2p_a = a.p2p_node.as_ref().expect("p2p a"); + let engine_a = a.replication_engine.as_ref().expect("engine a"); + let self_a = *p2p_a.peer_id(); + + let content = b"adr-0003 scheduler-wiring payload"; + let address = compute_address(content); + + // A's close group for this key = exactly the peers the scheduled possession + // check targets. With no payment cache anywhere, they reject the fresh offer + // and are absent when probed. + let close_group_size = ReplicationConfig::default().close_group_size; + let close_group: Vec = p2p_a + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .filter(|n| n.peer_id != self_a) + .map(|n| n.peer_id) + .collect(); + assert!(!close_group.is_empty(), "expected a non-empty close group"); + + let trust_before: Vec = close_group.iter().map(|p| p2p_a.peer_trust(p)).collect(); + + // Trigger fresh replication; the engine enqueues the possession check, which + // fires ~200-500 ms later and penalises the absent close peers. + let dummy_pop = [0x01u8; 64]; + engine_a + .replicate_fresh(&address, content, &dummy_pop) + .await; + + // Poll until at least one absent close peer is penalised (trust drops). + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut penalised = false; + while tokio::time::Instant::now() < deadline { + penalised = close_group + .iter() + .zip(trust_before.iter()) + .any(|(peer, &before)| p2p_a.peer_trust(peer) < before - f64::EPSILON); + if penalised { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + penalised, + "the scheduled possession check should have penalised an absent close peer" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003 full-node shunning: a close-group peer that is disk-full rejects a +/// fresh-replication offer before payment verification, remains absent for the +/// key, and is penalised when the checker probes possession. +/// +/// This bridges the two protections that make a full node get shunned by close +/// groups: capacity rejection creates a missing replica, and the delayed +/// possession-check verdict turns that absence into the trust signal that +/// saorsa-core eviction acts on. +#[tokio::test] +#[serial] +async fn full_close_group_node_rejects_replica_and_is_penalised_as_absent() { + let mut net_config = TestNetworkConfig::small(); + net_config.replication_config = Some(ReplicationConfig { + possession_check_delay_min: FULL_NODE_SHUN_POSSESSION_DELAY_MIN, + possession_check_delay_max: FULL_NODE_SHUN_POSSESSION_DELAY_MAX, + ..ReplicationConfig::default() + }); + net_config + .storage_disk_reserve_overrides + .insert(FULL_NODE_SHUN_TARGET_INDEX, u64::MAX); + let harness = TestHarness::setup_with_config(net_config) + .await + .expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let checker = harness + .test_node(FULL_NODE_SHUN_CHECKER_INDEX) + .expect("checker node"); + let full_node = harness + .test_node(FULL_NODE_SHUN_TARGET_INDEX) + .expect("full node"); + let checker_p2p = checker.p2p_node.as_ref().expect("checker p2p"); + let checker_engine = checker.replication_engine.as_ref().expect("checker engine"); + let full_p2p = full_node.p2p_node.as_ref().expect("full node p2p"); + let full_peer = *full_p2p.peer_id(); + + let close_group_size = ReplicationConfig::default().close_group_size; + let admission_width = storage_admission_width(close_group_size); + let mut candidate = None; + for attempt in 0..FULL_NODE_SHUN_KEY_SEARCH_ATTEMPTS { + let content = format!("adr-0003 full-node shunning payload {attempt}").into_bytes(); + let address = compute_address(&content); + let full_node_in_checker_close_group = checker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, close_group_size) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if !full_node_in_checker_close_group { + continue; + } + + let full_node_admits_self = full_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, admission_width) + .await + .iter() + .any(|node| node.peer_id == full_peer); + if full_node_admits_self { + candidate = Some((content, address)); + break; + } + } + let (content, address) = + candidate.expect("find key where full node is a responsible close-group peer"); + + for idx in 0..harness.node_count() { + if let Some(protocol) = harness + .test_node(idx) + .and_then(|node| node.ant_protocol.as_ref()) + { + protocol.payment_verifier().cache_insert(address); + } + } + + let dummy_payment_proof = vec![DUMMY_PAYMENT_PROOF_BYTE; DUMMY_PAYMENT_PROOF_LEN]; + let offer = FreshReplicationOffer { + key: address, + data: content.clone(), + proof_of_payment: dummy_payment_proof.clone(), + }; + let response = send_replication_request( + checker_p2p, + &full_peer, + ReplicationMessage { + request_id: rand::random(), + body: ReplicationMessageBody::FreshReplicationOffer(offer), + }, + PROPAGATION_TIMEOUT, + ) + .await; + + match response.body { + ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected { + key, + reason, + }) => { + assert_eq!(key, address); + assert!( + reason.contains("Insufficient disk space"), + "expected disk-full rejection, got: {reason}" + ); + } + other => panic!("expected disk-full rejection, got: {other:?}"), + } + + let full_storage = full_node + .ant_protocol + .as_ref() + .expect("full node protocol") + .storage(); + assert!( + !full_storage.exists(&address).expect("exists on full node"), + "full node must not store the rejected replica" + ); + + let trust_before = checker_p2p.peer_trust(&full_peer); + checker_engine + .replicate_fresh(&address, &content, &dummy_payment_proof) + .await; + + let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT; + let mut trust_after = trust_before; + while tokio::time::Instant::now() < deadline { + trust_after = checker_p2p.peer_trust(&full_peer); + if trust_after < trust_before - f64::EPSILON { + break; + } + tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await; + } + assert!( + trust_after < trust_before - f64::EPSILON, + "full close-group peer should be shunned by the scheduled possession check: \ + {trust_before} -> {trust_after}" + ); + + harness.teardown().await.expect("teardown"); +} + +/// ADR-0003 self-closeness gate: a node accepts a client PUT only when it is +/// within its own local `K_BUCKET_SIZE`-closest to the address. +/// +/// Needs more than `K_BUCKET_SIZE` nodes so a far node falls outside its own +/// closest view. The responsible (closest) node accepts and stores; the +/// non-responsible (farthest) node rejects before payment with a closeness +/// error. Guards both the regression risk (gate must not reject responsible +/// puts) and the intended reject behaviour. +#[tokio::test] +#[serial] +async fn self_closeness_gate_accepts_responsible_rejects_far_node() { + let harness = TestHarness::setup().await.expect("setup"); + harness.warmup_dht().await.expect("warmup"); + + let content = b"adr-0003 self-closeness gate payload"; + let address = compute_address(content); + + // Rank all peers by XOR distance to the address (identical from any node's + // view once routing tables are warm). + let ranker_p2p = harness + .test_node(3) + .expect("ranker") + .p2p_node + .as_ref() + .expect("p2p"); + let ranked: Vec = ranker_p2p + .dht_manager() + .find_closest_nodes_local_with_self(&address, harness.node_count()) + .await + .iter() + .map(|n| n.peer_id) + .collect(); + assert!( + ranked.len() > K_BUCKET_SIZE, + "need > K_BUCKET_SIZE nodes to exercise the reject path; got {}", + ranked.len() + ); + + // Only nodes that actually run a protocol handler can serve a client PUT. + let has_protocol = |peer: &PeerId| { + node_index_for_peer(&harness, peer) + .and_then(|idx| harness.test_node(idx)) + .is_some_and(|n| n.ant_protocol.is_some()) + }; + + // Closest node with a handler -> within its own K-closest -> gate accepts. + let close_peer = ranked + .iter() + .copied() + .find(|p| has_protocol(p)) + .expect("a close node with a handler"); + // Farthest node beyond the gate width with a handler -> gate rejects. + let far_peer = ranked + .iter() + .copied() + .enumerate() + .rev() + .find(|(rank, p)| *rank >= K_BUCKET_SIZE && has_protocol(p)) + .map(|(_, p)| p) + .expect("a far node (rank >= K_BUCKET_SIZE) with a handler"); + + let close_idx = node_index_for_peer(&harness, &close_peer).expect("close idx"); + let far_idx = node_index_for_peer(&harness, &far_peer).expect("far idx"); + + // Accept path: a responsible node stores the chunk (gate passes). + let close_result = harness + .test_node(close_idx) + .expect("close node") + .store_chunk(content) + .await; + assert!( + close_result.is_ok(), + "responsible (closest) node must accept the PUT, got {close_result:?}" + ); + + // Reject path: a non-responsible node rejects before payment with a + // closeness error. + let far_result = harness + .test_node(far_idx) + .expect("far node") + .store_chunk(content) + .await; + assert!( + far_result.is_err(), + "non-responsible (farthest) node must reject the PUT" + ); + let err = format!("{}", far_result.expect_err("far rejection")); + assert!( + err.contains("closest"), + "rejection should cite closeness, got: {err}" + ); + + harness.teardown().await.expect("teardown"); +} + /// `PaidForList` persistence (Section 18 #43). /// /// Insert a key into the `PaidList`, verify it persists by reopening the diff --git a/tests/e2e/testnet.rs b/tests/e2e/testnet.rs index 6b5ae84..3eb19a8 100644 --- a/tests/e2e/testnet.rs +++ b/tests/e2e/testnet.rs @@ -35,6 +35,7 @@ use saorsa_core::{ identity::NodeIdentity, IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, }; +use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; @@ -222,6 +223,17 @@ pub struct TestNetworkConfig { /// this network (e.g. Anvil testnet) for on-chain verification. /// When `None`, defaults to `ArbitrumOne`. pub evm_network: Option, + + /// Optional replication-config override applied to every node's + /// replication engine. `None` uses `ReplicationConfig::default()`. Tests + /// use this to shorten timers — e.g. the ADR-0003 possession-check delay. + pub replication_config: Option, + + /// Optional per-node storage disk-reserve overrides. + /// + /// Tests can set a node's reserve above available disk space to make its + /// capacity pre-check fail deterministically without filling the host disk. + pub storage_disk_reserve_overrides: HashMap, } impl Default for TestNetworkConfig { @@ -255,6 +267,8 @@ impl Default for TestNetworkConfig { enable_node_logging: false, payment_enforcement: false, evm_network: None, + replication_config: None, + storage_disk_reserve_overrides: HashMap::new(), } } } @@ -1040,9 +1054,19 @@ impl TestNetwork { })?); // Initialize AntProtocol for this node with payment enforcement setting - let ant_protocol = - Self::create_ant_protocol(&data_dir, self.config.evm_network.clone(), &identity) - .await?; + let storage_disk_reserve = self + .config + .storage_disk_reserve_overrides + .get(&index) + .copied() + .unwrap_or_default(); + let ant_protocol = Self::create_ant_protocol_with_disk_reserve( + &data_dir, + self.config.evm_network.clone(), + storage_disk_reserve, + &identity, + ) + .await?; Ok(TestNode { index, @@ -1080,10 +1104,25 @@ impl TestNetwork { data_dir: &std::path::Path, evm_network: Option, identity: &saorsa_core::identity::NodeIdentity, + ) -> Result { + Self::create_ant_protocol_with_disk_reserve(data_dir, evm_network, 0, identity).await + } + + /// Create an `AntProtocol` handler with an explicit storage disk reserve. + /// + /// # Errors + /// + /// Returns an error if LMDB storage initialisation fails. + pub async fn create_ant_protocol_with_disk_reserve( + data_dir: &std::path::Path, + evm_network: Option, + disk_reserve: u64, + identity: &saorsa_core::identity::NodeIdentity, ) -> Result { // Create LMDB storage let storage_config = LmdbStorageConfig { root_dir: data_dir.to_path_buf(), + disk_reserve, ..LmdbStorageConfig::test_default() }; let storage = LmdbStorage::new(storage_config) @@ -1284,7 +1323,7 @@ impl TestNetwork { (&node.p2p_node, &node.ant_protocol, &node.node_identity) { let shutdown = CancellationToken::new(); - let repl_config = ReplicationConfig::default(); + let repl_config = self.config.replication_config.clone().unwrap_or_default(); let (_fresh_tx, fresh_rx) = tokio::sync::mpsc::unbounded_channel(); let node_identity = Arc::clone(id); match ReplicationEngine::new(