Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions crates/hotblocks/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sqd_storage::db::{DatabaseSettings, DatasetId};
use crate::{
data_service::{DataService, DataServiceRef},
dataset_config::{DatasetConfig, RetentionConfig},
metrics::DatasetMetricsCollector,
metrics::{DatasetMetricsCollector, StorageMetricsCollector},
query::{QueryService, QueryServiceRef},
types::DBRef
};
Expand Down Expand Up @@ -91,10 +91,8 @@ impl CLI {
.context("failed to open rocksdb database")?;

let mut metrics_registry = crate::metrics::build_metrics_registry();
metrics_registry.register_collector(Box::new(DatasetMetricsCollector {
db: db.clone(),
datasets: datasets.keys().copied().collect()
}));

let dataset_ids: Vec<DatasetId> = datasets.keys().copied().collect();

let api_controlled_datasets = datasets
.iter()
Expand All @@ -103,6 +101,12 @@ impl CLI {

let data_service = DataService::start(db.clone(), datasets).await.map(Arc::new)?;

metrics_registry.register_collector(Box::new(DatasetMetricsCollector {
data_service: data_service.clone(),
datasets: dataset_ids
}));
metrics_registry.register_collector(Box::new(StorageMetricsCollector { db: db.clone() }));

let query_service = {
let mut builder = QueryService::builder(db.clone());
builder.set_max_data_waiters(self.query_max_data_waiters);
Expand Down
72 changes: 69 additions & 3 deletions crates/hotblocks/src/dataset_controller/dataset_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{Context, anyhow};
use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
use sqd_data_client::reqwest::ReqwestDataClient;
use sqd_primitives::{BlockNumber, BlockRef};
use sqd_storage::db::{Chunk, CompactionStatus, DatasetId};
use sqd_storage::db::{Chunk, CompactionStatus, Database, DatasetId};
use tokio::{select, task::JoinHandle, time::Instant};
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};

Expand All @@ -24,17 +24,29 @@ pub struct DatasetController {
head_receiver: tokio::sync::watch::Receiver<Option<BlockRef>>,
finalized_head_receiver: tokio::sync::watch::Receiver<Option<BlockRef>>,
compaction_enabled_sender: tokio::sync::watch::Sender<bool>,
stats_receiver: tokio::sync::watch::Receiver<DatasetStats>,
task: JoinHandle<()>,
compaction_task: JoinHandle<()>
compaction_task: JoinHandle<()>,
stats_task: JoinHandle<()>
}

impl Drop for DatasetController {
    /// Aborts every background task owned by the controller (ingest, compaction
    /// and stats refresh) when the controller itself is dropped.
    fn drop(&mut self) {
        // Same order as the fields are spawned for: main task, compaction, stats.
        for handle in [&self.task, &self.compaction_task, &self.stats_task] {
            handle.abort();
        }
    }
}

/// Dataset metrics refreshed by [`dataset_stats_loop`] every ~60s.
///
/// The `Default` value (every field `None`) is what the stats watch channel
/// holds before the first refresh completes.
#[derive(Clone, Debug, Default)]
pub struct DatasetStats {
/// First block of the dataset's first chunk; `None` when no chunk was found.
pub first_block: Option<BlockNumber>,
/// Block time reported by the dataset's last chunk, if any.
/// NOTE(review): exported as `hotblocks_last_block_timestamp_ms`, so presumably
/// a millisecond timestamp - confirm against the storage layer.
pub last_block_time: Option<i64>,
/// `None` until computed once, distinguishing a fresh process from an empty dataset.
pub size_bytes: Option<u64>
}

impl DatasetController {
#[instrument(name = "dataset", skip_all, fields(dataset_id = %dataset_id))]
pub fn new(
Expand All @@ -54,6 +66,7 @@ impl DatasetController {
let (head_sender, head_receiver) = tokio::sync::watch::channel(None);
let (finalized_head_sender, finalized_head_receiver) = tokio::sync::watch::channel(None);
let (compaction_enabled_sender, compaction_enabled_receiver) = tokio::sync::watch::channel(false);
let (stats_sender, stats_receiver) = tokio::sync::watch::channel(DatasetStats::default());

let _ = head_sender.send(write.head().cloned());
let _ = finalized_head_sender.send(write.finalized_head().cloned());
Expand All @@ -70,6 +83,8 @@ impl DatasetController {

let task = tokio::spawn(ctl.run(write).in_current_span());

let stats_task = tokio::spawn(dataset_stats_loop(db.clone(), dataset_id, stats_sender).in_current_span());

let compaction_task =
tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span());

Expand All @@ -80,8 +95,10 @@ impl DatasetController {
head_receiver,
finalized_head_receiver,
compaction_enabled_sender,
stats_receiver,
task,
compaction_task
compaction_task,
stats_task
})
}

Expand Down Expand Up @@ -117,6 +134,10 @@ impl DatasetController {
self.retention_sender.borrow().clone()
}

pub fn get_stats(&self) -> DatasetStats {
self.stats_receiver.borrow().clone()
}

pub async fn wait_for_block(&self, block_number: BlockNumber) -> BlockNumber {
let mut recv = self.head_receiver.clone();
loop {
Expand Down Expand Up @@ -591,6 +612,51 @@ async fn fetch_chain_top(clients: Vec<ReqwestDataClient>) -> BlockNumber {
}
}

#[instrument(name = "dataset_stats", skip_all)]
async fn dataset_stats_loop(db: DBRef, dataset_id: DatasetId, sender: tokio::sync::watch::Sender<DatasetStats>) {
const REFRESH: Duration = Duration::from_secs(60);

// Delay the first run by a per-dataset offset so the loops don't hit RocksDB together.
let offset = dataset_id
.as_ref()
.iter()
.fold(0u64, |acc, &b| acc.wrapping_add(b as u64))
% REFRESH.as_secs();
tokio::time::sleep(Duration::from_secs(offset)).await;

loop {
let db = db.clone();
let span = tracing::Span::current();
let result = tokio::task::spawn_blocking(move || {
let _s = span.enter();
compute_dataset_stats(&db, dataset_id)
})
.await;

match result {
Ok(Ok(stats)) => {
let _ = sender.send(stats);
}
Ok(Err(err)) => error!(reason =? err, "failed to estimate dataset stats"),
Err(err) => error!(reason =? err, "dataset stats task panicked")
}

tokio::time::sleep(REFRESH).await;
}
}

/// Reads the stats for `dataset_id` from a single database snapshot.
///
/// Returns the first block of the first chunk, the last block time of the last
/// chunk and the estimated dataset size. Any failing read is propagated as an error.
fn compute_dataset_stats(db: &Database, dataset_id: DatasetId) -> anyhow::Result<DatasetStats> {
    let snapshot = db.snapshot();

    // All three reads go through the same snapshot, in this order.
    let first_chunk = snapshot.get_first_chunk(dataset_id)?;
    let last_chunk = snapshot.get_last_chunk(dataset_id)?;
    let estimated_size = snapshot.estimate_dataset_size(dataset_id)?;

    Ok(DatasetStats {
        first_block: first_chunk.map(|chunk| chunk.first_block()),
        last_block_time: last_chunk.and_then(|chunk| chunk.last_block_time()),
        size_bytes: Some(estimated_size)
    })
}

#[instrument(name = "compaction", skip_all)]
async fn compaction_loop(db: DBRef, dataset_id: DatasetId, mut enabled: tokio::sync::watch::Receiver<bool>) {
let mut skips = 0;
Expand Down
154 changes: 101 additions & 53 deletions crates/hotblocks/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{fmt::Write, sync::LazyLock, time::Duration};

use anyhow::bail;
use prometheus_client::{
collector::Collector,
encoding::{DescriptorEncoder, EncodeLabelSet, EncodeLabelValue, LabelValueEncoder},
Expand All @@ -12,10 +11,11 @@ use prometheus_client::{
},
registry::Registry
};
use sqd_storage::db::{DatasetId, ReadSnapshot};
use tracing::error;
use sqd_storage::db::DatasetId;

use crate::{query::QueryExecutorCollector, types::DBRef};
use crate::{
data_service::DataServiceRef, dataset_controller::DatasetController, query::QueryExecutorCollector, types::DBRef
};

#[derive(Copy, Clone, Hash, Debug, Default, Ord, PartialOrd, Eq, PartialEq, EncodeLabelSet)]
struct DatasetLabel {
Expand Down Expand Up @@ -87,31 +87,27 @@ pub fn report_http_response(labels: &Vec<(&'static str, String)>, to_first_byte:
HTTP_TTFB.get_or_create(&labels).observe(to_first_byte.as_secs_f64());
}

#[derive(Debug)]
pub struct DatasetMetricsCollector {
pub db: DBRef,
pub data_service: DataServiceRef,
pub datasets: Vec<DatasetId>
}

impl std::fmt::Debug for DatasetMetricsCollector {
    /// Formats the collector showing only its `datasets` list; the remaining
    /// state is marked as omitted via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("DatasetMetricsCollector");
        builder.field("datasets", &self.datasets);
        builder.finish_non_exhaustive()
    }
}

impl Collector for DatasetMetricsCollector {
fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
let db = self.db.snapshot();

for dataset_id in self.datasets.iter().copied() {
if let Err(err) = collect_dataset_metrics(&mut encoder, &db, dataset_id) {
return if err.is::<std::fmt::Error>() {
Err(err.downcast().unwrap())
} else {
// subsequent metric collection most likely will fail as well,
// hence let's terminate metric collection entirely
error!(
err =? err,
"failed to collect metrics for dataset {}",
dataset_id
);
Ok(())
};
}
// Read from in-memory controller state only - no per-scrape DB access.
let Ok(controller) = self.data_service.get_dataset(dataset_id) else {
continue;
};
collect_dataset_metrics(&mut encoder, dataset_id, &controller)?;
}

Ok(())
Expand All @@ -120,40 +116,44 @@ impl Collector for DatasetMetricsCollector {

fn collect_dataset_metrics(
encoder: &mut DescriptorEncoder,
db: &ReadSnapshot,
dataset_id: DatasetId
) -> anyhow::Result<()> {
let Some(label) = db.get_label(dataset_id)? else {
dataset_id: DatasetId,
controller: &DatasetController
) -> Result<(), std::fmt::Error> {
let last_block = controller.get_head_block_number();
let stats = controller.get_stats();

// Nothing ingested yet - emit no series for this dataset.
if last_block.is_none() && stats.first_block.is_none() {
return Ok(());
};

let Some(first_chunk) = db.get_first_chunk(dataset_id)? else {
return Ok(());
};

let Some(last_chunk) = db.get_last_chunk(dataset_id)? else {
bail!("first chunk exists, while last does not")
};
}

encoder
.encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&first_chunk.first_block())?;
if let Some(first_block) = stats.first_block {
encoder
.encode_descriptor("hotblocks_first_block", "First block", None, MetricType::Gauge)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&first_block)?;
}

encoder
.encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&last_chunk.last_block())?;
// `last_block` is the live head; `last_block_time` below is from lagging stats,
// so the two can briefly disagree after a new block arrives.
if let Some(last_block) = last_block {
encoder
.encode_descriptor("hotblocks_last_block", "Last block", None, MetricType::Gauge)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&last_block)?;
}

encoder
.encode_descriptor(
"hotblocks_last_block_timestamp_ms",
"Timestamp of the last block",
None,
MetricType::Gauge
)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&last_chunk.last_block_time().unwrap_or(0))?;
if let Some(last_block_time) = stats.last_block_time {
encoder
.encode_descriptor(
"hotblocks_last_block_timestamp_ms",
"Timestamp of the last block",
None,
MetricType::Gauge
)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&last_block_time)?;
}

encoder
.encode_descriptor(
Expand All @@ -163,11 +163,59 @@ fn collect_dataset_metrics(
MetricType::Gauge
)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&label.finalized_head().map_or(0, |h| h.number))?;
.encode_gauge(&controller.get_finalized_head().map_or(0, |h| h.number))?;

// Skip until computed once, so a fresh process doesn't report a spurious zero.
if let Some(size_bytes) = stats.size_bytes {
encoder
.encode_descriptor(
"hotblocks_dataset_size_bytes",
"Approximate on-disk size of the dataset's table data",
None,
MetricType::Gauge
)?
.encode_family(&dataset_label!(dataset_id))?
.encode_gauge(&size_bytes)?;
}

Ok(())
}

/// On-disk size of each RocksDB column family. Unlike per-dataset sizes, this also
/// covers metadata column families and orphaned table data. Cheap enough to run
/// on every scrape (O(1) property reads).
#[derive(Debug)]
pub struct StorageMetricsCollector {
/// Database handle whose `column_family_sizes()` is queried on every encode.
pub db: DBRef
}

impl Collector for StorageMetricsCollector {
    /// Emits one `hotblocks_column_family_size_bytes` gauge per column family,
    /// labelled with `column_family`.
    ///
    /// If the sizes cannot be read, a warning is logged and no series are emitted;
    /// the scrape itself still succeeds. Only encoder errors are returned.
    fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
        const NAME: &str = "hotblocks_column_family_size_bytes";
        const HELP: &str = "Approximate on-disk size (total SST files) of a RocksDB column family";

        let Ok(sizes) = self
            .db
            .column_family_sizes()
            .inspect_err(|err| tracing::warn!(reason =? err, "failed to read column family sizes"))
        else {
            return Ok(());
        };

        // NOTE(review): the descriptor is encoded again for every column family,
        // the same way the per-dataset collector does it - confirm that scrapers
        // accept the repeated descriptor lines this produces.
        for (cf, size) in sizes {
            let labels: Vec<(&'static str, String)> = vec![("column_family", cf.to_string())];
            let mut descriptor = encoder.encode_descriptor(NAME, HELP, None, MetricType::Gauge)?;
            let mut series = descriptor.encode_family(&labels)?;
            series.encode_gauge(&size)?;
        }

        Ok(())
    }
}

pub fn build_metrics_registry() -> Registry {
let mut top_registry = Registry::default();
let registry = top_registry.sub_registry_with_prefix("hotblocks");
Expand Down
Loading
Loading