From 98eac3f389afa48231167f32350ef44605243583 Mon Sep 17 00:00:00 2001 From: Evgeny Formanenko Date: Thu, 25 Jun 2026 18:07:52 +0300 Subject: [PATCH] Reclaim hotblocks disk via DeleteFilesInRange + range deletes Replace the per-key tombstone purge with a two-phase table cleanup that can actually reclaim disk space, including at a full disk. Phase 1 (logical, snapshot-safe): cleanup() drops each deleted table with a single range tombstone (OptimisticTransactionDB::delete_range_cf) instead of millions of point deletes, then marks it reclaim-pending. Range tombstones respect snapshots, so in-flight queries are unaffected. Phase 2 (physical): reclaim_disk_space(grace) unlinks whole SST files below the live watermark (min live TableId across all chunks + dirty tables) via DeleteFilesInRange. The unlink itself performs no writes and needs no scratch space, so it makes progress even at 100% disk; the only write is a small CF_DELETED_TABLES bookkeeping delete issued after the files have already been freed. A per-table deletion timestamp in CF_DELETED_TABLES gates the unlink behind a grace period -- the file unlink ignores snapshots, so grace must exceed the max query/snapshot lifetime. reclaim_disk_space only clears bookkeeping for tables Phase 1 has already tombstoned, so a still-pending table is never forgotten with its data left un-tombstoned. Crash recovery: a DIRTY_TABLES marker with no committed chunk is an orphan left by a build that died before commit; its id would otherwise pin the watermark forever. purge_orphan_dirty_tables() drops such markers at startup, before any ingest, range-tombstoning the orphaned data so it can be reclaimed. Also: - TABLES CF: compact-on-deletion collector + 24h periodic compaction so compaction finds tombstone-heavy / boundary files. 
- hotblocks: the cleanup loop runs both phases each tick -- Phase 2 runs even when Phase 1's writes fail, so a full disk still gets freed -- and backs off on error instead of busy-looping; startup purges orphan dirty markers and reclaims unconfigured datasets' files before any controller spawns, the one point at which a zero grace is safe (no ingest/compaction/query snapshot exists yet). New --reclaim-grace-secs flag (default 15m). - Robustness: cleanup scans skip a malformed CF key instead of panicking (a panic would re-fire every tick and wedge cleanup). Tests: a MockDB harness drives the lifecycle (commit / delete / cleanup / reclaim / snapshot). Covers logical-delete snapshot safety; the Phase-2 grace/snapshot invariant (a pre-deletion reader still reads within grace and the files survive, unlinked only past grace) plus the negative case (a past-grace unlink under a live snapshot breaks the read); orphan-marker watermark pinning; un-tombstoned bookkeeping survival; physical reclaim; watermark pinning by a live table; idempotency; end-to-end delete_dataset + reclaim; and a value-codec unit test. Supersedes the sync_dataset_cleanup flag. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/hotblocks/src/cli.rs | 19 ++ crates/hotblocks/src/data_service.rs | 19 +- crates/hotblocks/src/main.rs | 67 +++-- crates/storage/src/db/db.rs | 69 ++++- crates/storage/src/db/table_id.rs | 30 ++- crates/storage/src/db/write/ops.rs | 321 +++++++++++++++++++++--- crates/storage/src/db/write/tx.rs | 9 +- crates/storage/tests/cleanup_reclaim.rs | 284 +++++++++++++++++++++ crates/storage/tests/mock_db/mod.rs | 211 ++++++++++++++++ 9 files changed, 968 insertions(+), 61 deletions(-) create mode 100644 crates/storage/tests/cleanup_reclaim.rs create mode 100644 crates/storage/tests/mock_db/mod.rs diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index 1e14ff72..0927d7af 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -61,6 +61,13 @@ pub struct CLI { #[arg(long, value_name = "N", default_value = "10")] pub rocksdb_keep_log_file_num: usize, + /// Grace period, in seconds, before a deleted table's data files are + /// physically unlinked by the background cleanup. MUST exceed the longest + /// query/snapshot lifetime: file unlink ignores snapshots, so a shorter + /// value can silently drop rows from an in-flight query. Default: 15 min. + #[arg(long, value_name = "SECS", default_value = "900")] + pub reclaim_grace_secs: u64, + /// Known client IDs for metrics labeling. Client IDs not in this list /// will be reported as "unknown" to prevent metrics cardinality abuse. #[arg(long = "known-client", value_name = "ID")] @@ -90,6 +97,18 @@ impl CLI { .map(Arc::new) .context("failed to open rocksdb database")?; + // Crash recovery: drop DIRTY_TABLES markers left by a build that died + // before committing its chunk. Must happen before any ingest starts -- + // an orphan marker's id pins the disk-reclaim watermark forever, so disk + // from later-deleted datasets is never freed. Safe here: the DB is open + // but no dataset controller / table builder exists yet. 
Best-effort: + // a failure degrades reclaim but must not block startup. + match db.purge_orphan_dirty_tables() { + Ok(0) => {} + Ok(n) => tracing::info!("purged {n} orphan dirty table(s) left by an interrupted build"), + Err(err) => tracing::warn!(error =? err, "failed to purge orphan dirty tables at startup") + } + let mut metrics_registry = crate::metrics::build_metrics_registry(); metrics_registry.register_collector(Box::new(DatasetMetricsCollector { db: db.clone(), diff --git a/crates/hotblocks/src/data_service.rs b/crates/hotblocks/src/data_service.rs index a95256c1..152778a8 100644 --- a/crates/hotblocks/src/data_service.rs +++ b/crates/hotblocks/src/data_service.rs @@ -1,13 +1,14 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::Arc + sync::Arc, + time::Duration }; use anyhow::{Context, anyhow}; use futures::{FutureExt, StreamExt, TryStreamExt}; use sqd_data_client::reqwest::ReqwestDataClient; use sqd_storage::db::DatasetId; -use tracing::{error, info}; +use tracing::{debug, error, info}; use crate::{ dataset_config::{DatasetConfig, RetentionConfig}, @@ -34,6 +35,20 @@ impl DataService { } } + // Reclaim the disk of the just-deleted datasets now, BEFORE any controller + // is spawned. The file unlink ignores snapshots, but no ingest/compaction + // or query snapshot exists yet, so a zero grace is safe here -- and it + // frees the most. Doing it later (once controllers hold snapshots) would + // race their reads. Write-free, so it also makes progress at a full disk. + { + let db = db.clone(); + match tokio::task::spawn_blocking(move || db.reclaim_disk_space(Duration::ZERO)).await { + Ok(Ok(n)) => debug!("startup reclaim freed {n} table(s)"), + Ok(Err(err)) => error!(error =? 
err, "startup reclaim failed"), + Err(_) => error!("startup reclaim panicked") + } + } + let mut controllers = futures::stream::iter(datasets.into_iter()) .map(|(dataset_id, cfg)| { let db = db.clone(); diff --git a/crates/hotblocks/src/main.rs b/crates/hotblocks/src/main.rs index 409ffecd..cac38ec3 100644 --- a/crates/hotblocks/src/main.rs +++ b/crates/hotblocks/src/main.rs @@ -36,7 +36,15 @@ fn main() -> anyhow::Result<()> { .block_on(async { let app = args.build_app().await?; - tokio::spawn(db_cleanup_task(app.db.clone())); + // NB: startup disk reclaim of unconfigured/deleted datasets runs inside + // DataService::start (in build_app above), before any controller spawns + // -- that is the only point at which a zero grace is safe, since no + // ingest/compaction/query snapshot exists yet. + + tokio::spawn(db_cleanup_task( + app.db.clone(), + Duration::from_secs(args.reclaim_grace_secs), + )); let api = build_api(app); @@ -89,24 +97,53 @@ async fn shutdown_signal() { } } +const CLEANUP_INTERVAL: Duration = Duration::from_secs(10); +/// Backoff after a failed cleanup tick, so a persistent error (e.g. a full disk) +/// doesn't busy-loop failing writes. +const CLEANUP_ERROR_BACKOFF: Duration = Duration::from_secs(30); + #[instrument(name = "db_cleanup", skip_all)] -async fn db_cleanup_task(db: DBRef) { - tokio::time::sleep(Duration::from_secs(10)).await; +async fn db_cleanup_task(db: DBRef, reclaim_grace: Duration) { + tokio::time::sleep(CLEANUP_INTERVAL).await; loop { - debug!("db cleanup started"); let db = db.clone(); - let result = tokio::task::spawn_blocking(move || db.cleanup()).await; - match result { - Ok(Ok(deleted)) => { - if deleted > 0 { - debug!("purged {} tables", deleted) - } else { - debug!("nothing to purge, pausing cleanup for 10 seconds"); - tokio::time::sleep(Duration::from_secs(10)).await; + let result = tokio::task::spawn_blocking(move || { + // Run both phases independently. 
+ // Phase 1 (range tombstones) writes, so it errors on a full disk; + // Phase 2 (file unlink) writes nothing and is the only path that frees space + // when the disk is full, so it must run even if Phase 1 just failed. + let purged = db.cleanup(); + let reclaimed = db.reclaim_disk_space(reclaim_grace); + (purged, reclaimed) + }) + .await; + + let failed = match result { + Ok((purged, reclaimed)) => { + if let Err(err) = &purged { + error!(error =? err, "cleanup phase 1 (logical purge) failed"); + } + if let Err(err) = &reclaimed { + error!(error =? err, "cleanup phase 2 (disk reclaim) failed"); } + if let (Ok(purged), Ok(reclaimed)) = (&purged, &reclaimed) { + if *purged > 0 || *reclaimed > 0 { + debug!("cleanup: purged {purged} tables, reclaimed {reclaimed} files"); + } + } + purged.is_err() || reclaimed.is_err() } - Ok(Err(err)) => error!(error =? err, "database cleanup task failed"), - Err(_) => error!("database cleanup task panicked") - } + Err(_) => { + error!("database cleanup task panicked"); + true + } + }; + + tokio::time::sleep(if failed { + CLEANUP_ERROR_BACKOFF + } else { + CLEANUP_INTERVAL + }) + .await; } } diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs index 34e0d71c..af3daa02 100644 --- a/crates/storage/src/db/db.rs +++ b/crates/storage/src/db/db.rs @@ -12,7 +12,7 @@ use super::{ use crate::db::{ ops::{perform_dataset_compaction, CompactionStatus}, read::datasets::list_all_datasets, - write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx}, + write::{ops as cleanup_ops, table_builder::TableBuilder, tx::Tx}, Chunk, DatasetUpdate }; @@ -36,7 +36,8 @@ pub struct DatabaseSettings { direct_io: bool, cache_index_and_filter_blocks: bool, max_log_file_size: usize, - keep_log_file_num: usize + keep_log_file_num: usize, + auto_compactions: bool } impl Default for DatabaseSettings { @@ -48,7 +49,8 @@ impl Default for DatabaseSettings { direct_io: false, cache_index_and_filter_blocks: false, max_log_file_size: 10, - 
keep_log_file_num: 10 + keep_log_file_num: 10, + auto_compactions: true } } } @@ -91,6 +93,14 @@ impl DatabaseSettings { self } + /// Enable/disable RocksDB background auto-compaction of the table data. + /// Defaults to `true`; mainly for tests that need deterministic control over + /// when compaction runs (manual `compact_tables`/reclaim still work). + pub fn with_auto_compactions(mut self, yes: bool) -> Self { + self.auto_compactions = yes; + self + } + fn db_options(&self) -> RocksOptions { let mut options = RocksOptions::default(); options.create_if_missing(true); @@ -141,6 +151,16 @@ impl DatabaseSettings { let mut options = RocksOptions::default(); options.set_block_based_table_factory(&block_based_table_factory); options.set_compression_type(rocksdb::DBCompressionType::Lz4); + // Help compaction find tombstone-heavy SSTs (table deletes leave range + // tombstones), and bound staleness so no dead file lingers uncompacted. + // A lone range tombstone over a whole table has low deletion *density*, + // so periodic compaction is the real backstop here; the collector mostly + // catches denser boundary files. Thresholds are provisional. + options.add_compact_on_deletion_collector_factory(128 * 1024, 64 * 1024, 0.5); + options.set_periodic_compaction_seconds(24 * 60 * 60); + if !self.auto_compactions { + options.set_disable_auto_compactions(true); + } options } @@ -287,8 +307,49 @@ impl Database { Ok(()) } + /// Phase 1 -- logically purge deleted tables (snapshot-safe range tombstones). + /// Returns the number of tables logically deleted by this call. pub fn cleanup(&self) -> anyhow::Result { - deleted_deleted_tables(&self.db) + cleanup_ops::logical_cleanup(&self.db) + } + + /// Phase 2 -- physically reclaim disk space from tables deleted longer than + /// `grace` ago, by unlinking whole SST files below the live watermark. + /// + /// Performs no writes and needs no scratch space, so it works at a full disk. 
+ /// `grace` MUST exceed the maximum in-flight query/snapshot lifetime: file + /// unlinks ignore snapshots, so a shorter grace can drop rows from a running + /// query. Use [`Duration::ZERO`] only where no readers exist (e.g. startup). + /// Returns the number of deleted-table records reclaimed. + pub fn reclaim_disk_space(&self, grace: std::time::Duration) -> anyhow::Result { + cleanup_ops::reclaim_disk_space(&self.db, grace) + } + + /// Crash recovery: purge `DIRTY_TABLES` markers left by builds that died + /// before committing their chunk. Such an orphan's id would otherwise pin + /// the disk-reclaim watermark forever (see [`Database::reclaim_disk_space`]). + /// + /// MUST be called only before any ingest/table build starts (e.g. at + /// startup), since it treats every dirty marker as an orphan. Returns the + /// number of orphan markers purged. + pub fn purge_orphan_dirty_tables(&self) -> anyhow::Result { + cleanup_ops::purge_orphan_dirty_tables(&self.db) + } + + /// Flush the table-data column family's memtable to SST files. Useful before + /// a reclaim/shutdown so freshly written data is on disk as files. + pub fn flush(&self) -> anyhow::Result<()> { + self.db.flush_cf(self.db.cf_handle(CF_TABLES).unwrap())?; + Ok(()) + } + + /// Force a full compaction of the table-data column family. Unlike + /// [`Database::reclaim_disk_space`] this *writes* (needs scratch space, so it + /// is not safe at a full disk); it exists to push data into the bottom level + /// and to rewrite tombstone-heavy boundary files that file-unlink can't reach. 
+ pub fn compact_tables(&self) { + let cf = self.db.cf_handle(CF_TABLES).unwrap(); + self.db.compact_range_cf(cf, None::<&[u8]>, None::<&[u8]>); } pub fn get_statistics(&self) -> Option { diff --git a/crates/storage/src/db/table_id.rs b/crates/storage/src/db/table_id.rs index dfca7357..a4a8b0b4 100644 --- a/crates/storage/src/db/table_id.rs +++ b/crates/storage/src/db/table_id.rs @@ -20,9 +20,15 @@ impl TableId { } pub fn from_slice(bytes: &[u8]) -> Self { - Self { - uuid: Uuid::from_slice(bytes).unwrap() - } + Self::try_from_slice(bytes).expect("TableId::from_slice: not a 16-byte UUID") + } + + /// Decode a key into a `TableId`, or `None` if it is not exactly a 16-byte + /// UUID. Use this (not [`TableId::from_slice`]) when scanning a column family + /// whose keys could in principle be corrupt/short: a malformed key is then + /// skipped instead of panicking and wedging the scan. + pub fn try_from_slice(bytes: &[u8]) -> Option { + Uuid::from_slice(bytes).ok().map(|uuid| Self { uuid }) } } @@ -31,3 +37,21 @@ impl Display for TableId { self.uuid.fmt(f) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn try_from_slice_rejects_non_16_byte_keys() { + // A real 16-byte id round-trips. + let id = TableId::new(); + assert_eq!(TableId::try_from_slice(id.as_ref()), Some(id)); + + // Anything that is not exactly 16 bytes decodes to None instead of + // panicking (so a corrupt/short key can be skipped, not wedge cleanup). 
+ assert_eq!(TableId::try_from_slice(&[]), None); + assert_eq!(TableId::try_from_slice(&[0u8; 15]), None); + assert_eq!(TableId::try_from_slice(&[0u8; 17]), None); + } +} diff --git a/crates/storage/src/db/write/ops.rs b/crates/storage/src/db/write/ops.rs index 3e0ce35e..9e81424f 100644 --- a/crates/storage/src/db/write/ops.rs +++ b/crates/storage/src/db/write/ops.rs @@ -1,56 +1,307 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use crate::{ - db::db::{RocksDB, RocksWriteBatch, CF_DELETED_TABLES, CF_DIRTY_TABLES, CF_TABLES}, - kv::KvReadCursor, + db::{ + db::{RocksDB, RocksWriteBatch, CF_CHUNKS, CF_DELETED_TABLES, CF_DIRTY_TABLES, CF_TABLES}, + table_id::TableId, + Chunk + }, table::key::TableKeyFactory }; -pub fn deleted_deleted_tables(db: &RocksDB) -> anyhow::Result { - let mut deleted = 0; - let cf_deleted_tables = db.cf_handle(CF_DELETED_TABLES).unwrap(); - let mut it = db.raw_iterator_cf(cf_deleted_tables); - for_each_key(&mut it, |key| { - deleted += 1; - delete_table(db, key) - })?; - Ok(deleted) +// `CF_DELETED_TABLES` value layout: `[phase: u8][deleted_at_ms: u64 LE]` (9 bytes). +// +// A legacy/empty value (written before grace tracking existed) decodes as +// `(LOGICAL_PENDING, deleted_at = 0)`, i.e. "logical delete still owed, deleted +// long ago" -- it drains on the next cycle, which is safe because a process +// restart leaves no in-flight snapshots from the previous run. 
+const PHASE_LOGICAL_PENDING: u8 = 0; +const PHASE_RECLAIM_PENDING: u8 = 1; + +fn encode_deleted(phase: u8, deleted_at_ms: u64) -> [u8; 9] { + let mut v = [0u8; 9]; + v[0] = phase; + v[1..].copy_from_slice(&deleted_at_ms.to_le_bytes()); + v +} + +fn decode_deleted(value: &[u8]) -> (u8, u64) { + if value.len() >= 9 { + let ms = u64::from_le_bytes(value[1..9].try_into().unwrap()); + (value[0], ms) + } else { + (PHASE_LOGICAL_PENDING, 0) + } } -fn delete_table(db: &RocksDB, table_id: &[u8]) -> anyhow::Result<()> { - let mut key1 = TableKeyFactory::new(table_id); - let mut key2 = TableKeyFactory::new(table_id); - let start = key1.start(); - let end = key2.end(); +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Value to store when a table is first routed to `CF_DELETED_TABLES` +/// (see [`super::tx::Tx::delete_table`]): logical delete owed, stamped now. +pub(crate) fn encode_pending_deletion() -> [u8; 9] { + encode_deleted(PHASE_LOGICAL_PENDING, now_ms()) +} + +/// Phase 1 -- logical, snapshot-safe purge of deleted tables. +/// +/// For every table still awaiting logical deletion, drop its `CF_TABLES` range +/// with a single range tombstone (instead of millions of point deletes) and +/// flip it to "reclaim pending". Range tombstones respect snapshots, so +/// in-flight queries are unaffected and no grace period is required here. The +/// tombstone frees no space directly -- it lets compaction drop the data and +/// covers what [`reclaim_disk_space`] (file unlink) cannot reach (boundary or +/// above-watermark files). +/// +/// Idempotent: tables already flipped are skipped, so re-running issues no new +/// tombstones, and a crash between the range delete and the flip just re-runs +/// the (no-op) range delete next time. +/// +/// Returns the number of tables logically deleted by this call. 
+pub(crate) fn logical_cleanup(db: &RocksDB) -> anyhow::Result { + let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let cf_deleted = db.cf_handle(CF_DELETED_TABLES).unwrap(); + + // Collect first, mutate after: issuing writes while iterating the same CF + // would disturb the cursor. + let mut pending: Vec<(TableId, u64)> = Vec::new(); + { + let mut it = db.raw_iterator_cf(cf_deleted); + it.seek_to_first(); + while it.valid() { + let (phase, deleted_at) = decode_deleted(it.value().unwrap()); + // Skip a malformed key rather than panic on it: one corrupt/short + // row must not wedge cleanup (a panic re-fires every tick forever). + if phase == PHASE_LOGICAL_PENDING { + if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + pending.push((id, deleted_at)); + } + } + it.next(); + } + it.status()?; + } + + if pending.is_empty() { + return Ok(0); + } + // `delete_range_cf` writes straight to the base DB (the transactional write + // batch has no range delete), so it cannot share `delete_table`'s + // transaction -- it runs here, in the cleanup loop, instead. + let mut batch = RocksWriteBatch::default(); + for (id, deleted_at) in &pending { + let mut start = TableKeyFactory::new(id); + let mut end = TableKeyFactory::new(id); + db.delete_range_cf(cf_tables, start.start(), end.end())?; + batch.put_cf(cf_deleted, id, encode_deleted(PHASE_RECLAIM_PENDING, *deleted_at)); + } + db.write(batch)?; + + Ok(pending.len()) +} + +/// Phase 2 -- physical reclaim by unlinking whole SST files. +/// +/// Frees space without writing (no scratch space, works at a full disk) by +/// dropping every `CF_TABLES` SST file that lies entirely below `bound`, where +/// +/// ```text +/// bound = min( watermark , min{ id in DELETED_TABLES : deleted_at > now - grace } ) +/// ``` +/// +/// `watermark` is the smallest live `TableId`, so everything below it is dead +/// (table ids are time-ordered UUIDv7s that are never reused, so dead tables +/// form a contiguous low key range). 
The grace term keeps a recently deleted +/// table -- whose data an in-flight query's snapshot may still read -- pinned +/// until `grace` elapses; **`grace` must exceed the maximum query/snapshot +/// lifetime**, otherwise a file unlink (which ignores snapshots) can silently +/// drop rows from a running query. +/// +/// Boundary files (dead+live mixed) and above-watermark garbage are left to +/// compaction, which the Phase-1 range tombstones let it drop. +/// +/// Returns the number of deleted-table records reclaimed. +pub(crate) fn reclaim_disk_space(db: &RocksDB, grace: Duration) -> anyhow::Result { let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let cf_deleted = db.cf_handle(CF_DELETED_TABLES).unwrap(); + + let watermark = min_live_table_id(db)?; + let now = now_ms(); + let grace_ms = grace.as_millis() as u64; + + // Smallest id still inside the grace window, plus the id + phase of every + // deleted table so we can drop the bookkeeping below the final bound. + let mut grace_floor: Option = None; + let mut deleted: Vec<(TableId, u8)> = Vec::new(); + { + let mut it = db.raw_iterator_cf(cf_deleted); + it.seek_to_first(); + while it.valid() { + // Skip a malformed key rather than panic (see logical_cleanup). + let Some(id) = TableId::try_from_slice(it.key().unwrap()) else { + it.next(); + continue; + }; + let (phase, deleted_at) = decode_deleted(it.value().unwrap()); + if now.saturating_sub(deleted_at) < grace_ms { + grace_floor = Some(grace_floor.map_or(id, |m| m.min(id))); + } + deleted.push((id, phase)); + it.next(); + } + it.status()?; + } + + // `None` for either term means "no constraint from this side". With no live + // tables and nothing in grace, the bound is unbounded and every dead file + // (including orphans) is unlinked. 
+ let bound = match (watermark, grace_floor) { + (Some(w), Some(g)) => Some(w.min(g)), + (Some(w), None) => Some(w), + (None, Some(g)) => Some(g), + (None, None) => None + }; + + let lo = [0u8; 16]; + let hi: Vec = match bound { + Some(b) => b.as_ref().to_vec(), + // 17 bytes of 0xFF sorts above every `id (16B) ++ suffix` key. + None => vec![0xFFu8; 17] + }; + db.delete_file_in_range_cf(cf_tables, &lo[..], &hi[..])?; + + let mut removed = 0usize; let mut batch = RocksWriteBatch::default(); - let mut cursor = db.raw_iterator_cf(cf_tables); + for (id, phase) in &deleted { + // Only drop bookkeeping once Phase 1 has issued the range tombstone + // (`PHASE_RECLAIM_PENDING`). A still-`PHASE_LOGICAL_PENDING` entry -- + // reachable on the startup path, which reclaims without a preceding + // `cleanup()`, and for legacy/empty values -- has no tombstone yet; + // forgetting it would strand any of its data in boundary or + // above-watermark files with nothing left to drop it. Leave it for the + // next Phase 1 (the file unlink above still frees its bottom-level SSTs). + if *phase == PHASE_RECLAIM_PENDING && bound.is_none_or(|b| *id < b) { + batch.delete_cf(cf_deleted, id); + removed += 1; + } + } + if removed > 0 { + db.write(batch)?; + } - list_keys(&mut cursor, start, end, |key| batch.delete_cf(cf_tables, key))?; + Ok(removed) +} - let cf_dirty_tables = db.cf_handle(CF_DIRTY_TABLES).unwrap(); - batch.delete_cf(cf_dirty_tables, table_id); +/// Crash recovery -- purge orphaned `CF_DIRTY_TABLES` markers. +/// +/// A dirty marker is written when a table build starts and removed when its +/// chunk commits (see [`super::tx::Tx::write_chunk`]), in the same transaction. +/// So any marker still present once no build is running has no owning chunk: it +/// is an orphan left by a build that died before commit. 
[`min_live_table_id`] +/// otherwise counts every dirty id as a live watermark floor, so a single +/// orphan pins `reclaim_disk_space`'s `bound` forever and disk is never freed. +/// We drop the marker and range-tombstone the orphan's `CF_TABLES` data (which +/// no chunk references) so compaction can reclaim it. +/// +/// MUST run only when no table build is in flight (e.g. at startup before +/// ingest begins): it treats **every** dirty marker as an orphan, so calling it +/// while a build is active would tombstone that build's live data. +/// +/// Returns the number of orphan markers purged. +pub(crate) fn purge_orphan_dirty_tables(db: &RocksDB) -> anyhow::Result { + let cf_tables = db.cf_handle(CF_TABLES).unwrap(); + let cf_dirty = db.cf_handle(CF_DIRTY_TABLES).unwrap(); - let cf_deleted_tables = db.cf_handle(CF_DELETED_TABLES).unwrap(); - batch.delete_cf(cf_deleted_tables, table_id); + // Collect first, mutate after: writing while iterating disturbs the cursor. + let mut orphans: Vec = Vec::new(); + { + let mut it = db.raw_iterator_cf(cf_dirty); + it.seek_to_first(); + while it.valid() { + // Skip a malformed key rather than panic (see logical_cleanup). 
+ if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + orphans.push(id); + } + it.next(); + } + it.status()?; + } + + if orphans.is_empty() { + return Ok(0); + } + let mut batch = RocksWriteBatch::default(); + for id in &orphans { + let mut start = TableKeyFactory::new(id); + let mut end = TableKeyFactory::new(id); + db.delete_range_cf(cf_tables, start.start(), end.end())?; + batch.delete_cf(cf_dirty, id); + } db.write(batch)?; - Ok(()) + + Ok(orphans.len()) } -fn list_keys(cursor: &mut impl KvReadCursor, from: &[u8], to: &[u8], mut cb: impl FnMut(&[u8])) -> anyhow::Result<()> { - cursor.seek(from)?; - while cursor.is_valid() && cursor.key() < to { - cb(cursor.key()); - cursor.next()?; +/// Smallest `TableId` still referenced by a committed chunk (`CF_CHUNKS`) or +/// pending in `CF_DIRTY_TABLES` (written but not yet attached to a chunk -- +/// treated as live). `None` when no live tables exist. Taken across **all** +/// datasets, which also absorbs UUIDv7 clock skew between them. +fn min_live_table_id(db: &RocksDB) -> anyhow::Result> { + let mut min: Option = None; + + { + let cf_chunks = db.cf_handle(CF_CHUNKS).unwrap(); + let mut it = db.raw_iterator_cf(cf_chunks); + it.seek_to_first(); + while it.valid() { + let chunk: Chunk = borsh::from_slice(it.value().unwrap())?; + for id in chunk.tables().values() { + min = Some(min.map_or(*id, |m| m.min(*id))); + } + it.next(); + } + it.status()?; + } + + { + let cf_dirty = db.cf_handle(CF_DIRTY_TABLES).unwrap(); + let mut it = db.raw_iterator_cf(cf_dirty); + it.seek_to_first(); + while it.valid() { + // Skip a malformed key rather than panic (see logical_cleanup). 
+ if let Some(id) = TableId::try_from_slice(it.key().unwrap()) { + min = Some(min.map_or(id, |m| m.min(id))); + } + it.next(); + } + it.status()?; } - Ok(()) + + Ok(min) } -fn for_each_key(cursor: &mut impl KvReadCursor, mut cb: impl FnMut(&[u8]) -> anyhow::Result<()>) -> anyhow::Result<()> { - cursor.seek_first()?; - while cursor.is_valid() { - cb(cursor.key())?; - cursor.next()?; +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deleted_value_codec_roundtrips() { + let v = encode_deleted(PHASE_RECLAIM_PENDING, 123_456_789); + assert_eq!(decode_deleted(&v), (PHASE_RECLAIM_PENDING, 123_456_789)); + + let pending = encode_pending_deletion(); + assert_eq!(pending[0], PHASE_LOGICAL_PENDING); + } + + #[test] + fn legacy_empty_value_decodes_as_logical_pending() { + // Entries written before grace tracking carried an empty value. + assert_eq!(decode_deleted(&[]), (PHASE_LOGICAL_PENDING, 0)); } - Ok(()) } diff --git a/crates/storage/src/db/write/tx.rs b/crates/storage/src/db/write/tx.rs index b7ba820f..67ab9b49 100644 --- a/crates/storage/src/db/write/tx.rs +++ b/crates/storage/src/db/write/tx.rs @@ -126,8 +126,13 @@ impl<'a> Tx<'a> { } pub fn delete_table(&self, table_id: &TableId) -> anyhow::Result<()> { - self.transaction - .put_cf(self.cf_handle(CF_DELETED_TABLES), table_id, [])?; + // Stamp the deletion time so the cleanup loop can hold a grace period + // before physically unlinking the table's files (see `ops::reclaim_disk_space`). + self.transaction.put_cf( + self.cf_handle(CF_DELETED_TABLES), + table_id, + super::ops::encode_pending_deletion() + )?; Ok(()) } diff --git a/crates/storage/tests/cleanup_reclaim.rs b/crates/storage/tests/cleanup_reclaim.rs new file mode 100644 index 00000000..a0794f14 --- /dev/null +++ b/crates/storage/tests/cleanup_reclaim.rs @@ -0,0 +1,284 @@ +//! Tests for the two-phase table cleanup: +//! * Phase 1 ([`Database::cleanup`]) -- logical, snapshot-safe range tombstones. +//! 
* Phase 2 ([`Database::reclaim_disk_space`]) -- physical SST-file unlink, gated by a grace period. +//! +//! Everything runs against [`MockDB`] (see `mock_db`), a scratch temp-dir +//! database that hides the shared setup so each test reads as a sequence of +//! domain steps (commit / delete / cleanup / reclaim / snapshot) and asserts on +//! observable effects (rows read, files freed). The grace gate is exercised +//! deterministically by varying the `grace` argument (huge = "max query lifetime +//! not yet elapsed", zero = "elapsed") instead of sleeping. + +use std::time::Duration; + +mod mock_db; +mod utils; + +use mock_db::{MockDB, Table}; + +/// Phase 1 must be invisible to readers that already hold a snapshot: a query +/// in flight when its chunk is deleted keeps reading every row (RocksDB MVCC -- +/// range tombstones respect older snapshots), while new snapshots see nothing. +#[test] +fn logical_delete_is_snapshot_safe() { + let mut db = MockDB::new(); + let t = db.commit_table(50); + + // Snapshot taken BEFORE the deletion -- models an in-flight query. + let reader = db.snapshot(); + + db.delete(&t); + assert_eq!(db.cleanup(), 1, "one table logically deleted"); + + // A fresh snapshot no longer sees the chunk... + assert!(!db.has_visible_chunk()); + // ...but the pre-deletion snapshot still reads every row. + assert_eq!(db.read(&reader, &t), t.rows); + + // Running Phase 1 again is a no-op (already logically deleted). + assert_eq!(db.cleanup(), 0); +} + +/// Phase 2 unlinks dead SST files only once the grace period has elapsed. 
+#[test]
+fn reclaim_unlinks_dead_sst_files() {
+    let mut db = MockDB::new();
+    let tables: Vec<Table> = (0..3).map(|_| db.commit_table(1000)).collect();
+    db.compact_to_bottom();
+    let before = db.sst_size();
+    assert!(before > 0, "expected flushed SST data");
+
+    for t in &tables {
+        db.delete(t);
+    }
+    assert_eq!(db.cleanup(), 3);
+    // Flush the Phase-1 range tombstones out of the memtable so the unlink can
+    // drop the dead bottom-level files. Auto-compaction is off, so the flushed
+    // tombstone SST won't trigger a background compaction that races the
+    // assertions -- the only thing freeing space below is the unlink.
+    db.flush();
+
+    // Grace not elapsed -> files survive, nothing reclaimed.
+    assert_eq!(db.reclaim(Duration::from_secs(3600)), 0);
+    assert!(db.sst_size() > 0, "files must survive within grace");
+
+    // Grace elapsed -> every dead file unlinked.
+    assert_eq!(db.reclaim(Duration::ZERO), 3);
+    let after = db.sst_size();
+    assert!(after * 4 < before, "expected physical reclaim: before={before} after={after}");
+}
+
+/// The watermark is the min live `TableId` over ALL datasets, so a single old
+/// *live* table pins it low and dead tables with larger ids are NOT
+/// file-reclaimable until that live table is gone. This both verifies live data
+/// is never unlinked and documents the known limitation (heterogeneous retention).
+#[test]
+fn live_table_pins_reclaim_watermark() {
+    let mut db = MockDB::new();
+    // Creation order == id order, so `older` is the smaller id (the
+    // watermark-pinning live table) and `newer` is deleted first.
+    let older = db.commit_table(1000);
+    let newer = db.commit_table(1000);
+    db.flush();
+
+    // Delete the NEWER table; the older one stays live and pins the watermark.
+    db.delete(&newer);
+    db.cleanup();
+    db.flush();
+
+    // Dead table sits above the watermark -> not reclaimable even past grace...
+    assert_eq!(db.reclaim(Duration::ZERO), 0);
+    // ...and the live (older) chunk is untouched and fully readable.
+    assert_eq!(db.read(&db.snapshot(), &older), older.rows);
+
+    // Remove the older table too -> watermark lifts -> both get reclaimed.
+    db.delete(&older);
+    db.cleanup();
+    db.flush();
+    assert_eq!(db.reclaim(Duration::ZERO), 2);
+}
+
+/// Both phases are idempotent (crash-safety relies on it): re-running them after
+/// completion does no work and does not error.
+#[test]
+fn cleanup_and_reclaim_are_idempotent() {
+    let mut db = MockDB::new();
+    let t = db.commit_table(1000);
+    db.delete(&t);
+
+    assert_eq!(db.cleanup(), 1);
+    assert_eq!(db.cleanup(), 0);
+    assert_eq!(db.reclaim(Duration::ZERO), 1);
+    assert_eq!(db.reclaim(Duration::ZERO), 0);
+    assert_eq!(db.cleanup(), 0);
+}
+
+/// End-to-end: `delete_dataset` runs Phase 1 synchronously (now cheap), and a
+/// subsequent zero-grace reclaim (the startup "reclaim before ingest" path,
+/// where no readers exist) frees the disk.
+#[test]
+fn delete_dataset_then_reclaim_frees_space() {
+    let mut db = MockDB::new();
+    let _t0 = db.commit_table(1000);
+    let _t1 = db.commit_table(1000);
+    db.compact_to_bottom();
+    let before = db.sst_size();
+    assert!(before > 0);
+
+    db.delete_dataset();
+    assert!(db.has_no_datasets());
+    db.flush(); // flush Phase-1 tombstones so the unlink can drop the files
+
+    assert_eq!(db.reclaim(Duration::ZERO), 2);
+    let after = db.sst_size();
+    assert!(after * 4 < before, "expected reclaim: before={before} after={after}");
+}
+
+/// Phase 2's core safety invariant: a snapshot taken BEFORE deletion keeps
+/// reading every row while the table is within grace (its files must survive),
+/// and the physical unlink happens only once grace has elapsed. This is the
+/// guarantee that makes the snapshot-ignoring file unlink safe.
+#[test]
+fn reclaim_within_grace_preserves_pre_deletion_reader() {
+    let mut db = MockDB::new();
+    let t = db.commit_table(3000);
+    db.compact_to_bottom();
+    let before = db.sst_size();
+    assert!(before > 0, "expected flushed SST data");
+
+    // In-flight query: snapshot taken BEFORE the deletion.
+    let reader = db.snapshot();
+
+    db.delete(&t);
+    assert_eq!(db.cleanup(), 1, "Phase 1 tombstone issued");
+    db.flush();
+
+    // Within grace: nothing is unlinked, the files survive, and the pre-deletion
+    // reader still sees every row -- the safety guarantee Phase 2 rests on.
+    assert_eq!(db.reclaim(Duration::from_secs(3600)), 0);
+    assert!(db.sst_size() > 0, "files must survive within grace");
+    assert_eq!(db.read(&reader, &t), t.rows, "reader must still read all rows within grace");
+
+    // Grace elapsed: now (and only now) the files are physically unlinked.
+    assert_eq!(db.reclaim(Duration::ZERO), 1);
+    let after = db.sst_size();
+    assert!(
+        after * 4 < before,
+        "expected physical reclaim once grace elapsed: before={before} after={after}"
+    );
+}
+
+/// The flip side of the invariant above, documenting the data loss the grace
+/// gate exists to prevent (findings #5/#7): if grace is shorter than a live
+/// snapshot's lifetime, a past-grace reclaim unlinks the files out from under a
+/// pre-deletion reader and the read breaks. This is the UNSAFE path -- it must
+/// fail loudly, not silently return wrong rows. Cache disabled so the data is
+/// genuinely gone after the unlink, not served from a warm block cache.
+#[test]
+fn reclaim_past_grace_breaks_a_live_pre_deletion_reader() {
+    let mut db = MockDB::uncached();
+    let t = db.commit_table(3000);
+    db.compact_to_bottom();
+
+    // In-flight query: snapshot taken BEFORE the deletion.
+    let reader = db.snapshot();
+
+    db.delete(&t);
+    assert_eq!(db.cleanup(), 1);
+    db.flush();
+
+    // Baseline: the reader still reads every row while its files are present.
+    assert_eq!(db.read(&reader, &t), t.rows);
+
+    // Reclaim with ZERO grace while that snapshot is STILL live -- the
+    // misconfiguration "grace < max snapshot lifetime". The file unlink ignores
+    // snapshots, so the reader's files vanish from under it.
+    assert_eq!(db.reclaim(Duration::ZERO), 1, "files unlinked despite the live snapshot");
+
+    // Data loss: the same reader can no longer read the table. The grace gate
+    // (see `reclaim_within_grace_preserves_pre_deletion_reader`) is exactly what
+    // keeps a correctly configured grace from ever reaching this state.
+    assert!(
+        db.try_read(&reader, &t).is_err(),
+        "reading a table whose files were unlinked under a live snapshot must fail, not return wrong rows"
+    );
+}
+
+/// An orphaned `DIRTY_TABLES` marker -- left when a build's chunk is never
+/// committed (a crash/abandon before `write_chunk` removes it) -- is counted as
+/// a live table by the watermark, so it pins disk reclaim for every later table
+/// forever. Startup recovery (`purge_orphan_dirty_tables`) must drop it so the
+/// watermark lifts and dead tables become reclaimable again.
+#[test]
+fn orphan_dirty_marker_unpinned_by_purge() {
+    let mut db = MockDB::new();
+    // Orphan created FIRST -> smaller id -> pins the watermark low. Its chunk is
+    // never committed, so nothing ever removes its dirty marker.
+    let orphan = db.orphan_table(1000);
+    let live = db.commit_table(1000);
+    assert!(orphan.id < live.id, "orphan must be the smaller id to pin the watermark");
+
+    db.compact_to_bottom();
+    let before = db.sst_size();
+    assert!(before > 0);
+
+    // Delete the live table and tombstone it (Phase 1).
+    db.delete(&live);
+    assert_eq!(db.cleanup(), 1);
+    db.flush();
+
+    // Bug witness: the orphan marker pins the watermark at the orphan's id, so
+    // the dead (higher-id) table cannot be unlinked even past grace.
+    assert_eq!(
+        db.reclaim(Duration::ZERO),
+        0,
+        "orphan dirty marker pins the watermark, blocking reclaim of the dead table"
+    );
+    assert!(db.sst_size() > 0, "dead table's files are still pinned");
+
+    // Startup recovery removes the orphan marker (and tombstones its data).
+    assert_eq!(db.purge_orphans(), 1, "one orphan marker purged");
+
+    // Watermark lifts -> the dead table is now reclaimable.
+    assert_eq!(
+        db.reclaim(Duration::ZERO),
+        1,
+        "dead table reclaimable once the orphan no longer pins the watermark"
+    );
+    let after = db.sst_size();
+    assert!(after * 4 < before, "expected physical reclaim: before={before} after={after}");
+
+    // Purge is idempotent: no markers remain.
+    assert_eq!(db.purge_orphans(), 0);
+}
+
+/// `reclaim_disk_space` must not drop bookkeeping for a table that is still only
+/// logically pending (Phase 1 never ran, so no range tombstone was issued). The
+/// startup reclaim path runs without a preceding `cleanup()`; forgetting the
+/// record there would strand the table's data with no tombstone and no tracking.
+/// The record must survive so a later Phase 1 can tombstone it.
+#[test]
+fn reclaim_preserves_untombstoned_bookkeeping() {
+    let mut db = MockDB::new();
+    let t = db.commit_table(2000);
+    db.compact_to_bottom();
+
+    // Logical-delete but DO NOT run Phase 1 -- mirrors the startup reclaim path
+    // (reclaim without a preceding cleanup). The entry stays LOGICAL_PENDING.
+    db.delete(&t);
+
+    // Reclaim reports nothing reclaimed and must NOT consume the record.
+    assert_eq!(
+        db.reclaim(Duration::ZERO),
+        0,
+        "must not reclaim/forget a table whose Phase-1 tombstone was never issued"
+    );
+
+    // Proof the record survived: Phase 1 still finds and tombstones it.
+    assert_eq!(db.cleanup(), 1, "the pending deletion survived and is recoverable by Phase 1");
+
+    // And now, properly tombstoned, it reclaims and is idempotent.
+    db.flush();
+    assert_eq!(db.reclaim(Duration::ZERO), 1);
+    assert_eq!(db.reclaim(Duration::ZERO), 0);
+}
diff --git a/crates/storage/tests/mock_db/mod.rs b/crates/storage/tests/mock_db/mod.rs
new file mode 100644
index 00000000..6a194b79
--- /dev/null
+++ b/crates/storage/tests/mock_db/mod.rs
@@ -0,0 +1,211 @@
+//! [`MockDB`] -- a scratch temp-dir `Database` for driving the two-phase
+//! table cleanup lifecycle in tests.
+//!
+//! It encapsulates the shared, fiddly setup (auto-compaction off, small write
+//! buffers, block-range bookkeeping, UUIDv7 id ordering, flush/compact into the
+//! bottom level) so a test reads as a sequence of domain steps -- commit /
+//! delete / cleanup / reclaim / snapshot / purge -- and asserts on observable
+//! effects (rows read, files freed).
+//!
+//! The grace gate is exercised deterministically by varying the `grace`
+//! argument passed to [`MockDB::reclaim`] (huge vs zero) instead of sleeping.
+
+use std::{sync::Arc, time::Duration};
+
+use arrow::datatypes::{DataType, Schema};
+use sqd_storage::{
+    db::{Chunk, Database, DatabaseSettings, DatasetId, DatasetKind, ReadSnapshot, TableId},
+    table::write::{use_small_buffers, RestoreBufferSizesGuard}
+};
+use tempfile::TempDir;
+
+use crate::utils::{make_irregular_block, make_schema, read_chunk};
+
+/// Generous upper bound on the total number of rows a single test builds across
+/// all its tables (block numbers double as indices into the shared column data).
+const CAPACITY: usize = 20_000;
+
+fn two_u16_columns(n: usize) -> Vec<Vec<u16>> {
+    let col: Vec<u16> = (0..n).map(|i| (i % 60_000) as u16).collect();
+    vec![col.clone(), col]
+}
+
+/// A table [`MockDB`] built. Holds enough to delete it, read it back, and
+/// reason about its watermark position (`id`).
+pub struct Table {
+    pub chunk: Chunk,
+    pub id: TableId,
+    pub rows: Vec<(u32, u32)>
+}
+
+/// A scratch `Database` wired for deterministic cleanup/reclaim tests:
+/// auto-compaction off (only explicit `compact`/unlink move data) and small
+/// write buffers (so modest row counts still produce real SST files).
+///
+/// Invariant: tables are created with **strictly increasing ids** in call order
+/// (a 2 ms gap per build makes the UUIDv7 timestamp advance), so "created first"
+/// == "smaller id" == "lower in the reclaim watermark order". Tests rely on this
+/// to place an orphan/older table below a live one without inspecting raw ids.
+pub struct MockDB {
+    db: Database,
+    ds: DatasetId,
+    schema: Arc<Schema>,
+    static_data: Vec<Vec<u16>>,
+    next_block: usize,
+    last_id: Option<TableId>,
+    _small_buffers: RestoreBufferSizesGuard,
+    // Keeps the temp dir alive: flush/reclaim create new WAL/SST files, which
+    // fails once the directory is cleaned up.
+    _dir: TempDir
+}
+
+impl MockDB {
+    pub fn new() -> Self {
+        Self::open(
+            DatabaseSettings::default()
+                .with_rocksdb_stats(true)
+                .with_auto_compactions(false)
+        )
+    }
+
+    /// Like [`MockDB::new`] but with the block cache disabled, so reads must
+    /// hit the SST files. Lets a test observe data loss deterministically: once
+    /// files are unlinked the data is genuinely gone, never served from cache.
+    pub fn uncached() -> Self {
+        Self::open(
+            DatabaseSettings::default()
+                .with_rocksdb_stats(true)
+                .with_auto_compactions(false)
+                .with_data_cache_size(0)
+                .with_chunk_cache_size(0)
+        )
+    }
+
+    fn open(settings: DatabaseSettings) -> Self {
+        let dir = tempfile::tempdir().unwrap();
+        let db = settings.open(dir.path()).unwrap();
+        let ds = DatasetId::from_str("solana");
+        db.create_dataset(ds, DatasetKind::from_str("solana")).unwrap();
+        Self {
+            db,
+            ds,
+            schema: make_schema(DataType::UInt32, DataType::UInt32, false),
+            static_data: two_u16_columns(CAPACITY),
+            next_block: 0,
+            last_id: None,
+            _small_buffers: use_small_buffers(),
+            _dir: dir
+        }
+    }
+
+    /// Build (and `finish`) a table of `rows` rows over the next free block
+    /// range. `finish` already persists the dirty marker and the table data;
+    /// the caller decides whether to commit the chunk.
+    fn build_table(&mut self, rows: usize) -> (Chunk, Vec<(u32, u32)>, TableId) {
+        // Guarantee a strictly increasing UUIDv7 vs the previous table (see the
+        // struct invariant). This is id-ordering only -- unrelated to the grace
+        // gate, which the tests drive via the `grace` argument, never by waiting.
+        std::thread::sleep(Duration::from_millis(2));
+
+        let start = self.next_block;
+        let end = start + rows;
+        assert!(end <= self.static_data[0].len(), "harness data capacity exceeded");
+        self.next_block = end;
+
+        let (chunk, rows_data) = make_irregular_block(&self.static_data, start, end, Arc::clone(&self.schema), &self.db);
+        let id = chunk.tables().get("block").copied().unwrap();
+        if let Some(last) = self.last_id {
+            assert!(id > last, "harness invariant: ids must increase with creation order");
+        }
+        self.last_id = Some(id);
+        (chunk, rows_data, id)
+    }
+
+    /// Build a committed table: its chunk is inserted, so `write_chunk` removes
+    /// its dirty marker.
+    pub fn commit_table(&mut self, rows: usize) -> Table {
+        let (chunk, rows, id) = self.build_table(rows);
+        self.db.insert_chunk(self.ds, &chunk).unwrap();
+        Table { chunk, id, rows }
+    }
+
+    /// Build a table but never commit its chunk -- `build_table`'s `finish` has
+    /// already persisted the dirty marker and data, so nothing ever removes the
+    /// marker. Exactly the orphan a crash-before-commit leaves behind.
+    pub fn orphan_table(&mut self, rows: usize) -> Table {
+        let (chunk, rows, id) = self.build_table(rows);
+        Table { chunk, id, rows }
+    }
+
+    pub fn snapshot(&self) -> ReadSnapshot<'_> {
+        self.db.snapshot()
+    }
+
+    pub fn read(&self, snapshot: &ReadSnapshot, table: &Table) -> Vec<(u32, u32)> {
+        read_chunk(snapshot, table.chunk.clone())
+    }
+
+    /// Like [`MockDB::read`] but propagates errors instead of unwrapping --
+    /// used to show that an unlink under a live snapshot makes the data
+    /// unreadable rather than silently wrong.
+    pub fn try_read(&self, snapshot: &ReadSnapshot, table: &Table) -> anyhow::Result<()> {
+        let chunk_reader = snapshot.create_chunk_reader(table.chunk.clone());
+        let table_reader = chunk_reader.get_table_reader("block")?;
+        table_reader.read_column(0, None)?;
+        table_reader.read_column(1, None)?;
+        Ok(())
+    }
+
+    /// Whether a fresh snapshot still sees any committed chunk in the dataset.
+    pub fn has_visible_chunk(&self) -> bool {
+        self.db.snapshot().get_first_chunk(self.ds).unwrap().is_some()
+    }
+
+    pub fn delete(&self, table: &Table) {
+        self.db.update_dataset(self.ds, |tx| tx.delete_chunk(&table.chunk)).unwrap();
+    }
+
+    /// Delete the whole dataset (runs Phase 1 synchronously), as production's
+    /// `delete_dataset` does.
+    pub fn delete_dataset(&self) {
+        self.db.delete_dataset(self.ds).unwrap();
+    }
+
+    /// Whether the database has no datasets left.
+    pub fn has_no_datasets(&self) -> bool {
+        self.db.get_all_datasets().unwrap().is_empty()
+    }
+
+    pub fn cleanup(&self) -> usize {
+        self.db.cleanup().unwrap()
+    }
+
+    pub fn reclaim(&self, grace: Duration) -> usize {
+        self.db.reclaim_disk_space(grace).unwrap()
+    }
+
+    pub fn purge_orphans(&self) -> usize {
+        self.db.purge_orphan_dirty_tables().unwrap()
+    }
+
+    pub fn flush(&self) {
+        self.db.flush().unwrap();
+    }
+
+    /// Land the table data in the bottom level, so a later reclaim is a real
+    /// file unlink (`DeleteFilesInRange` skips L0): flush the memtable, then
+    /// compact. In production the dead data being reclaimed lives there too.
+    pub fn compact_to_bottom(&self) {
+        self.db.flush().unwrap();
+        self.db.compact_tables();
+    }
+
+    /// Total size of all SST files in the table-data column family, in bytes.
+    pub fn sst_size(&self) -> u64 {
+        self.db
+            .get_property("TABLES", "rocksdb.total-sst-files-size")
+            .unwrap()
+            .and_then(|s| s.parse().ok())
+            .unwrap_or(0)
+    }
+}