diff --git a/crates/hotblocks/src/cli.rs b/crates/hotblocks/src/cli.rs index 1e14ff72..a1308e47 100644 --- a/crates/hotblocks/src/cli.rs +++ b/crates/hotblocks/src/cli.rs @@ -10,7 +10,7 @@ use sqd_storage::db::{DatabaseSettings, DatasetId}; use crate::{ data_service::{DataService, DataServiceRef}, dataset_config::{DatasetConfig, RetentionConfig}, - metrics::DatasetMetricsCollector, + metrics::{DatasetMetricsCollector, StorageMetricsCollector}, query::{QueryService, QueryServiceRef}, types::DBRef }; @@ -91,10 +91,8 @@ impl CLI { .context("failed to open rocksdb database")?; let mut metrics_registry = crate::metrics::build_metrics_registry(); - metrics_registry.register_collector(Box::new(DatasetMetricsCollector { - db: db.clone(), - datasets: datasets.keys().copied().collect() - })); + + let dataset_ids: Vec = datasets.keys().copied().collect(); let api_controlled_datasets = datasets .iter() @@ -103,6 +101,12 @@ impl CLI { let data_service = DataService::start(db.clone(), datasets).await.map(Arc::new)?; + metrics_registry.register_collector(Box::new(DatasetMetricsCollector { + data_service: data_service.clone(), + datasets: dataset_ids + })); + metrics_registry.register_collector(Box::new(StorageMetricsCollector { db: db.clone() })); + let query_service = { let mut builder = QueryService::builder(db.clone()); builder.set_max_data_waiters(self.query_max_data_waiters); diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index 8568b79a..c0ad5400 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -4,7 +4,7 @@ use anyhow::{Context, anyhow}; use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; use sqd_data_client::reqwest::ReqwestDataClient; use sqd_primitives::{BlockNumber, BlockRef}; -use sqd_storage::db::{Chunk, CompactionStatus, DatasetId}; +use sqd_storage::db::{Chunk, CompactionStatus, Database, DatasetId}; use tokio::{select, task::JoinHandle, time::Instant}; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; @@ -24,17 +24,29 @@ pub struct DatasetController { head_receiver: tokio::sync::watch::Receiver>, finalized_head_receiver: tokio::sync::watch::Receiver>, compaction_enabled_sender: tokio::sync::watch::Sender, + stats_receiver: tokio::sync::watch::Receiver, task: JoinHandle<()>, - compaction_task: JoinHandle<()> + compaction_task: JoinHandle<()>, + stats_task: JoinHandle<()> } impl Drop for DatasetController { fn drop(&mut self) { self.task.abort(); self.compaction_task.abort(); + self.stats_task.abort(); } } +/// Dataset metrics refreshed by [`dataset_stats_loop`] every ~60s. +#[derive(Clone, Debug, Default)] +pub struct DatasetStats { + pub first_block: Option, + pub last_block_time: Option, + /// `None` until computed once, distinguishing a fresh process from an empty dataset. + pub size_bytes: Option +} + impl DatasetController { #[instrument(name = "dataset", skip_all, fields(dataset_id = %dataset_id))] pub fn new( @@ -54,6 +66,7 @@ impl DatasetController { let (head_sender, head_receiver) = tokio::sync::watch::channel(None); let (finalized_head_sender, finalized_head_receiver) = tokio::sync::watch::channel(None); let (compaction_enabled_sender, compaction_enabled_receiver) = tokio::sync::watch::channel(false); + let (stats_sender, stats_receiver) = tokio::sync::watch::channel(DatasetStats::default()); let _ = head_sender.send(write.head().cloned()); let _ = finalized_head_sender.send(write.finalized_head().cloned()); @@ -70,6 +83,8 @@ impl DatasetController { let task = tokio::spawn(ctl.run(write).in_current_span()); + let stats_task = tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span()); + let compaction_task = tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span()); @@ -80,8 +95,10 @@ impl DatasetController { head_receiver, finalized_head_receiver, compaction_enabled_sender, + stats_receiver, task, - compaction_task + compaction_task, + stats_task }) } @@ -117,6 +134,10 @@ impl DatasetController { self.retention_sender.borrow().clone() } + pub fn get_stats(&self) -> DatasetStats { + self.stats_receiver.borrow().clone() + } + pub async fn wait_for_block(&self, block_number: BlockNumber) -> BlockNumber { let mut recv = self.head_receiver.clone(); loop { @@ -591,6 +612,51 @@ async fn fetch_chain_top(clients: Vec) -> BlockNumber { } } +#[instrument(name = "dataset_stats", skip_all)] +async fn dataset_stats_loop(db: DBRef, dataset_id: DatasetId, sender: tokio::sync::watch::Sender) { + const REFRESH: Duration = Duration::from_secs(60); + + // Delay the first run by a per-dataset offset so the loops don't hit RocksDB together. + let offset = dataset_id + .as_ref() + .iter() + .fold(0u64, |acc, &b| acc.wrapping_add(b as u64)) + % REFRESH.as_secs(); + tokio::time::sleep(Duration::from_secs(offset)).await; + + loop { + let db = db.clone(); + let span = tracing::Span::current(); + let result = tokio::task::spawn_blocking(move || { + let _s = span.enter(); + compute_dataset_stats(&db, dataset_id) + }) + .await; + + match result { + Ok(Ok(stats)) => { + let _ = sender.send(stats); + } + Ok(Err(err)) => error!(reason =? err, "failed to estimate dataset stats"), + Err(err) => error!(reason =? err, "dataset stats task panicked") + } + + tokio::time::sleep(REFRESH).await; + } +} + +fn compute_dataset_stats(db: &Database, dataset_id: DatasetId) -> anyhow::Result { + let snapshot = db.snapshot(); + let first_block = snapshot.get_first_chunk(dataset_id)?.map(|c| c.first_block()); + let last_block_time = snapshot.get_last_chunk(dataset_id)?.and_then(|c| c.last_block_time()); + let size_bytes = snapshot.estimate_dataset_size(dataset_id)?; + Ok(DatasetStats { + first_block, + last_block_time, + size_bytes: Some(size_bytes) + }) +} + #[instrument(name = "compaction", skip_all)] async fn compaction_loop(db: DBRef, dataset_id: DatasetId, mut enabled: tokio::sync::watch::Receiver) { let mut skips = 0; diff --git a/crates/hotblocks/src/metrics.rs b/crates/hotblocks/src/metrics.rs index d5e08c0b..488ddac0 100644 --- a/crates/hotblocks/src/metrics.rs +++ b/crates/hotblocks/src/metrics.rs @@ -1,6 +1,5 @@ use std::{fmt::Write, sync::LazyLock, time::Duration}; -use anyhow::bail; use prometheus_client::{ collector::Collector, encoding::{DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder}, @@ -12,10 +11,11 @@ use prometheus_client::{ }, registry::Registry }; -use sqd_storage::db::{DatasetId, ReadSnapshot}; -use tracing::error; +use sqd_storage::db::DatasetId; -use crate::{query::QueryExecutorCollector, types::DBRef}; +use crate::{ + data_service::DataServiceRef, dataset_controller::DatasetController, query::QueryExecutorCollector, types::DBRef +}; #[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)] struct DatasetLabel { @@ -87,31 +87,27 @@ pub fn report_http_response(labels: &Vec<(&'static str, String)>, to_first_byte: HTTP_TTFB.get_or_create(&labels).observe(to_first_byte.as_secs_f64()); } -#[derive(Debug)] pub struct DatasetMetricsCollector { - pub db: DBRef, + pub data_service: DataServiceRef, pub datasets: Vec } +impl std::fmt::Debug for DatasetMetricsCollector { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DatasetMetricsCollector") + .field("datasets", &self.datasets) + .finish_non_exhaustive() + } +} + impl Collector for DatasetMetricsCollector { fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { - let db = self.db.snapshot(); - for dataset_id in self.datasets.iter().copied() { - if let Err(err) = collect_dataset_metrics(&mut encoder, &db, dataset_id) { - return if err.is::() { - Err(err.downcast().unwrap()) - } else { - // subsequent metric collection most likely will fail as well, - // hence let's terminate metric collection entirely - error!( - err =? err, - "failed to collect metrics for dataset {}", - dataset_id - ); - Ok(()) - }; - } + // Read from in-memory controller state only - no per-scrape DB access. + let Ok(controller) = self.data_service.get_dataset(dataset_id) else { + continue; + }; + collect_dataset_metrics(&mut encoder, dataset_id, &controller)?; } Ok(()) @@ -120,40 +116,44 @@ impl Collector for DatasetMetricsCollector { fn collect_dataset_metrics( encoder: &mut DescriptorEncoder, - db: &ReadSnapshot, - dataset_id: DatasetId -) -> anyhow::Result<()> { - let Some(label) = db.get_label(dataset_id)? else { + dataset_id: DatasetId, + controller: &DatasetController +) -> Result<(), std::fmt::Error> { + let last_block = controller.get_head_block_number(); + let stats = controller.get_stats(); + + // Nothing ingested yet - emit no series for this dataset. + if last_block.is_none() && stats.first_block.is_none() { return Ok(()); - }; - - let Some(first_chunk) = db.get_first_chunk(dataset_id)? else { - return Ok(()); - }; - - let Some(last_chunk) = db.get_last_chunk(dataset_id)? else { - bail!("first chunk exists, while last does not") - }; + } - encoder - .encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&first_chunk.first_block())?; + if let Some(first_block) = stats.first_block { + encoder + .encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&first_block)?; + } - encoder - .encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&last_chunk.last_block())?; + // `last_block` is the live head; `last_block_time` below is from lagging stats, + // so the two can briefly disagree after a new block arrives. + if let Some(last_block) = last_block { + encoder + .encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_block)?; + } - encoder - .encode_descriptor( - "hotblocks_last_block_timestamp_ms", - "Timestamp of the last block", - None, - MetricType::Gauge - )? - .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&last_chunk.last_block_time().unwrap_or(0))?; + if let Some(last_block_time) = stats.last_block_time { + encoder + .encode_descriptor( + "hotblocks_last_block_timestamp_ms", + "Timestamp of the last block", + None, + MetricType::Gauge + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&last_block_time)?; + } encoder .encode_descriptor( @@ -163,11 +163,59 @@ fn collect_dataset_metrics( MetricType::Gauge )? .encode_family(&dataset_label!(dataset_id))? - .encode_gauge(&label.finalized_head().map_or(0, |h| h.number))?; + .encode_gauge(&controller.get_finalized_head().map_or(0, |h| h.number))?; + + // Skip until computed once, so a fresh process doesn't report a spurious zero. + if let Some(size_bytes) = stats.size_bytes { + encoder + .encode_descriptor( + "hotblocks_dataset_size_bytes", + "Approximate on-disk size of the dataset's table data", + None, + MetricType::Gauge + )? + .encode_family(&dataset_label!(dataset_id))? + .encode_gauge(&size_bytes)?; + } Ok(()) } +/// On-disk size of each RocksDB column family. Unlike per-dataset sizes, this also +/// covers metadata column families and orphaned table data. Cheap enough to run +/// on every scrape (O(1) property reads). +#[derive(Debug)] +pub struct StorageMetricsCollector { + pub db: DBRef +} + +impl Collector for StorageMetricsCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + let sizes = match self.db.column_family_sizes() { + Ok(sizes) => sizes, + Err(err) => { + tracing::warn!(reason =? err, "failed to read column family sizes"); + return Ok(()); + } + }; + + for (cf, size) in sizes { + let labels: Vec<(&'static str, String)> = vec![("column_family", cf.to_string())]; + encoder + .encode_descriptor( + "hotblocks_column_family_size_bytes", + "Approximate on-disk size (total SST files) of a RocksDB column family", + None, + MetricType::Gauge + )? + .encode_family(&labels)? + .encode_gauge(&size)?; + } + + Ok(()) + } +} + pub fn build_metrics_registry() -> Registry { let mut top_registry = Registry::default(); let registry = top_registry.sub_registry_with_prefix("hotblocks"); diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs index 34e0d71c..18d11800 100644 --- a/crates/storage/src/db/db.rs +++ b/crates/storage/src/db/db.rs @@ -22,6 +22,10 @@ pub(super) const CF_TABLES: Name = "TABLES"; pub(super) const CF_DIRTY_TABLES: Name = "DIRTY_TABLES"; pub(super) const CF_DELETED_TABLES: Name = "DELETED_TABLES"; +/// All column families. Keep in sync with the descriptor list in [`DatabaseSettings::open`]. +pub(super) const ALL_COLUMN_FAMILIES: [Name; 5] = + [CF_DATASETS, CF_CHUNKS, CF_TABLES, CF_DIRTY_TABLES, CF_DELETED_TABLES]; + pub(super) type RocksDB = rocksdb::OptimisticTransactionDB; pub(super) type RocksTransaction<'a> = rocksdb::Transaction<'a, RocksDB>; pub(super) type RocksTransactionOptions = rocksdb::OptimisticTransactionOptions; @@ -295,6 +299,22 @@ impl Database { self.options.get_statistics() } + /// Approximate on-disk size of each column family as `(cf_name, bytes)`. Counts + /// SST files only, excluding WAL and memtables. Constant-time property reads. + pub fn column_family_sizes(&self) -> anyhow::Result> { + ALL_COLUMN_FAMILIES + .into_iter() + .map(|cf| { + let handle = self.db.cf_handle(cf).expect("column family must exist"); + let size = self + .db + .property_int_value_cf(handle, "rocksdb.total-sst-files-size")? + .unwrap_or(0); + Ok((cf, size)) + }) + .collect() + } + pub fn get_property(&self, cf: &str, name: &str) -> anyhow::Result> { let Some(cf_handle) = self.db.cf_handle(cf) else { return Ok(None); diff --git a/crates/storage/src/db/read/snapshot.rs b/crates/storage/src/db/read/snapshot.rs index 31659392..60b7ed1b 100644 --- a/crates/storage/src/db/read/snapshot.rs +++ b/crates/storage/src/db/read/snapshot.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use anyhow::anyhow; use parking_lot::Mutex; -use rocksdb::{ColumnFamily, ReadOptions}; +use rocksdb::{ColumnFamily, Range, ReadOptions}; use sqd_primitives::{BlockNumber, Name}; use crate::{ @@ -14,7 +14,7 @@ use crate::{ DatasetLabel }, kv::KvRead, - table::read::TableReader + table::{key::TableKeyFactory, read::TableReader} }; pub struct ReadSnapshot<'a> { @@ -75,6 +75,30 @@ impl<'a> ReadSnapshot<'a> { self.list_chunks(dataset_id, 0, None).into_reversed().next().transpose() } + /// Approximate on-disk size (compressed, flushed bytes) of a dataset's table data. + /// + /// `CF_TABLES` is keyed by random `TableId`, so a dataset's tables aren't + /// contiguous: we resolve them through chunks and sum each table's key-range + /// size. An estimate, safe to run against a live database. + pub fn estimate_dataset_size(&self, dataset_id: DatasetId) -> anyhow::Result { + let mut bounds: Vec<(Vec, Vec)> = Vec::new(); + for chunk in self.list_chunks(dataset_id, 0, None) { + let chunk = chunk?; + for table_id in chunk.tables().values() { + // `start()`/`end()` bracket all of one table's keys, using the + // same key layout the writer and reader rely on. + let mut key = TableKeyFactory::new(table_id.as_ref()); + let lo = key.start().to_vec(); + let hi = key.end().to_vec(); + bounds.push((lo, hi)); + } + } + + let ranges: Vec = bounds.iter().map(|(lo, hi)| Range::new(lo, hi)).collect(); + let sizes = self.db.get_approximate_sizes_cf(self.cf_handle(CF_TABLES), &ranges); + Ok(sizes.iter().sum()) + } + fn new_options(&self) -> ReadOptions { let mut options = ReadOptions::default(); options.set_snapshot(&self.snapshot);