Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/hotblocks/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ pub struct CLI {
#[arg(long, value_name = "N", default_value = "10")]
pub rocksdb_keep_log_file_num: usize,

/// Grace period, in seconds, before a deleted table's data files are
/// physically unlinked by the background cleanup. MUST exceed the longest
/// query/snapshot lifetime: file unlink ignores snapshots, so a shorter
/// value can silently drop rows from an in-flight query. Default: 15 min.
#[arg(long, value_name = "SECS", default_value = "900")]
pub reclaim_grace_secs: u64,

/// Known client IDs for metrics labeling. Client IDs not in this list
/// will be reported as "unknown" to prevent metrics cardinality abuse.
#[arg(long = "known-client", value_name = "ID")]
Expand Down Expand Up @@ -90,6 +97,18 @@ impl CLI {
.map(Arc::new)
.context("failed to open rocksdb database")?;

// Crash recovery: drop DIRTY_TABLES markers left by a build that died
// before committing its chunk. Must happen before any ingest starts --
// an orphan marker's id pins the disk-reclaim watermark forever, so disk
// from later-deleted datasets is never freed. Safe here: the DB is open
// but no dataset controller / table builder exists yet. Best-effort:
// a failure degrades reclaim but must not block startup.
match db.purge_orphan_dirty_tables() {
Ok(0) => {}
Ok(n) => tracing::info!("purged {n} orphan dirty table(s) left by an interrupted build"),
Err(err) => tracing::warn!(error =? err, "failed to purge orphan dirty tables at startup")
}

let mut metrics_registry = crate::metrics::build_metrics_registry();
metrics_registry.register_collector(Box::new(DatasetMetricsCollector {
db: db.clone(),
Expand Down
19 changes: 17 additions & 2 deletions crates/hotblocks/src/data_service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc
sync::Arc,
time::Duration
};

use anyhow::{Context, anyhow};
use futures::{FutureExt, StreamExt, TryStreamExt};
use sqd_data_client::reqwest::ReqwestDataClient;
use sqd_storage::db::DatasetId;
use tracing::{error, info};
use tracing::{debug, error, info};

use crate::{
dataset_config::{DatasetConfig, RetentionConfig},
Expand All @@ -34,6 +35,20 @@ impl DataService {
}
}

// Reclaim the disk of the just-deleted datasets now, BEFORE any controller
// is spawned. The file unlink ignores snapshots, but no ingest/compaction
// or query snapshot exists yet, so a zero grace is safe here -- and it
// frees the most. Doing it later (once controllers hold snapshots) would
// race their reads. Write-free, so it also makes progress at a full disk.
{
let db = db.clone();
match tokio::task::spawn_blocking(move || db.reclaim_disk_space(Duration::ZERO)).await {
Ok(Ok(n)) => debug!("startup reclaim freed {n} table(s)"),
Ok(Err(err)) => error!(error =? err, "startup reclaim failed"),
Err(_) => error!("startup reclaim panicked")
}
}

let mut controllers = futures::stream::iter(datasets.into_iter())
.map(|(dataset_id, cfg)| {
let db = db.clone();
Expand Down
67 changes: 52 additions & 15 deletions crates/hotblocks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ fn main() -> anyhow::Result<()> {
.block_on(async {
let app = args.build_app().await?;

tokio::spawn(db_cleanup_task(app.db.clone()));
// NB: startup disk reclaim of unconfigured/deleted datasets runs inside
// DataService::start (in build_app above), before any controller spawns
// -- that is the only point at which a zero grace is safe, since no
// ingest/compaction/query snapshot exists yet.

tokio::spawn(db_cleanup_task(
app.db.clone(),
Duration::from_secs(args.reclaim_grace_secs),
));

let api = build_api(app);

Expand Down Expand Up @@ -89,24 +97,53 @@ async fn shutdown_signal() {
}
}

/// Delay before the first cleanup tick, and the pause after a tick in which
/// neither cleanup phase reported an error.
const CLEANUP_INTERVAL: Duration = Duration::from_secs(10);
/// Backoff after a failed cleanup tick, so a persistent error (e.g. a full disk)
/// doesn't busy-loop failing writes.
const CLEANUP_ERROR_BACKOFF: Duration = Duration::from_secs(30);

#[instrument(name = "db_cleanup", skip_all)]
async fn db_cleanup_task(db: DBRef) {
tokio::time::sleep(Duration::from_secs(10)).await;
async fn db_cleanup_task(db: DBRef, reclaim_grace: Duration) {
tokio::time::sleep(CLEANUP_INTERVAL).await;
loop {
debug!("db cleanup started");
let db = db.clone();
let result = tokio::task::spawn_blocking(move || db.cleanup()).await;
match result {
Ok(Ok(deleted)) => {
if deleted > 0 {
debug!("purged {} tables", deleted)
} else {
debug!("nothing to purge, pausing cleanup for 10 seconds");
tokio::time::sleep(Duration::from_secs(10)).await;
let result = tokio::task::spawn_blocking(move || {
// Run both phases independently.
// Phase 1 (range tombstones) writes, so it errors on a full disk;
// Phase 2 (file unlink) writes nothing and is the only path that frees space
// when the disk is full, so it must run even if Phase 1 just failed.
let purged = db.cleanup();
let reclaimed = db.reclaim_disk_space(reclaim_grace);
(purged, reclaimed)
})
.await;

let failed = match result {
Ok((purged, reclaimed)) => {
if let Err(err) = &purged {
error!(error =? err, "cleanup phase 1 (logical purge) failed");
}
if let Err(err) = &reclaimed {
error!(error =? err, "cleanup phase 2 (disk reclaim) failed");
}
if let (Ok(purged), Ok(reclaimed)) = (&purged, &reclaimed) {
if *purged > 0 || *reclaimed > 0 {
debug!("cleanup: purged {purged} tables, reclaimed {reclaimed} files");
}
}
purged.is_err() || reclaimed.is_err()
}
Ok(Err(err)) => error!(error =? err, "database cleanup task failed"),
Err(_) => error!("database cleanup task panicked")
}
Err(_) => {
error!("database cleanup task panicked");
true
}
};

tokio::time::sleep(if failed {
CLEANUP_ERROR_BACKOFF
} else {
CLEANUP_INTERVAL
})
.await;
}
}
69 changes: 65 additions & 4 deletions crates/storage/src/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
use crate::db::{
ops::{perform_dataset_compaction, CompactionStatus},
read::datasets::list_all_datasets,
write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx},
write::{ops as cleanup_ops, table_builder::TableBuilder, tx::Tx},
Chunk, DatasetUpdate
};

Expand All @@ -36,7 +36,8 @@ pub struct DatabaseSettings {
direct_io: bool,
cache_index_and_filter_blocks: bool,
max_log_file_size: usize,
keep_log_file_num: usize
keep_log_file_num: usize,
auto_compactions: bool
}

impl Default for DatabaseSettings {
Expand All @@ -48,7 +49,8 @@ impl Default for DatabaseSettings {
direct_io: false,
cache_index_and_filter_blocks: false,
max_log_file_size: 10,
keep_log_file_num: 10
keep_log_file_num: 10,
auto_compactions: true
}
}
}
Expand Down Expand Up @@ -91,6 +93,14 @@ impl DatabaseSettings {
self
}

/// Enable/disable RocksDB background auto-compaction of the table data.
/// Defaults to `true`; mainly for tests that need deterministic control over
/// when compaction runs (manual `compact_tables`/reclaim still work).
pub fn with_auto_compactions(mut self, yes: bool) -> Self {
self.auto_compactions = yes;
self
}

fn db_options(&self) -> RocksOptions {
let mut options = RocksOptions::default();
options.create_if_missing(true);
Expand Down Expand Up @@ -141,6 +151,16 @@ impl DatabaseSettings {
let mut options = RocksOptions::default();
options.set_block_based_table_factory(&block_based_table_factory);
options.set_compression_type(rocksdb::DBCompressionType::Lz4);
// Help compaction find tombstone-heavy SSTs (table deletes leave range
// tombstones), and bound staleness so no dead file lingers uncompacted.
// A lone range tombstone over a whole table has low deletion *density*,
// so periodic compaction is the real backstop here; the collector mostly
// catches denser boundary files. Thresholds are provisional.
options.add_compact_on_deletion_collector_factory(128 * 1024, 64 * 1024, 0.5);
options.set_periodic_compaction_seconds(24 * 60 * 60);
if !self.auto_compactions {
options.set_disable_auto_compactions(true);
}
options
}

Expand Down Expand Up @@ -287,8 +307,49 @@ impl Database {
Ok(())
}

/// Phase 1 -- logically purge deleted tables (snapshot-safe range tombstones).
/// Returns the number of tables logically deleted by this call.
///
/// # Errors
/// Returns whatever error `cleanup_ops::logical_cleanup` reports.
pub fn cleanup(&self) -> anyhow::Result<usize> {
    cleanup_ops::logical_cleanup(&self.db)
}

/// Phase 2 -- physically reclaim disk space from tables deleted longer than
/// `grace` ago, by unlinking whole SST files below the live watermark.
///
/// Performs no writes and needs no scratch space, so it works at a full disk.
/// `grace` MUST exceed the maximum in-flight query/snapshot lifetime: file
/// unlinks ignore snapshots, so a shorter grace can drop rows from a running
/// query. Use [`Duration::ZERO`](std::time::Duration::ZERO) only where no
/// readers exist (e.g. startup).
/// Returns the number of deleted-table records reclaimed.
///
/// # Errors
/// Returns whatever error `cleanup_ops::reclaim_disk_space` reports.
pub fn reclaim_disk_space(&self, grace: std::time::Duration) -> anyhow::Result<usize> {
cleanup_ops::reclaim_disk_space(&self.db, grace)
}

/// Crash recovery: purge `DIRTY_TABLES` markers left by builds that died
/// before committing their chunk. Such an orphan's id would otherwise pin
/// the disk-reclaim watermark forever (see [`Database::reclaim_disk_space`]).
///
/// MUST be called only before any ingest/table build starts (e.g. at
/// startup), since it treats every dirty marker as an orphan. Returns the
/// number of orphan markers purged.
///
/// # Errors
/// Returns whatever error `cleanup_ops::purge_orphan_dirty_tables` reports.
pub fn purge_orphan_dirty_tables(&self) -> anyhow::Result<usize> {
cleanup_ops::purge_orphan_dirty_tables(&self.db)
}

/// Flush the table-data column family's memtable to SST files. Useful before
/// a reclaim/shutdown so freshly written data is on disk as files.
pub fn flush(&self) -> anyhow::Result<()> {
self.db.flush_cf(self.db.cf_handle(CF_TABLES).unwrap())?;
Ok(())
}

/// Run a manual compaction over the entire key range of the table-data column
/// family. In contrast to [`Database::reclaim_disk_space`] this *writes* (it
/// needs scratch space, so it is not safe at a full disk); its purpose is to
/// push data into the bottom level and to rewrite tombstone-heavy boundary
/// files that file-unlink can't reach.
///
/// # Panics
/// Panics if the `CF_TABLES` column family handle cannot be found.
pub fn compact_tables(&self) {
    // No lower and no upper bound: compact the whole column family.
    let unbounded: Option<&[u8]> = None;
    self.db.compact_range_cf(
        self.db.cf_handle(CF_TABLES).unwrap(),
        unbounded,
        unbounded
    );
}

pub fn get_statistics(&self) -> Option<String> {
Expand Down
30 changes: 27 additions & 3 deletions crates/storage/src/db/table_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ impl TableId {
}

/// Decode a `TableId` from its raw UUID bytes.
///
/// # Panics
/// Panics if `bytes` is not a valid 16-byte UUID. Use
/// [`TableId::try_from_slice`] for keys that may be malformed.
pub fn from_slice(bytes: &[u8]) -> Self {
    Self::try_from_slice(bytes).expect("TableId::from_slice: not a 16-byte UUID")
}

/// Fallible counterpart of [`TableId::from_slice`]: yields `None` when `bytes`
/// is not exactly a 16-byte UUID instead of panicking. Prefer it when scanning
/// a column family whose keys could in principle be corrupt/short, so that a
/// malformed key is skipped rather than wedging the scan.
pub fn try_from_slice(bytes: &[u8]) -> Option<Self> {
    match Uuid::from_slice(bytes) {
        Ok(uuid) => Some(Self { uuid }),
        Err(_) => None
    }
}
}

Expand All @@ -31,3 +37,21 @@ impl Display for TableId {
self.uuid.fmt(f)
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn try_from_slice_rejects_non_16_byte_keys() {
        // Sanity check first: a genuine id decodes back to itself.
        let id = TableId::new();
        let decoded = TableId::try_from_slice(id.as_ref());
        assert_eq!(decoded, Some(id));

        // Empty, one byte short and one byte long: each must yield `None`
        // rather than panic, so cleanup can skip a corrupt/short key.
        assert!(TableId::try_from_slice(&[]).is_none());
        assert!(TableId::try_from_slice(&[0u8; 15]).is_none());
        assert!(TableId::try_from_slice(&[0u8; 17]).is_none());
    }
}
Loading
Loading