From 1026d0d6758ae832c0c58835b8dadf961feb8a5e Mon Sep 17 00:00:00 2001 From: miton18 Date: Mon, 29 Jun 2026 14:37:20 +0200 Subject: [PATCH 1/6] feat(pulsar): support distributed indexing Allow `num_pipelines > 1` for the Pulsar source so its partitions can be indexed in parallel across several pipelines and indexers, on par with Kafka. The control plane already spawns N pipelines for "NonSharded" sources (Pulsar included), delegating partition distribution to the broker. The only blockers were a config validation gate and the consumer setup: - Lift the `num_pipelines > 1` validation gate for Pulsar. - Derive a per-pipeline consumer name (`{prefix}-{pipeline_uid}`) so each pipeline connects as a distinct consumer under the shared subscription. - Keep the `Failover` subscription: on a partitioned topic it assigns each partition to exactly one active consumer, matching Quickwit's per-partition checkpoint model. `Shared` would spread a partition's messages across pipelines and corrupt the metastore checkpoint; this is now documented at the call site. Tests: flip the config unit test to expect success, add a `pipeline_uid` test-builder setter, and add a broker integration test proving N pipelines split the partitions of a 4-partition topic with disjoint ownership, no loss, no duplicates, and a consistent merged checkpoint. Docs: add a "Distributed indexing" section to the Pulsar guide and note the partitioned-topic requirement in the source config reference. 
--- docs/configuration/source-config.md | 2 +- docs/ingest-data/pulsar.md | 23 ++ .../quickwit-config/src/source_config/mod.rs | 34 ++- .../src/source_config/serialize.rs | 3 +- quickwit/quickwit-indexing/src/source/mod.rs | 10 +- .../src/source/pulsar_source.rs | 225 +++++++++++++++++- 6 files changed, 279 insertions(+), 18 deletions(-) diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md index 30894d3faad..b67efbb2605 100644 --- a/docs/configuration/source-config.md +++ b/docs/configuration/source-config.md @@ -223,7 +223,7 @@ will be decided by the control plane. :::info -Note that distributing the indexing load of partitioned sources like Kafka is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`. +Note that distributing the indexing load of partitioned sources like Kafka and Pulsar is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`. For Pulsar specifically, the topic(s) must be partitioned: a non-partitioned Pulsar topic is served by a single active consumer, so extra pipelines stay idle and provide no parallelism. Also, assuming you are only indexing a single Kafka source in your Quickwit cluster, you should set the number of pipelines to a multiple of the number of indexers. Finally, if your indexing throughput is high, you should provision between 2 and 4 vCPUs per pipeline. 
diff --git a/docs/ingest-data/pulsar.md b/docs/ingest-data/pulsar.md index 4563beda98e..650180d02bf 100644 --- a/docs/ingest-data/pulsar.md +++ b/docs/ingest-data/pulsar.md @@ -158,6 +158,29 @@ As soon as the Pulsar source is created, Quickwit control plane will ask an inde INFO spawn_pipeline{index=stackoverflow gen=0}:pulsar-consumer{subscription_name="quickwit-stackoverflow-pulsar-source" params=PulsarSourceParams { topics: ["stackoverflow"], address: "pulsar://localhost:6650", consumer_name: "quickwit", authentication: None } current_positions={}}: quickwit_indexing::source::pulsar_source: Seeking to last checkpoint positions. positions={} ``` +### Distributed indexing + +To parallelize indexing across several pipelines (and indexers), set `num_pipelines` on the source config to the number of pipelines you want: + +```yaml +version: 0.8 +source_id: pulsar-source +source_type: pulsar +num_pipelines: 4 +params: + topics: + - stackoverflow + address: pulsar://localhost:6650 +``` + +The pipelines share a single Pulsar `Failover` subscription, and the broker assigns each topic partition to exactly one active pipeline — so each partition is indexed by a single pipeline, with no duplicates and no loss. When a pipeline dies, Pulsar promotes another and it resumes from the subscription cursor. + +:::info + +Distribution happens **per partition**, so the topic(s) must be partitioned. A non-partitioned topic is served by a single active consumer: extra pipelines stay idle as standbys and provide no parallelism. For balanced load, make the number of partitions a multiple of `num_pipelines`. + +::: + ## Create and populate a Pulsar topic We will use the Pulsar's default tenant/namespace `public/default`. 
To populate the topic, we will use a python script: diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 81811e9f47c..b687dba35ea 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -566,7 +566,10 @@ pub struct PulsarSourceParams { pub address: String, #[schema(default = "quickwit")] #[serde(default = "default_consumer_name")] - /// The name to register with the pulsar source. + /// The consumer name prefix to register with the pulsar source. With + /// distributed indexing (`num_pipelines > 1`), each pipeline appends its + /// pipeline UID to this prefix so it connects as a distinct consumer under + /// the shared subscription. pub consumer_name: String, // Serde yaml has some specific behaviour when deserializing // enums (see https://github.com/dtolnay/serde-yaml/issues/342) @@ -955,15 +958,14 @@ mod tests { "source_type": "pulsar", "params": { "topics": ["my-topic"], - "address": "http://localhost:6650" + "address": "pulsar://localhost:6650" } } "#; - load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) - .unwrap_err(); - // TODO: uncomment asserts once distributed indexing is activated for pulsar. 
- // assert_eq!(source_config.num_pipelines(), 3); - // assert_eq!(source_config.max_num_pipelines_per_indexer(), 3); + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); } } @@ -986,6 +988,24 @@ mod tests { .unwrap(); assert_eq!(source_config.num_pipelines.get(), 3); } + { + let content = r#" + { + "version": "0.8", + "source_id": "hdfs-logs-pulsar-source", + "num_pipelines": 3, + "source_type": "pulsar", + "params": { + "topics": ["my-topic"], + "address": "pulsar://localhost:6650" + } + } + "#; + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); + } } #[test] diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 3f580ee8fa3..1abbc596906 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -126,11 +126,12 @@ impl SourceConfigForSerialization { match &self.source_params { SourceParams::PubSub(_) | SourceParams::Kafka(_) + | SourceParams::Pulsar(_) | SourceParams::File(FileSourceParams::Notifications(_)) => {} _ => { if self.num_pipelines > 1 { bail!( - "Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types" + "Quickwit currently supports multiple pipelines only for GCP PubSub, Kafka or Pulsar sources. 
open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types" ); } } diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 8b4db0844ad..e4d41501354 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -595,6 +595,7 @@ mod tests { source_config: SourceConfig, metastore_opt: Option, queues_dir_path_opt: Option, + pipeline_uid: PipelineUid, } impl SourceRuntimeBuilder { @@ -604,6 +605,7 @@ mod tests { source_config, metastore_opt: None, queues_dir_path_opt: None, + pipeline_uid: PipelineUid::for_test(0u128), } } @@ -622,7 +624,7 @@ mod tests { node_id: NodeId::from_str("test-node"), index_uid: self.index_uid, source_id: self.source_config.source_id.clone(), - pipeline_uid: PipelineUid::for_test(0u128), + pipeline_uid: self.pipeline_uid, }, metastore, ingester_pool: IngesterPool::default(), @@ -656,6 +658,12 @@ mod tests { self } + #[cfg(all(test, feature = "pulsar-broker-tests"))] + pub fn with_pipeline_uid(mut self, pipeline_uid: PipelineUid) -> Self { + self.pipeline_uid = pipeline_uid; + self + } + fn setup_mock_metastore( &self, source_checkpoint_delta_opt: Option, diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 679234ad2f5..a9b432f48b8 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -30,7 +30,7 @@ use quickwit_actors::ActorExitStatus; use quickwit_config::{PulsarSourceAuth, PulsarSourceParams}; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{IndexUid, Position}; +use quickwit_proto::types::{IndexUid, PipelineUid, Position}; use serde_json::{Value as JsonValue, json}; use tokio::time; use tracing::{debug, info, warn}; @@ -75,6 +75,12 @@ pub struct 
PulsarSource { source_params: PulsarSourceParams, pulsar_consumer: PulsarConsumer, subscription_name: String, + // Effective Pulsar consumer name, derived from the user-supplied + // `consumer_name` prefix and this pipeline's `pipeline_uid`. It must be + // unique per pipeline so that several pipelines of the same source connect + // as distinct consumers under the shared Failover subscription, letting the + // broker distribute partitions across them (distributed indexing). + consumer_name: String, current_positions: BTreeMap, state: PulsarSourceState, } @@ -97,11 +103,17 @@ impl PulsarSource { ) -> anyhow::Result { let subscription_name = subscription_name(source_runtime.index_uid(), source_runtime.source_id()); + // The consumer name must be unique per pipeline: under the shared + // Failover subscription, each pipeline connects as a separate consumer + // and the broker assigns each topic partition to exactly one of them. + let consumer_name = + consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()); info!( index_id=%source_runtime.index_id(), source_id=%source_runtime.source_id(), topics=?source_params.topics, subscription_name=%subscription_name, + consumer_name=%consumer_name, "Create Pulsar source." 
 ); let pulsar = connect_pulsar(&source_params).await?; @@ -125,6 +137,7 @@ impl PulsarSource { } let pulsar_consumer = create_pulsar_consumer( subscription_name.clone(), + consumer_name.clone(), source_params.clone(), pulsar, current_positions.clone(), @@ -136,6 +149,7 @@ impl PulsarSource { source_params, pulsar_consumer, subscription_name, + consumer_name, current_positions, state: PulsarSourceState::default(), }) @@ -283,7 +297,7 @@ impl Source for PulsarSource { "source_id": self.source_runtime.source_id(), "topics": self.source_params.topics, "subscription_name": self.subscription_name, - "consumer_name": self.source_params.consumer_name, + "consumer_name": self.consumer_name, "num_bytes_processed": self.state.num_bytes_processed, "num_messages_processed": self.state.num_messages_processed, "num_invalid_messages": self.state.num_invalid_messages, @@ -306,6 +320,7 @@ impl DeserializeMessage for PulsarMessage { /// Creates a new pulsar consumer async fn create_pulsar_consumer( subscription_name: String, + consumer_name: String, params: PulsarSourceParams, pulsar: Pulsar, current_positions: BTreeMap, @@ -313,8 +328,13 @@ async fn create_pulsar_consumer( let mut consumer: Consumer = pulsar .consumer() .with_topics(&params.topics) - .with_consumer_name(&params.consumer_name) + .with_consumer_name(&consumer_name) .with_subscription(subscription_name) + // `Failover` (not `Shared`) is required for correctness: it assigns each + // topic partition to exactly one active consumer, so a given partition's + // checkpoint deltas always come from a single pipeline. `Shared` would + // spread a partition's messages across pipelines, producing conflicting, + // non-monotonic deltas that the metastore rejects. .with_subscription_type(SubType::Failover) .build() .await?; @@ -324,6 +344,10 @@ async fn create_pulsar_consumer( .into_iter() .map(|id| id.to_string()) .collect::>(); + // Every pipeline seeks all partitions to the positions read from the shared + // metastore checkpoint. 
The targets are identical across pipelines, so these + // seeks are idempotent even though each pipeline is only the active consumer + // for a subset of partitions under the Failover subscription. info!(positions = ?current_positions, "seeking to last checkpoint positions"); for (_, position) in current_positions { let seek_to = msg_id_from_position(&position); @@ -437,6 +461,16 @@ fn subscription_name(index_uid: &IndexUid, source_id: &str) -> String { format!("quickwit-{index_uid}-{source_id}") } +/// Builds the effective Pulsar consumer name for a pipeline. +/// +/// `base` is the user-supplied `consumer_name` (a prefix); the `pipeline_uid` +/// suffix makes the name unique per pipeline. This uniqueness lets several +/// pipelines of the same source connect as distinct consumers under the shared +/// Failover subscription, so the broker can distribute partitions across them. +fn consumer_name(base: &str, pipeline_uid: PipelineUid) -> String { + format!("{base}-{pipeline_uid}") +} + #[cfg(all(test, feature = "pulsar-broker-tests"))] mod pulsar_broker_tests { use std::collections::HashSet; @@ -638,15 +672,41 @@ mod pulsar_broker_tests { ); } + /// Default pipeline UID used by single-pipeline tests. 
+ fn default_test_pipeline_uid() -> PipelineUid { + PipelineUid::for_test(0u128) + } + async fn create_source( + universe: &Universe, + metastore: MetastoreServiceClient, + index_uid: IndexUid, + source_config: SourceConfig, + start_checkpoint: SourceCheckpoint, + ) -> anyhow::Result<(ActorHandle, Inbox)> { + create_source_with_pipeline_uid( + universe, + metastore, + index_uid, + source_config, + start_checkpoint, + default_test_pipeline_uid(), + ) + .await + } + + async fn create_source_with_pipeline_uid( universe: &Universe, _metastore: MetastoreServiceClient, index_uid: IndexUid, source_config: SourceConfig, _start_checkpoint: SourceCheckpoint, + pipeline_uid: PipelineUid, ) -> anyhow::Result<(ActorHandle, Inbox)> { let source_loader = quickwit_supported_sources(); - let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_pipeline_uid(pipeline_uid) + .build(); let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor::new(source, doc_processor_mailbox); @@ -866,7 +926,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": CLIENT_NAME, + "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -933,7 +993,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic1, topic2], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": CLIENT_NAME, + "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), "num_bytes_processed": num_bytes, "num_messages_processed": 20, "num_invalid_messages": 0, @@ -987,7 +1047,7 @@ mod pulsar_broker_tests { "source_id": source_id, 
"topics": vec![topic], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": CLIENT_NAME, + "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -1070,7 +1130,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": CLIENT_NAME, + "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -1079,6 +1139,155 @@ mod pulsar_broker_tests { assert_eq!(exit_state2, expected_state); } + /// Proves the distributed-indexing contract: several pipelines of the same + /// Pulsar source (distinct `pipeline_uid`s, shared Failover subscription) + /// split the partitions of a partitioned topic with no loss, no duplicates, + /// and produce checkpoint deltas that merge into a single consistent + /// metastore checkpoint. 
+ #[tokio::test] + async fn test_partitioned_topic_distributed_indexing() { + let universe = Universe::with_accelerated_time(); + let metastore = metastore_for_test(); + let topic = append_random_suffix("test-pulsar-source--distributed--topic"); + let index_id = append_random_suffix("test-pulsar-source--distributed--index"); + let (_source_id, source_config) = get_source_config([&topic]); + + let num_partitions = 4; + let msgs_per_partition = 10; + let total_messages = num_partitions * msgs_per_partition; + + create_partitioned_topic(&topic, num_partitions).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; + + let partition_topics: Vec = (0..num_partitions) + .map(|partition| format!("{topic}-partition-{partition}")) + .collect(); + + // Two pipelines with distinct pipeline UIDs: the realistic distributed + // setup where the control plane spawns several pipelines for one source. + let (source_handle1, doc_processor_inbox1) = create_source_with_pipeline_uid( + &universe, + metastore.clone(), + index_uid.clone(), + source_config.clone(), + SourceCheckpoint::default(), + PipelineUid::for_test(1u128), + ) + .await + .expect("Create source 1"); + let (source_handle2, doc_processor_inbox2) = create_source_with_pipeline_uid( + &universe, + metastore.clone(), + index_uid.clone(), + source_config.clone(), + SourceCheckpoint::default(), + PipelineUid::for_test(2u128), + ) + .await + .expect("Create source 2"); + + let expected_docs = populate_topic( + partition_topics.iter().map(|topic| topic.as_str()), + 0..msgs_per_partition, + message_generator, + ) + .await + .unwrap(); + + // Wait until the two pipelines together have processed every message. 
+ loop { + let processed1 = source_handle1 + .observe() + .await + .state + .get("num_messages_processed") + .unwrap() + .as_u64() + .unwrap(); + let processed2 = source_handle2 + .observe() + .await + .state + .get("num_messages_processed") + .unwrap() + .as_u64() + .unwrap(); + if processed1 + processed2 >= total_messages as u64 { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + source_handle1.quit().await; + source_handle2.quit().await; + + let batch1 = merge_doc_batches(doc_processor_inbox1.drain_for_test_typed()); + let batch2 = merge_doc_batches(doc_processor_inbox2.drain_for_test_typed()); + + // Each pipeline owns a non-empty, disjoint set of partitions. + let partitions1: HashSet = + batch1.checkpoint_delta.partitions().cloned().collect(); + let partitions2: HashSet = + batch2.checkpoint_delta.partitions().cloned().collect(); + assert!( + !partitions1.is_empty(), + "pipeline 1 should own at least one partition" + ); + assert!( + !partitions2.is_empty(), + "pipeline 2 should own at least one partition" + ); + assert!( + partitions1.is_disjoint(&partitions2), + "pipelines must not share partitions: {partitions1:?} vs {partitions2:?}" + ); + let owned_partitions: HashSet = + partitions1.union(&partitions2).cloned().collect(); + assert_eq!( + owned_partitions.len(), + num_partitions, + "every partition must be owned by exactly one pipeline" + ); + + // Every produced message is accounted for, with no loss and no + // duplicates across the pipelines. 
+ let produced: usize = expected_docs + .iter() + .map(|topic_data| topic_data.len()) + .sum(); + assert_eq!(produced, total_messages, "test should produce all messages"); + let delta1 = batch1.checkpoint_delta.clone(); + let delta2 = batch2.checkpoint_delta.clone(); + let total_docs = batch1.docs.len() + batch2.docs.len(); + let batches = vec![batch1, batch2]; + assert_eq!(total_docs, total_messages, "no message should be dropped"); + assert_eq!( + count_unique_messages_in_batches(&batches), + total_messages, + "no message should be duplicated across pipelines" + ); + + // The two pipelines' deltas merge into a single checkpoint without + // conflict (the metastore-consistency proof) and every partition has + // advanced past the beginning. + let mut checkpoint = SourceCheckpoint::default(); + checkpoint + .try_apply_delta(delta1) + .expect("apply pipeline 1 delta"); + checkpoint + .try_apply_delta(delta2) + .expect("apply pipeline 2 delta"); + for partition_topic in &partition_topics { + let partition = PartitionId::from(partition_topic.as_str()); + let position = checkpoint.position_for_partition(&partition); + assert!( + matches!(position, Some(position) if *position != Position::Beginning), + "partition {partition_topic} must have an advanced checkpoint position, got \ + {position:?}" + ); + } + } + #[tokio::test] async fn test_partitioned_topic_multi_consumer_ingestion_with_failover() { // We test successive failures of one source and observe pulsar failover mechanism. From 97b03e02e5cbf0220eb3c92fe39f6b8f657afc94 Mon Sep 17 00:00:00 2001 From: miton18 Date: Mon, 29 Jun 2026 17:17:51 +0200 Subject: [PATCH 2/6] fix(pulsar): refresh partition position on failover handoff; fix rest test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on distributed Pulsar indexing. 
P1 — Failover position refresh: with num_pipelines > 1, a pipeline can become the active Failover consumer for a partition another pipeline has already advanced. The `current_positions` snapshot taken in `try_new` is then stale, so the first delta would start from a position behind the committed metastore checkpoint and the publish would be rejected, wedging the partition. We now lazily re-read the committed position from the metastore the first time a partition is consumed in a pipeline session (tracked via `synced_partitions`), so the first recorded delta is contiguous. Mirrors the Kafka source refreshing offsets on assignment. Added unit tests for the `align_position` helper. P2 — Updated `test_create_source_with_bad_config` in quickwit-serve: it asserted the multi-pipeline rejection using a Pulsar source, which is now allowed. Switched it to a kinesis source and the updated error message. --- .../src/source/pulsar_source.rs | 106 +++++++++++++++++- .../src/index_api/rest_handler.rs | 11 +- 2 files changed, 110 insertions(+), 7 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index a9b432f48b8..b8fdb95da42 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::fmt; use std::time::{Duration, Instant}; @@ -82,6 +82,15 @@ pub struct PulsarSource { // broker distribute partitions across them (distributed indexing). consumer_name: String, current_positions: BTreeMap, + // Partitions whose `current_positions` entry has been aligned with the + // committed metastore checkpoint during this pipeline session. 
With + // `num_pipelines > 1`, a pipeline can become the active Failover consumer + // for a partition another pipeline already advanced; the startup snapshot in + // `current_positions` is then stale. We lazily re-read the committed position + // the first time we consume a partition so the first recorded delta is + // contiguous with the metastore checkpoint (otherwise the publish is + // rejected and the partition wedges). + synced_partitions: HashSet, state: PulsarSourceState, } @@ -151,6 +160,7 @@ impl PulsarSource { subscription_name, consumer_name, current_positions, + synced_partitions: HashSet::new(), state: PulsarSourceState::default(), }) } @@ -220,6 +230,94 @@ impl PulsarSource { } Ok(()) } + + /// Aligns the local position of a partition with the committed metastore + /// checkpoint the first time it is consumed in this pipeline session. + /// + /// This is required for distributed indexing (`num_pipelines > 1`): when this + /// pipeline becomes the active Failover consumer for a partition another + /// pipeline already advanced, the `current_positions` snapshot taken in + /// `try_new` is stale. Re-reading the committed position ensures the first + /// delta we record starts where the metastore checkpoint actually is, so the + /// publish is accepted. + async fn sync_partition_position(&mut self, partition: &PartitionId) -> anyhow::Result<()> { + if !self.synced_partitions.insert(partition.clone()) { + return Ok(()); + } + let checkpoint = self.source_runtime.fetch_checkpoint().await?; + align_position(&mut self.current_positions, partition, &checkpoint); + Ok(()) + } +} + +/// Sets the local position of `partition` to the committed position from +/// `checkpoint`, or removes any stale entry when the checkpoint has none (so the +/// next delta starts from `Position::Beginning`). 
+fn align_position( + current_positions: &mut BTreeMap, + partition: &PartitionId, + checkpoint: &SourceCheckpoint, +) { + match checkpoint.position_for_partition(partition) { + Some(position) => { + current_positions.insert(partition.clone(), position.clone()); + } + None => { + current_positions.remove(partition); + } + } +} + +#[cfg(test)] +mod position_alignment_tests { + use quickwit_metastore::checkpoint::SourceCheckpointDelta; + + use super::*; + + fn checkpoint_with(partition: &str, offset: u64) -> SourceCheckpoint { + let mut delta = SourceCheckpointDelta::default(); + delta + .record_partition_delta( + PartitionId::from(partition), + Position::Beginning, + Position::offset(offset), + ) + .unwrap(); + let mut checkpoint = SourceCheckpoint::default(); + checkpoint.try_apply_delta(delta).unwrap(); + checkpoint + } + + #[test] + fn align_position_adopts_committed_over_stale_local() { + // Simulates a Failover handoff: another pipeline advanced the partition to + // offset 42, but this pipeline still holds a stale startup snapshot (10). + let partition = PartitionId::from("topic-partition-0"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + current_positions.insert(partition.clone(), Position::offset(10u64)); + + align_position(&mut current_positions, &partition, &committed); + + assert_eq!( + current_positions.get(&partition), + Some(&Position::offset(42u64)) + ); + } + + #[test] + fn align_position_clears_stale_entry_when_uncommitted() { + // The committed checkpoint has nothing for this partition, so any stale + // local entry must be dropped (next delta starts from `Beginning`). 
+ let partition = PartitionId::from("topic-partition-1"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + current_positions.insert(partition.clone(), Position::offset(10u64)); + + align_position(&mut current_positions, &partition, &committed); + + assert_eq!(current_positions.get(&partition), None); + } } #[async_trait] @@ -244,6 +342,12 @@ impl Source for PulsarSource { .ok_or_else(|| ActorExitStatus::from(anyhow!("consumer was dropped")))? .map_err(|e| ActorExitStatus::from(anyhow!("failed to get message from consumer: {:?}", e)))?; + // Align with the committed checkpoint the first time we see a + // partition, so a Failover handoff from another pipeline does not + // record a delta from a stale startup position. + let partition = PartitionId::from(message.topic.clone()); + self.sync_partition_position(&partition).await.map_err(ActorExitStatus::from)?; + self.process_message(message, &mut batch_builder).map_err(ActorExitStatus::from)?; if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index bc4c8d9b105..1131411a152 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -1085,23 +1085,22 @@ mod tests { assert!(body.contains("invalid type: floating point `0.4`")); } { - // Invalid pulsar source config with number of pipelines > 1, not supported yet. + // Invalid kinesis source config with number of pipelines > 1, not supported yet. 
let resp = warp::test::request() .path("/indexes/my-index/sources") .method("POST") .json(&true) .body( - r#"{"version": "0.8", "source_id": "pulsar-source", - "num_pipelines": 2, "source_type": "pulsar", "params": {"topics": ["my-topic"], - "address": "pulsar://localhost:6650" }}"#, + r#"{"version": "0.8", "source_id": "kinesis-source", + "num_pipelines": 2, "source_type": "kinesis", "params": {"stream_name": "my-stream"}}"#, ) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 400); let body = std::str::from_utf8(resp.body()).unwrap(); assert!(body.contains( - "Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka \ - sources" + "Quickwit currently supports multiple pipelines only for GCP PubSub, Kafka or \ + Pulsar sources" )); } { From c326bddd48686bfc94308b7d9e5463fe35b5fa2e Mon Sep 17 00:00:00 2001 From: miton18 Date: Tue, 30 Jun 2026 09:58:27 +0200 Subject: [PATCH 3/6] fix(pulsar): resync partition position on every failover (re)gain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the failover position-refresh fix. The previous version tracked synced partitions in a set that was never cleared, so it only re-read the committed position once per pipeline lifetime. After a second handoff (a pipeline loses a partition to another pipeline, then regains it once that pipeline leaves), it would reuse the stale local position and record a non-contiguous delta, which the metastore rejects — stalling the partition after a normal rebalance. We now track the last time a message was received per partition and re-read the committed position whenever a partition is consumed for the first time or reappears after a gap longer than POSITION_RESYNC_GAP (30s), a proxy for a Failover (re)gain since the Pulsar client does not surface partition reassignments. The merge never moves a position backwards, so a continuously-owned partition never resyncs and in-flight progress is preserved. 
Renamed the helper to `advance_position` and extended the unit tests (adopt-when-ahead, keep-local-when-ahead, no-op-when-uncommitted). --- .../src/source/pulsar_source.rs | 122 ++++++++++++------ 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index b8fdb95da42..92f09eca83d 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::time::{Duration, Instant}; @@ -42,6 +42,12 @@ use crate::source::{ type PulsarConsumer = Consumer; +/// Minimum time without receiving any message from a partition before we treat a +/// new message for it as a Failover (re)gain and re-read its committed position +/// from the metastore. Large enough that continuously-owned partitions never +/// resync, small enough to recover quickly after a realistic rebalance. +const POSITION_RESYNC_GAP: Duration = Duration::from_secs(30); + pub struct PulsarSourceFactory; #[async_trait] @@ -82,15 +88,15 @@ pub struct PulsarSource { // broker distribute partitions across them (distributed indexing). consumer_name: String, current_positions: BTreeMap, - // Partitions whose `current_positions` entry has been aligned with the - // committed metastore checkpoint during this pipeline session. With - // `num_pipelines > 1`, a pipeline can become the active Failover consumer - // for a partition another pipeline already advanced; the startup snapshot in - // `current_positions` is then stale. 
We lazily re-read the committed position - // the first time we consume a partition so the first recorded delta is - // contiguous with the metastore checkpoint (otherwise the publish is - // rejected and the partition wedges). - synced_partitions: HashSet, + // Last time a message was received for each partition. Used to detect a + // Failover (re)gain: with `num_pipelines > 1` a pipeline can lose a partition + // to another pipeline and later regain it. While it did not own the + // partition, the other pipeline may have advanced the committed checkpoint, + // leaving this pipeline's `current_positions` entry stale. When we start + // receiving a partition again after a gap (or for the first time), we re-read + // the committed position so the next delta is contiguous with the metastore + // checkpoint — otherwise the publish is rejected and the partition wedges. + last_message_at: HashMap, state: PulsarSourceState, } @@ -160,7 +166,7 @@ impl PulsarSource { subscription_name, consumer_name, current_positions, - synced_partitions: HashSet::new(), + last_message_at: HashMap::new(), state: PulsarSourceState::default(), }) } @@ -231,40 +237,53 @@ impl PulsarSource { Ok(()) } - /// Aligns the local position of a partition with the committed metastore - /// checkpoint the first time it is consumed in this pipeline session. + /// Re-reads the committed position for a partition from the metastore when we + /// (re)start consuming it, so the next recorded delta is contiguous with the + /// metastore checkpoint. /// - /// This is required for distributed indexing (`num_pipelines > 1`): when this - /// pipeline becomes the active Failover consumer for a partition another - /// pipeline already advanced, the `current_positions` snapshot taken in - /// `try_new` is stale. Re-reading the committed position ensures the first - /// delta we record starts where the metastore checkpoint actually is, so the - /// publish is accepted. 
+ /// Required for distributed indexing (`num_pipelines > 1`): a pipeline can + /// lose a partition to another pipeline under the Failover subscription and + /// later regain it. While it did not own the partition, the other pipeline may + /// have advanced the committed checkpoint, so the local `current_positions` + /// entry is stale. We resync the first time we see a partition and whenever it + /// reappears after a gap longer than [`POSITION_RESYNC_GAP`] (a proxy for a + /// handoff, since the Pulsar client does not surface partition reassignments). + /// Without this, the stale `from` makes the publish rejected and the partition + /// wedges after a rebalance. Continuously-owned partitions never resync (the + /// committed checkpoint only trails the positions we already recorded). async fn sync_partition_position(&mut self, partition: &PartitionId) -> anyhow::Result<()> { - if !self.synced_partitions.insert(partition.clone()) { + let now = Instant::now(); + let resync = match self.last_message_at.get(partition) { + Some(last_message_at) => now.duration_since(*last_message_at) > POSITION_RESYNC_GAP, + None => true, + }; + self.last_message_at.insert(partition.clone(), now); + if !resync { return Ok(()); } - let checkpoint = self.source_runtime.fetch_checkpoint().await?; - align_position(&mut self.current_positions, partition, &checkpoint); + let committed = self.source_runtime.fetch_checkpoint().await?; + advance_position(&mut self.current_positions, partition, &committed); Ok(()) } } -/// Sets the local position of `partition` to the committed position from -/// `checkpoint`, or removes any stale entry when the checkpoint has none (so the -/// next delta starts from `Position::Beginning`). -fn align_position( +/// Advances the local position of `partition` to the committed position when the +/// committed checkpoint is ahead. 
It never moves a position backwards, so +/// uncommitted in-flight progress made by this pipeline is preserved and already +/// consumed messages are not re-emitted. +fn advance_position( current_positions: &mut BTreeMap, partition: &PartitionId, - checkpoint: &SourceCheckpoint, + committed: &SourceCheckpoint, ) { - match checkpoint.position_for_partition(partition) { - Some(position) => { - current_positions.insert(partition.clone(), position.clone()); - } - None => { - current_positions.remove(partition); - } + let Some(committed_position) = committed.position_for_partition(partition) else { + return; + }; + let local_position = current_positions + .entry(partition.clone()) + .or_insert(Position::Beginning); + if *local_position < *committed_position { + *local_position = committed_position.clone(); } } @@ -289,15 +308,15 @@ mod position_alignment_tests { } #[test] - fn align_position_adopts_committed_over_stale_local() { - // Simulates a Failover handoff: another pipeline advanced the partition to - // offset 42, but this pipeline still holds a stale startup snapshot (10). + fn advance_position_adopts_committed_when_ahead() { + // Failover (re)gain: another pipeline advanced the partition to offset 42 + // while this pipeline held a stale position (10). let partition = PartitionId::from("topic-partition-0"); let committed = checkpoint_with("topic-partition-0", 42); let mut current_positions = BTreeMap::new(); current_positions.insert(partition.clone(), Position::offset(10u64)); - align_position(&mut current_positions, &partition, &committed); + advance_position(&mut current_positions, &partition, &committed); assert_eq!( current_positions.get(&partition), @@ -306,15 +325,32 @@ mod position_alignment_tests { } #[test] - fn align_position_clears_stale_entry_when_uncommitted() { - // The committed checkpoint has nothing for this partition, so any stale - // local entry must be dropped (next delta starts from `Beginning`). 
+ fn advance_position_keeps_local_when_ahead_of_committed() { + // Local uncommitted progress (50) must not be moved back to the committed + // checkpoint (42), otherwise already-consumed messages would be re-emitted. + let partition = PartitionId::from("topic-partition-0"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + current_positions.insert(partition.clone(), Position::offset(50u64)); + + advance_position(&mut current_positions, &partition, &committed); + + assert_eq!( + current_positions.get(&partition), + Some(&Position::offset(50u64)) + ); + } + + #[test] + fn advance_position_noop_when_partition_uncommitted() { + // The committed checkpoint has nothing for this partition: leave the local + // entry untouched (a missing entry means the next delta starts at the + // beginning, which is correct when nothing was ever committed). let partition = PartitionId::from("topic-partition-1"); let committed = checkpoint_with("topic-partition-0", 42); let mut current_positions = BTreeMap::new(); - current_positions.insert(partition.clone(), Position::offset(10u64)); - align_position(&mut current_positions, &partition, &committed); + advance_position(&mut current_positions, &partition, &committed); assert_eq!(current_positions.get(&partition), None); } From d765a6cf097556d25c4f24670551408d697c0c25 Mon Sep 17 00:00:00 2001 From: miton18 Date: Tue, 30 Jun 2026 10:25:58 +0200 Subject: [PATCH 4/6] fix(pulsar): gate failover resync to distributed sources; document metastore backstop; wire test metastore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the third review round on distributed Pulsar indexing. 
- Single-writer correctness is ultimately enforced by the metastore, not by the source: SourceCheckpoint::try_apply_delta rejects any overlapping or non-contiguous delta, so a stale position or a brief overlap between the old and new active Failover consumers during a rebalance yields a rejected publish and a pipeline restart (resuming from the committed checkpoint), never lost or duplicated data — the same model the Kafka source relies on across consumer-group rebalances. Documented this on sync_partition_position. The time-gap resync stays a best-effort churn optimization; cases it misses (e.g. a re-handoff within the gap, which the Pulsar client cannot signal) are caught by that backstop. - Gate the resync to sources with num_pipelines > 1 (new `distributed` flag): a single-pipeline source can never hand a partition over, so it never needs to re-read the checkpoint — restoring the exact previous single-pipeline behavior and avoiding any extra metastore load for the common case. - Wire the real test metastore into the Pulsar broker-test source builder (it previously ran against an empty mock checkpoint) and add test_distributed_source_resumes_from_committed_checkpoint, which seeds a committed checkpoint ("another pipeline") and asserts a distributed source resumes from it instead of re-indexing committed messages. Enables with_metastore under the pulsar-broker-tests feature. 
--- quickwit/quickwit-indexing/src/source/mod.rs | 6 +- .../src/source/pulsar_source.rs | 94 ++++++++++++++++++- 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index e4d41501354..2c7f36714f3 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -638,7 +638,11 @@ mod tests { #[cfg(all( test, - any(feature = "kafka-broker-tests", feature = "sqs-localstack-tests") + any( + feature = "kafka-broker-tests", + feature = "sqs-localstack-tests", + feature = "pulsar-broker-tests" + ) ))] pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self { self.metastore_opt = Some(metastore); diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 92f09eca83d..c73be14dd14 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -87,6 +87,9 @@ pub struct PulsarSource { // as distinct consumers under the shared Failover subscription, letting the // broker distribute partitions across them (distributed indexing). consumer_name: String, + // Whether this source runs with more than one pipeline. Only then can a + // partition be handed between pipelines, so only then do we resync positions. + distributed: bool, current_positions: BTreeMap, // Last time a message was received for each partition. Used to detect a // Failover (re)gain: with `num_pipelines > 1` a pipeline can lose a partition @@ -123,6 +126,7 @@ impl PulsarSource { // and the broker assigns each topic partition to exactly one of them. 
let consumer_name = consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()); + let distributed = source_runtime.source_config.num_pipelines.get() > 1; info!( index_id=%source_runtime.index_id(), source_id=%source_runtime.source_id(), @@ -165,6 +169,7 @@ impl PulsarSource { pulsar_consumer, subscription_name, consumer_name, + distributed, current_positions, last_message_at: HashMap::new(), state: PulsarSourceState::default(), @@ -248,10 +253,24 @@ impl PulsarSource { /// entry is stale. We resync the first time we see a partition and whenever it /// reappears after a gap longer than [`POSITION_RESYNC_GAP`] (a proxy for a /// handoff, since the Pulsar client does not surface partition reassignments). - /// Without this, the stale `from` makes the publish rejected and the partition - /// wedges after a rebalance. Continuously-owned partitions never resync (the - /// committed checkpoint only trails the positions we already recorded). + /// Continuously-owned partitions never resync (the committed checkpoint only + /// trails the positions we already recorded), and single-pipeline sources skip + /// it entirely since they can never hand a partition over. + /// + /// This is a best-effort optimization to avoid churn, not the correctness + /// guarantee. The per-partition single-writer invariant is ultimately enforced + /// by the metastore: `SourceCheckpoint::try_apply_delta` rejects any + /// overlapping or non-contiguous delta, so a stale position or a brief overlap + /// between the old and new active Failover consumers during a rebalance results + /// in a rejected publish and a pipeline restart (which resumes from the + /// committed checkpoint), never in lost or duplicated data. This matches how + /// the Kafka source relies on the metastore across consumer-group rebalances. + /// The time-gap heuristic cannot detect every reassignment (the Pulsar client + /// surfaces none), but any case it misses is caught by that backstop. 
async fn sync_partition_position(&mut self, partition: &PartitionId) -> anyhow::Result<()> { + if !self.distributed { + return Ok(()); + } let now = Instant::now(); let resync = match self.last_message_at.get(partition) { Some(last_message_at) => now.duration_since(*last_message_at) > POSITION_RESYNC_GAP, @@ -837,7 +856,7 @@ mod pulsar_broker_tests { async fn create_source_with_pipeline_uid( universe: &Universe, - _metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, index_uid: IndexUid, source_config: SourceConfig, _start_checkpoint: SourceCheckpoint, @@ -845,6 +864,7 @@ mod pulsar_broker_tests { ) -> anyhow::Result<(ActorHandle, Inbox)> { let source_loader = quickwit_supported_sources(); let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) .with_pipeline_uid(pipeline_uid) .build(); let source = source_loader.load_source(source_runtime).await?; @@ -1428,6 +1448,72 @@ mod pulsar_broker_tests { } } + /// Exercises the failover position resync against the real test metastore: a + /// distributed source whose partition was already advanced by "another + /// pipeline" (a committed checkpoint seeded in the metastore) must resume from + /// that committed position and not re-index the already-committed messages. + #[tokio::test] + async fn test_distributed_source_resumes_from_committed_checkpoint() { + let universe = Universe::with_accelerated_time(); + let metastore = metastore_for_test(); + let topic = append_random_suffix("test-pulsar-source--distributed-resume--topic"); + let index_id = append_random_suffix("test-pulsar-source--distributed-resume--index"); + let (_source_id, mut source_config) = get_source_config([&topic]); + // Multiple pipelines: enables the failover resync path. + source_config.num_pipelines = NonZeroUsize::new(2).unwrap(); + + // First half is "already indexed and committed by another pipeline". 
+ let committed = populate_topic([&topic], 0..5, message_generator) + .await + .unwrap(); + // Second half is what this pipeline must index. + let to_index = populate_topic([&topic], 5..10, message_generator) + .await + .unwrap(); + + let partition = PartitionId::from(topic.clone()); + let index_uid = setup_index( + metastore.clone(), + &index_id, + &source_config, + &[( + partition.clone(), + Position::Beginning, + committed[0].expected_position.clone(), + )], + ) + .await; + + let (source_handle, doc_processor_inbox) = create_source_with_pipeline_uid( + &universe, + metastore, + index_uid, + source_config, + SourceCheckpoint::default(), + PipelineUid::for_test(7u128), + ) + .await + .expect("Create source"); + + let exit_state = wait_for_completion( + source_handle, + to_index[0].len(), + partition, + to_index[0].expected_position.clone(), + ) + .await; + + let messages: Vec = doc_processor_inbox.drain_for_test_typed(); + let batch = merge_doc_batches(messages); + // Only the uncommitted second half is indexed; the committed first half is + // skipped because the source resumed from the metastore checkpoint. + assert_eq!(batch.docs, to_index[0].messages); + assert_eq!( + exit_state.get("num_messages_processed").unwrap().as_u64(), + Some(to_index[0].len() as u64) + ); + } + #[tokio::test] async fn test_partitioned_topic_multi_consumer_ingestion_with_failover() { // We test successive failures of one source and observe pulsar failover mechanism. From ab1cb8a6f470a08535a785e0284e12bde889ac90 Mon Sep 17 00:00:00 2001 From: miton18 Date: Tue, 30 Jun 2026 14:10:57 +0200 Subject: [PATCH 5/6] fix(pulsar): only suffix the consumer name for distributed sources A single-pipeline source kept its configured `consumer_name` before this feature; suffixing it with the pipeline UID unconditionally was a needless broker-visible change that breaks monitoring/automation keyed on the name. 
Only distributed sources (num_pipelines > 1) need per-pipeline uniqueness under the shared subscription, so single-pipeline sources now keep the configured name unchanged. Reverted the single-pipeline test assertions to the plain name and made the distributed-indexing test actually set num_pipelines = 2. --- .../src/source/pulsar_source.rs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index c73be14dd14..0a228d41de2 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -81,11 +81,11 @@ pub struct PulsarSource { source_params: PulsarSourceParams, pulsar_consumer: PulsarConsumer, subscription_name: String, - // Effective Pulsar consumer name, derived from the user-supplied - // `consumer_name` prefix and this pipeline's `pipeline_uid`. It must be - // unique per pipeline so that several pipelines of the same source connect - // as distinct consumers under the shared Failover subscription, letting the - // broker distribute partitions across them (distributed indexing). + // Effective Pulsar consumer name. For a distributed source it is the + // user-supplied `consumer_name` suffixed with this pipeline's `pipeline_uid`, + // so that several pipelines connect as distinct consumers under the shared + // Failover subscription and the broker distributes partitions across them. For + // a single-pipeline source it is the configured `consumer_name` unchanged. consumer_name: String, // Whether this source runs with more than one pipeline. Only then can a // partition be handed between pipelines, so only then do we resync positions. 
@@ -121,12 +121,17 @@ impl PulsarSource { ) -> anyhow::Result { let subscription_name = subscription_name(source_runtime.index_uid(), source_runtime.source_id()); - // The consumer name must be unique per pipeline: under the shared - // Failover subscription, each pipeline connects as a separate consumer - // and the broker assigns each topic partition to exactly one of them. - let consumer_name = - consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()); let distributed = source_runtime.source_config.num_pipelines.get() > 1; + // Only distributed sources need a per-pipeline consumer name: under the + // shared Failover subscription each pipeline connects as a separate + // consumer and the broker assigns each partition to exactly one of them. A + // single-pipeline source keeps the configured consumer name as-is, to stay + // backward compatible with monitoring/automation keyed on it. + let consumer_name = if distributed { + consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()) + } else { + source_params.consumer_name.clone() + }; info!( index_id=%source_runtime.index_id(), source_id=%source_runtime.source_id(), @@ -1086,7 +1091,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), + "consumer_name": CLIENT_NAME, "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -1153,7 +1158,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic1, topic2], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), + "consumer_name": CLIENT_NAME, "num_bytes_processed": num_bytes, "num_messages_processed": 20, "num_invalid_messages": 0, @@ -1207,7 +1212,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic], 
"subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), + "consumer_name": CLIENT_NAME, "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -1290,7 +1295,7 @@ mod pulsar_broker_tests { "source_id": source_id, "topics": vec![topic], "subscription_name": subscription_name(&index_uid, &source_id), - "consumer_name": consumer_name(CLIENT_NAME, default_test_pipeline_uid()), + "consumer_name": CLIENT_NAME, "num_bytes_processed": num_bytes, "num_messages_processed": 10, "num_invalid_messages": 0, @@ -1310,7 +1315,8 @@ mod pulsar_broker_tests { let metastore = metastore_for_test(); let topic = append_random_suffix("test-pulsar-source--distributed--topic"); let index_id = append_random_suffix("test-pulsar-source--distributed--index"); - let (_source_id, source_config) = get_source_config([&topic]); + let (_source_id, mut source_config) = get_source_config([&topic]); + source_config.num_pipelines = NonZeroUsize::new(2).unwrap(); let num_partitions = 4; let msgs_per_partition = 10; From 5c7267bf23e97b99106db0f319eb06ebc39334a2 Mon Sep 17 00:00:00 2001 From: miton18 Date: Tue, 30 Jun 2026 14:36:31 +0200 Subject: [PATCH 6/6] fix(pulsar): ack broker to committed position on failover resync If the pipeline that advanced the metastore checkpoint died before processing SuggestTruncate, it never cumulative-acked the broker, so the Pulsar subscription cursor stays behind and the broker replays the backlog to the new active pipeline. The resync moves current_positions past those replayed messages, which are then skipped; on a quiet partition no batch is published, so no ack is ever sent and the subscription backlog never clears. On resync we now cumulative-ack the broker up to the committed (durable) position so the cursor catches up immediately. We ack only up to the committed position; anything beyond it is genuinely unprocessed and must still be delivered. 
--- .../quickwit-indexing/src/source/pulsar_source.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 0a228d41de2..7904937ede9 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -287,6 +287,20 @@ impl PulsarSource { } let committed = self.source_runtime.fetch_checkpoint().await?; advance_position(&mut self.current_positions, partition, &committed); + // Catch the broker cursor up to the committed position. The pipeline that + // advanced the metastore checkpoint may have died before acking (it never + // processed `SuggestTruncate`), leaving a backlog the broker keeps + // replaying — messages we then skip, so on a quiet partition no batch is + // published and no ack is ever sent, keeping the subscription behind. We + // ack only up to the committed (durable) position; anything beyond it is + // genuinely unprocessed and must still be delivered. + if let Some(committed_position) = committed.position_for_partition(partition) + && let Some(msg_id) = msg_id_from_position(committed_position) + { + self.pulsar_consumer + .cumulative_ack_with_id(partition.0.as_ref(), msg_id) + .await?; + } Ok(()) } }