From 6424ad2013df8526cb23c80d0f982cb1a38b4dfb Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Thu, 25 Jun 2026 09:54:22 +0300 Subject: [PATCH 1/2] Export hotblocks storage size metrics Run a background task to roughly estimate how much disk space each dataset is using. Expose it with the metrics. These numbers may be used to quickly understand which datasets started growing, but not as a precise disk usage profiler. Also move all the DB reads out of the scraping path to avoid locking the DB on calls to /metrics. --- crates/hotblocks/src/cli.rs | 14 +- .../dataset_controller/dataset_controller.rs | 73 +++++++- crates/hotblocks/src/metrics.rs | 157 ++++++++++++------ crates/storage/src/db/db.rs | 20 +++ crates/storage/src/db/read/snapshot.rs | 28 +++- 5 files changed, 229 insertions(+), 63 deletions(-) diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index 1e14ff72..a1308e47 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -10,7 +10,7 @@ use sqd_storage::db::{DatabaseSettings, DatasetId}; use crate::{ data_service::{DataService, DataServiceRef}, dataset_config::{DatasetConfig, RetentionConfig}, - metrics::DatasetMetricsCollector, + metrics::{DatasetMetricsCollector, StorageMetricsCollector}, query::{QueryService, QueryServiceRef}, types::DBRef }; @@ -91,10 +91,8 @@ impl CLI { .context("failed to open rocksdb database")?; let mut metrics_registry = crate::metrics::build_metrics_registry(); - metrics_registry.register_collector(Box::new(DatasetMetricsCollector { - db: db.clone(), - datasets: datasets.keys().copied().collect() - })); + + let dataset_ids: Vec = datasets.keys().copied().collect(); let api_controlled_datasets = datasets .iter() @@ -103,6 +101,12 @@ impl CLI { let data_service = DataService::start(db.clone(), datasets).await.map(Arc::new)?; + metrics_registry.register_collector(Box::new(DatasetMetricsCollector { + data_service: data_service.clone(), + datasets: dataset_ids + })); + 
metrics_registry.register_collector(Box::new(StorageMetricsCollector { db: db.clone() })); + let query_service = { let mut builder = QueryService::builder(db.clone()); builder.set_max_data_waiters(self.query_max_data_waiters); diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index 8568b79a..17f2697f 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -4,7 +4,7 @@ use anyhow::{Context, anyhow}; use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; use sqd_data_client::reqwest::ReqwestDataClient; use sqd_primitives::{BlockNumber, BlockRef}; -use sqd_storage::db::{Chunk, CompactionStatus, DatasetId}; +use sqd_storage::db::{Chunk, CompactionStatus, Database, DatasetId}; use tokio::{select, task::JoinHandle, time::Instant}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; @@ -24,17 +24,29 @@ pub struct DatasetController { head_receiver: tokio::sync::watch::Receiver>, finalized_head_receiver: tokio::sync::watch::Receiver>, compaction_enabled_sender: tokio::sync::watch::Sender, + stats_receiver: tokio::sync::watch::Receiver, task: JoinHandle<()>, - compaction_task: JoinHandle<()> + compaction_task: JoinHandle<()>, + stats_task: JoinHandle<()> } impl Drop for DatasetController { fn drop(&mut self) { self.task.abort(); self.compaction_task.abort(); + self.stats_task.abort(); } } +/// Dataset metrics refreshed by [`dataset_stats_loop`] every ~60s. +#[derive(Clone, Debug, Default)] +pub struct DatasetStats { + pub first_block: Option, + pub last_block_time: Option, + /// `None` until computed once, distinguishing a fresh process from an empty dataset. 
+ pub size_bytes: Option +} + impl DatasetController { #[instrument(name = "dataset", skip_all, fields(dataset_id = %dataset_id))] pub fn new( @@ -54,6 +66,7 @@ impl DatasetController { let (head_sender, head_receiver) = tokio::sync::watch::channel(None); let (finalized_head_sender, finalized_head_receiver) = tokio::sync::watch::channel(None); let (compaction_enabled_sender, compaction_enabled_receiver) = tokio::sync::watch::channel(false); + let (stats_sender, stats_receiver) = tokio::sync::watch::channel(DatasetStats::default()); let _ = head_sender.send(write.head().cloned()); let _ = finalized_head_sender.send(write.finalized_head().cloned()); @@ -70,6 +83,9 @@ impl DatasetController { let task = tokio::spawn(ctl.run(write).in_current_span()); + let stats_task = + tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span()); + let compaction_task = tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span()); @@ -80,8 +96,10 @@ impl DatasetController { head_receiver, finalized_head_receiver, compaction_enabled_sender, + stats_receiver, task, - compaction_task + compaction_task, + stats_task }) } @@ -117,6 +135,10 @@ impl DatasetController { self.retention_sender.borrow().clone() } + pub fn get_stats(&self) -> DatasetStats { + self.stats_receiver.borrow().clone() + } + pub async fn wait_for_block(&self, block_number: BlockNumber) -> BlockNumber { let mut recv = self.head_receiver.clone(); loop { @@ -591,6 +613,51 @@ async fn fetch_chain_top(clients: Vec) -> BlockNumber { } } +#[instrument(name = "dataset_stats", skip_all)] +async fn dataset_stats_loop( + db: DBRef, + dataset_id: DatasetId, + sender: tokio::sync::watch::Sender +) { + const REFRESH: Duration = Duration::from_secs(60); + + // Delay the first run by a per-dataset offset so the loops don't hit RocksDB together. 
+ let offset = dataset_id.as_ref().iter().fold(0u64, |acc, &b| acc.wrapping_add(b as u64)) % REFRESH.as_secs(); + tokio::time::sleep(Duration::from_secs(offset)).await; + + loop { + let db = db.clone(); + let span = tracing::Span::current(); + let result = tokio::task::spawn_blocking(move || { + let _s = span.enter(); + compute_dataset_stats(&db, dataset_id) + }) + .await; + + match result { + Ok(Ok(stats)) => { + let _ = sender.send(stats); + } + Ok(Err(err)) => error!(reason =? err, "failed to estimate dataset stats"), + Err(err) => error!(reason =? err, "dataset stats task panicked") + } + + tokio::time::sleep(REFRESH).await; + } +} + +fn compute_dataset_stats(db: &Database, dataset_id: DatasetId) -> anyhow::Result { + let snapshot = db.snapshot(); + let first_block = snapshot.get_first_chunk(dataset_id)?.map(|c| c.first_block()); + let last_block_time = snapshot.get_last_chunk(dataset_id)?.and_then(|c| c.last_block_time()); + let size_bytes = snapshot.estimate_dataset_size(dataset_id)?; + Ok(DatasetStats { + first_block, + last_block_time, + size_bytes: Some(size_bytes) + }) +} + #[instrument(name = "compaction", skip_all)] async fn compaction_loop(db: DBRef, dataset_id: DatasetId, mut enabled: tokio::sync::watch::Receiver) { let mut skips = 0; diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index d5e08c0b..eceade67 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -1,6 +1,5 @@ use std::{fmt::Write, sync::LazyLock, time::Duration}; -use anyhow::bail; use prometheus_client::{ collector::Collector, encoding::{DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder}, @@ -12,10 +11,14 @@ use prometheus_client::{ }, registry::Registry }; -use sqd_storage::db::{DatasetId, ReadSnapshot}; -use tracing::error; +use sqd_storage::db::DatasetId; -use crate::{query::QueryExecutorCollector, types::DBRef}; +use crate::{ + data_service::DataServiceRef, + dataset_controller::DatasetController, + 
query::QueryExecutorCollector, + types::DBRef +}; #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)] struct DatasetLabel { @@ -87,31 +90,27 @@ pub fn report_http_response(labels: &Vec<(&'static str, String)>, to_first_byte: HTTP_TTFB.get_or_create(&labels).observe(to_first_byte.as_secs_f64()); } -#[derive(Debug)] pub struct DatasetMetricsCollector { - pub db: DBRef, + pub data_service: DataServiceRef, pub datasets: Vec } +impl std::fmt::Debug for DatasetMetricsCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DatasetMetricsCollector") + .field("datasets", &self.datasets) + .finish_non_exhaustive() + } +} + impl Collector for DatasetMetricsCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let db = self.db.snapshot(); - for dataset_id in self.datasets.iter().copied() { - if let Err(err) = collect_dataset_metrics(&mut encoder, &db, dataset_id) { - return if err.is::() { - Err(err.downcast().unwrap()) - } else { - // subsequent metric collection most likely will fail as well, - // hence let's terminate metric collection entirely - error!( - err =? err, - "failed to collect metrics for dataset {}", - dataset_id - ); - Ok(()) - }; - } + // Read from in-memory controller state only - no per-scrape DB access. + let Ok(controller) = self.data_service.get_dataset(dataset_id) else { + continue; + }; + collect_dataset_metrics(&mut encoder, dataset_id, &controller)?; } Ok(()) @@ -120,40 +119,44 @@ impl Collector for DatasetMetricsCollector { fn collect_dataset_metrics( encoder: &mut DescriptorEncoder, - db: &ReadSnapshot, - dataset_id: DatasetId -) -> anyhow::Result<()> { - let Some(label) = db.get_label(dataset_id)? 
else { + dataset_id: DatasetId, + controller: &DatasetController +) -> Result<(), std::fmt::Error> { + let last_block = controller.get_head_block_number(); + let stats = controller.get_stats(); + + // Nothing ingested yet - emit no series for this dataset. + if last_block.is_none() && stats.first_block.is_none() { return Ok(()); - }; - - let Some(first_chunk) = db.get_first_chunk(dataset_id)? else { - return Ok(()); - }; - - let Some(last_chunk) = db.get_last_chunk(dataset_id)? else { - bail!("first chunk exists, while last does not") - }; + } - encoder - .encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&first_chunk.first_block())?; + if let Some(first_block) = stats.first_block { + encoder + .encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&first_block)?; + } - encoder - .encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&last_chunk.last_block())?; + // `last_block` is the live head; `last_block_time` below is from lagging stats, + // so the two can briefly disagree after a new block arrives. + if let Some(last_block) = last_block { + encoder + .encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_block)?; + } - encoder - .encode_descriptor( - "hotblocks_last_block_timestamp_ms", - "Timestamp of the last block", - None, - MetricType::Gauge - )? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&last_chunk.last_block_time().unwrap_or(0))?; + if let Some(last_block_time) = stats.last_block_time { + encoder + .encode_descriptor( + "hotblocks_last_block_timestamp_ms", + "Timestamp of the last block", + None, + MetricType::Gauge + )? 
+ .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_block_time)?; + } encoder .encode_descriptor( @@ -163,11 +166,59 @@ fn collect_dataset_metrics( MetricType::Gauge )? .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&label.finalized_head().map_or(0, |h| h.number))?; + .encode_gauge(&controller.get_finalized_head().map_or(0, |h| h.number))?; + + // Skip until computed once, so a fresh process doesn't report a spurious zero. + if let Some(size_bytes) = stats.size_bytes { + encoder + .encode_descriptor( + "hotblocks_dataset_size_bytes", + "Approximate on-disk size of the dataset's table data", + None, + MetricType::Gauge + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&size_bytes)?; + } Ok(()) } +/// On-disk size of each RocksDB column family. Unlike per-dataset sizes, this also +/// covers metadata column families and orphaned table data. Cheap enough to run +/// on every scrape (O(1) property reads). +#[derive(Debug)] +pub struct StorageMetricsCollector { + pub db: DBRef +} + +impl Collector for StorageMetricsCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let sizes = match self.db.column_family_sizes() { + Ok(sizes) => sizes, + Err(err) => { + tracing::warn!(reason =? err, "failed to read column family sizes"); + return Ok(()); + } + }; + + for (cf, size) in sizes { + let labels: Vec<(&'static str, String)> = vec![("column_family", cf.to_string())]; + encoder + .encode_descriptor( + "hotblocks_column_family_size_bytes", + "Approximate on-disk size (total SST files) of a RocksDB column family", + None, + MetricType::Gauge + )? + .encode_family(&labels)? 
+ .encode_gauge(&size)?; + } + + Ok(()) + } +} + pub fn build_metrics_registry() -> Registry { let mut top_registry = Registry::default(); let registry = top_registry.sub_registry_with_prefix("hotblocks"); diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs index 34e0d71c..18d11800 100644 --- a/crates/storage/src/db/db.rs +++ b/crates/storage/src/db/db.rs @@ -22,6 +22,10 @@ pub(super) const CF_TABLES: Name = "TABLES"; pub(super) const CF_DIRTY_TABLES: Name = "DIRTY_TABLES"; pub(super) const CF_DELETED_TABLES: Name = "DELETED_TABLES"; +/// All column families. Keep in sync with the descriptor list in [`DatabaseSettings::open`]. +pub(super) const ALL_COLUMN_FAMILIES: [Name; 5] = + [CF_DATASETS, CF_CHUNKS, CF_TABLES, CF_DIRTY_TABLES, CF_DELETED_TABLES]; + pub(super) type RocksDB = rocksdb::OptimisticTransactionDB; pub(super) type RocksTransaction<'a> = rocksdb::Transaction<'a, RocksDB>; pub(super) type RocksTransactionOptions = rocksdb::OptimisticTransactionOptions; @@ -295,6 +299,22 @@ impl Database { self.options.get_statistics() } + /// Approximate on-disk size of each column family as `(cf_name, bytes)`. Counts + /// SST files only, excluding WAL and memtables. Constant-time property reads. + pub fn column_family_sizes(&self) -> anyhow::Result> { + ALL_COLUMN_FAMILIES + .into_iter() + .map(|cf| { + let handle = self.db.cf_handle(cf).expect("column family must exist"); + let size = self + .db + .property_int_value_cf(handle, "rocksdb.total-sst-files-size")? 
+ .unwrap_or(0); + Ok((cf, size)) + }) + .collect() + } + pub fn get_property(&self, cf: &str, name: &str) -> anyhow::Result> { let Some(cf_handle) = self.db.cf_handle(cf) else { return Ok(None); diff --git a/crates/storage/src/db/read/snapshot.rs b/crates/storage/src/db/read/snapshot.rs index 31659392..60b7ed1b 100644 --- a/crates/storage/src/db/read/snapshot.rs +++ b/crates/storage/src/db/read/snapshot.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use anyhow::anyhow; use parking_lot::Mutex; -use rocksdb::{ColumnFamily, ReadOptions}; +use rocksdb::{ColumnFamily, Range, ReadOptions}; use sqd_primitives::{BlockNumber, Name}; use crate::{ @@ -14,7 +14,7 @@ use crate::{ DatasetLabel }, kv::KvRead, - table::read::TableReader + table::{key::TableKeyFactory, read::TableReader} }; pub struct ReadSnapshot<'a> { @@ -75,6 +75,30 @@ impl<'a> ReadSnapshot<'a> { self.list_chunks(dataset_id, 0, None).into_reversed().next().transpose() } + /// Approximate on-disk size (compressed, flushed bytes) of a dataset's table data. + /// + /// `CF_TABLES` is keyed by random `TableId`, so a dataset's tables aren't + /// contiguous: we resolve them through chunks and sum each table's key-range + /// size. An estimate, safe to run against a live database. + pub fn estimate_dataset_size(&self, dataset_id: DatasetId) -> anyhow::Result { + let mut bounds: Vec<(Vec, Vec)> = Vec::new(); + for chunk in self.list_chunks(dataset_id, 0, None) { + let chunk = chunk?; + for table_id in chunk.tables().values() { + // `start()`/`end()` bracket all of one table's keys, using the + // same key layout the writer and reader rely on. 
+ let mut key = TableKeyFactory::new(table_id.as_ref()); + let lo = key.start().to_vec(); + let hi = key.end().to_vec(); + bounds.push((lo, hi)); + } + } + + let ranges: Vec = bounds.iter().map(|(lo, hi)| Range::new(lo, hi)).collect(); + let sizes = self.db.get_approximate_sizes_cf(self.cf_handle(CF_TABLES), &ranges); + Ok(sizes.iter().sum()) + } + fn new_options(&self) -> ReadOptions { let mut options = ReadOptions::default(); options.set_snapshot(&self.snapshot); From 13af54202b7f8e8cfda145e9c4d89f8ff86f8c4b Mon Sep 17 00:00:00 2001 From: Dzmitry Kalabuk Date: Thu, 25 Jun 2026 13:26:46 +0300 Subject: [PATCH 2/2] style: apply cargo +nightly fmt --- .../src/dataset_controller/dataset_controller.rs | 15 +++++++-------- crates/hotblocks/src/metrics.rs | 5 +---- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index 17f2697f..c0ad5400 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -83,8 +83,7 @@ impl DatasetController { let task = tokio::spawn(ctl.run(write).in_current_span()); - let stats_task = - tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span()); + let stats_task = tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span()); let compaction_task = tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span()); @@ -614,15 +613,15 @@ async fn fetch_chain_top(clients: Vec) -> BlockNumber { } #[instrument(name = "dataset_stats", skip_all)] -async fn dataset_stats_loop( - db: DBRef, - dataset_id: DatasetId, - sender: tokio::sync::watch::Sender -) { +async fn dataset_stats_loop(db: DBRef, dataset_id: DatasetId, sender: tokio::sync::watch::Sender) { const REFRESH: Duration = Duration::from_secs(60); // Delay the first run by a per-dataset 
offset so the loops don't hit RocksDB together. - let offset = dataset_id.as_ref().iter().fold(0u64, |acc, &b| acc.wrapping_add(b as u64)) % REFRESH.as_secs(); + let offset = dataset_id + .as_ref() + .iter() + .fold(0u64, |acc, &b| acc.wrapping_add(b as u64)) + % REFRESH.as_secs(); tokio::time::sleep(Duration::from_secs(offset)).await; loop { diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index eceade67..488ddac0 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -14,10 +14,7 @@ use prometheus_client::{ use sqd_storage::db::DatasetId; use crate::{ - data_service::DataServiceRef, - dataset_controller::DatasetController, - query::QueryExecutorCollector, - types::DBRef + data_service::DataServiceRef, dataset_controller::DatasetController, query::QueryExecutorCollector, types::DBRef }; #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)]