Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions src/replication/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::replication::protocol::{
use crate::replication::types::{
AuditFailureReason, AuditFailureSummary, FailureEvidence, PeerSyncRecord, RepairProofs,
};
use crate::replication::{is_inbound_replication_overloaded_reason, OverloadClaimTracker};
use crate::storage::LmdbStorage;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
Expand Down Expand Up @@ -86,20 +87,55 @@ pub async fn audit_tick(
.await
}

/// Execute one repair-proof-gated audit tick with a fresh overload-claim
/// tracker.
///
/// This preserves the public helper signature that existed before inbound
/// overload accounting was added. The replication engine uses the crate-private
/// tracker-aware helper so overload claim budgets persist across audit ticks.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick_with_repair_proofs(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
current_sync_epoch: u64,
is_bootstrapping: bool,
) -> AuditTickResult {
let overload_tracker = Arc::new(OverloadClaimTracker::new());
audit_tick_with_repair_proofs_and_overload_tracker(
p2p_node,
storage,
config,
sync_history,
repair_proofs,
current_sync_epoch,
&overload_tracker,
is_bootstrapping,
)
.await
}

/// Execute one repair-proof-gated audit tick.
///
/// This is the production path used by the replication engine. The
/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
/// callers that have not adopted repair proofs remain conservative and do not
/// audit peers for unproven keys.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick_with_repair_proofs(
#[allow(
clippy::implicit_hasher,
clippy::too_many_lines,
clippy::too_many_arguments
)]
pub(crate) async fn audit_tick_with_repair_proofs_and_overload_tracker(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
current_sync_epoch: u64,
overload_tracker: &Arc<OverloadClaimTracker>,
is_bootstrapping: bool,
) -> AuditTickResult {
// Invariant 19: never audit while still bootstrapping.
Expand Down Expand Up @@ -264,6 +300,8 @@ pub async fn audit_tick_with_repair_proofs(
)
.await;
}
// A cooperative response clears any prior overload streak.
overload_tracker.record_normal_response(&challenged_peer);
// Step 7b: Bootstrapping claim.
AuditTickResult::BootstrapClaim {
peer: challenged_peer,
Expand All @@ -285,6 +323,8 @@ pub async fn audit_tick_with_repair_proofs(
)
.await;
}
// A cooperative response clears any prior overload streak.
overload_tracker.record_normal_response(&challenged_peer);
verify_digests(
&challenged_peer,
challenge_id,
Expand Down Expand Up @@ -313,6 +353,30 @@ pub async fn audit_tick_with_repair_proofs(
)
.await;
}
if is_inbound_replication_overloaded_reason(&reason) {
// Honor a brief genuine overload, but do not let a peer dodge
// the audit indefinitely by always claiming to be overloaded.
if overload_tracker.honor_overload_claim(&challenged_peer) {
debug!(
"Audit: challenge deferred by overloaded peer {challenged_peer}: {reason}"
);
return AuditTickResult::Idle;
}
warn!(
"Audit: peer {challenged_peer} exceeded overload-claim budget; treating challenge rejection as a failure"
);
return handle_audit_failure(
&challenged_peer,
challenge_id,
&peer_keys,
AuditFailureReason::Rejected,
p2p_node,
config,
)
.await;
}
// A genuine (non-overload) rejection clears any prior overload streak.
overload_tracker.record_normal_response(&challenged_peer);
warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
handle_audit_failure(
&challenged_peer,
Expand Down
83 changes: 77 additions & 6 deletions src/replication/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::logging::{debug, info, warn};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use saorsa_core::identity::PeerId;
use saorsa_core::DhtNetworkEvent;

use crate::ant_protocol::XorName;
Expand Down Expand Up @@ -134,6 +135,12 @@ pub async fn check_bootstrap_drained(
return false;
}

if !state.overload_deferred_sync_peers.is_empty() {
let n = state.overload_deferred_sync_peers.len();
debug!("Bootstrap NOT drained: {n} peer(s) have overload-deferred sync retries");
return false;
}

if queues.is_bootstrap_work_empty(&state.pending_keys) {
state.drained = true;
info!("Bootstrap drained: all peer requests completed and work queues empty");
Expand All @@ -150,10 +157,7 @@ pub async fn check_bootstrap_drained(
/// [`clear_capacity_rejected`] when the same source's next admission cycle
/// completes with zero rejections (i.e. the source successfully
/// re-delivered everything that previously overflowed).
pub async fn note_capacity_rejected(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
source: saorsa_core::identity::PeerId,
) {
pub async fn note_capacity_rejected(bootstrap_state: &Arc<RwLock<BootstrapState>>, source: PeerId) {
let mut state = bootstrap_state.write().await;
if state.capacity_rejected_sources.insert(source) {
let n = state.capacity_rejected_sources.len();
Expand All @@ -172,16 +176,57 @@ pub async fn note_capacity_rejected(
/// drained" is retired. No-op if the source had no outstanding rejections.
pub async fn clear_capacity_rejected(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
source: &saorsa_core::identity::PeerId,
) {
source: &PeerId,
) -> bool {
let mut state = bootstrap_state.write().await;
if state.capacity_rejected_sources.remove(source) {
let n = state.capacity_rejected_sources.len();
debug!(
"Bootstrap: cleared outstanding capacity rejections for {source} \
({n} sources still outstanding)"
);
return true;
}
false
}

/// Record that `peer` reported overload during the initial bootstrap sync.
///
/// The peer has been queued for a later priority neighbor-sync retry. Bootstrap
/// cannot drain until that retry completes, fails, or the peer leaves the local
/// close-neighbor set.
pub async fn note_overload_deferred_sync(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
peer: PeerId,
) {
let mut state = bootstrap_state.write().await;
if state.overload_deferred_sync_peers.insert(peer) {
let n = state.overload_deferred_sync_peers.len();
debug!(
"Bootstrap: peer {peer} has an overload-deferred sync retry \
({n} peers outstanding)"
);
}
}

/// Clear an outstanding overload-deferred bootstrap sync for `peer`.
///
/// Returns `true` when this call removed a blocker and the caller should re-run
/// the bootstrap drain check.
pub async fn clear_overload_deferred_sync(
bootstrap_state: &Arc<RwLock<BootstrapState>>,
peer: &PeerId,
) -> bool {
let mut state = bootstrap_state.write().await;
if state.overload_deferred_sync_peers.remove(peer) {
let n = state.overload_deferred_sync_peers.len();
debug!(
"Bootstrap: cleared overload-deferred sync retry for {peer} \
({n} peers still outstanding)"
);
return true;
}
false
}

/// Record a set of discovered keys into the bootstrap state for drain tracking.
Expand Down Expand Up @@ -247,6 +292,7 @@ mod tests {
pending_peer_requests: 5,
pending_keys: HashSet::new(),
capacity_rejected_sources: HashSet::new(),
overload_deferred_sync_peers: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -263,6 +309,7 @@ mod tests {
pending_peer_requests: 2,
pending_keys: HashSet::new(),
capacity_rejected_sources: HashSet::new(),
overload_deferred_sync_peers: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -279,6 +326,7 @@ mod tests {
pending_peer_requests: 0,
pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(),
capacity_rejected_sources: HashSet::new(),
overload_deferred_sync_peers: HashSet::new(),
}));
let queues = ReplicationQueues::new();

Expand All @@ -294,6 +342,7 @@ mod tests {
pending_peer_requests: 0,
pending_keys: std::iter::once(xor_name_from_byte(0x01)).collect(),
capacity_rejected_sources: HashSet::new(),
overload_deferred_sync_peers: HashSet::new(),
}));
let mut queues = ReplicationQueues::new();

Expand All @@ -305,6 +354,7 @@ mod tests {
tried_sources: HashSet::new(),
created_at: Instant::now(),
hint_sender: saorsa_core::identity::PeerId::from_bytes([0u8; 32]),
inconclusive_rounds: 0,
};
queues.add_pending_verify(xor_name_from_byte(0x01), entry);

Expand Down Expand Up @@ -406,4 +456,25 @@ mod tests {
clear_capacity_rejected(&state, &source_b).await;
assert!(check_bootstrap_drained(&state, &queues).await);
}

#[tokio::test]
async fn overload_deferred_sync_blocks_until_cleared() {
const TEST_PEER_BYTE: u8 = 0xCC;

let state = Arc::new(RwLock::new(BootstrapState::new()));
let queues = ReplicationQueues::new();
let peer = PeerId::from_bytes([TEST_PEER_BYTE; 32]);

note_overload_deferred_sync(&state, peer).await;
assert!(
!check_bootstrap_drained(&state, &queues).await,
"overload-deferred sync must block bootstrap drain"
);

assert!(clear_overload_deferred_sync(&state, &peer).await);
assert!(
check_bootstrap_drained(&state, &queues).await,
"bootstrap should drain once the overload-deferred retry is cleared"
);
}
}
9 changes: 9 additions & 0 deletions src/replication/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ const PENDING_VERIFY_MAX_AGE_SECS: u64 = 30 * 60;
/// Maximum age for pending-verification entries before stale eviction.
pub const PENDING_VERIFY_MAX_AGE: Duration = Duration::from_secs(PENDING_VERIFY_MAX_AGE_SECS);

/// Maximum consecutive `QuorumInconclusive` rounds a pending key is retried
/// before it is abandoned.
///
/// A backstop complementing the age-based [`PENDING_VERIFY_MAX_AGE`] eviction:
/// it bounds wasted re-verification (and per-source pending-slot occupancy) for
/// keys whose targets never converge — persistently unreachable peers, or peers
/// that keep replying `Overloaded` beyond the per-peer overload budget.
pub const MAX_INCONCLUSIVE_VERIFY_ROUNDS: u32 = 10;

/// Trust event weight for confirmed audit failures.
pub const AUDIT_FAILURE_TRUST_WEIGHT: f64 = 5.0;

Expand Down
Loading
Loading