diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md index 30894d3faad..b67efbb2605 100644 --- a/docs/configuration/source-config.md +++ b/docs/configuration/source-config.md @@ -223,7 +223,7 @@ will be decided by the control plane. :::info -Note that distributing the indexing load of partitioned sources like Kafka is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`. +Note that distributing the indexing load of partitioned sources like Kafka and Pulsar is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`. For Pulsar specifically, the topic(s) must be partitioned: a non-partitioned Pulsar topic is served by a single active consumer, so extra pipelines stay idle and provide no parallelism. Also, assuming you are only indexing a single Kafka source in your Quickwit cluster, you should set the number of pipelines to a multiple of the number of indexers. Finally, if your indexing throughput is high, you should provision between 2 and 4 vCPUs per pipeline. diff --git a/docs/ingest-data/pulsar.md b/docs/ingest-data/pulsar.md index 4563beda98e..650180d02bf 100644 --- a/docs/ingest-data/pulsar.md +++ b/docs/ingest-data/pulsar.md @@ -158,6 +158,29 @@ As soon as the Pulsar source is created, Quickwit control plane will ask an inde INFO spawn_pipeline{index=stackoverflow gen=0}:pulsar-consumer{subscription_name="quickwit-stackoverflow-pulsar-source" params=PulsarSourceParams { topics: ["stackoverflow"], address: "pulsar://localhost:6650", consumer_name: "quickwit", authentication: None } current_positions={}}: quickwit_indexing::source::pulsar_source: Seeking to last checkpoint positions. positions={} ``` +### Distributed indexing + +To parallelize indexing across several pipelines (and indexers), set `num_pipelines` on the source config to the number of pipelines you want: + +```yaml +version: 0.8 +source_id: pulsar-source +source_type: pulsar +num_pipelines: 4 +params: + topics: + - stackoverflow + address: pulsar://localhost:6650 +``` + +The pipelines share a single Pulsar `Failover` subscription, and the broker assigns each topic partition to exactly one active pipeline — so each partition is indexed by a single pipeline, with no duplicates and no loss. When a pipeline dies, Pulsar promotes another and it resumes from the subscription cursor. + +:::info + +Distribution happens **per partition**, so the topic(s) must be partitioned. A non-partitioned topic is served by a single active consumer: extra pipelines stay idle as standbys and provide no parallelism. For balanced load, make the number of partitions a multiple of `num_pipelines`. + +::: + ## Create and populate a Pulsar topic We will use the Pulsar's default tenant/namespace `public/default`. To populate the topic, we will use a python script: diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 81811e9f47c..b687dba35ea 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -566,7 +566,10 @@ pub struct PulsarSourceParams { pub address: String, #[schema(default = "quickwit")] #[serde(default = "default_consumer_name")] - /// The name to register with the pulsar source. + /// The consumer name prefix to register with the pulsar source. With + /// distributed indexing (`num_pipelines > 1`), each pipeline appends its + /// pipeline UID to this prefix so it connects as a distinct consumer under + /// the shared subscription. pub consumer_name: String, // Serde yaml has some specific behaviour when deserializing // enums (see https://github.com/dtolnay/serde-yaml/issues/342) @@ -955,15 +958,14 @@ mod tests { "source_type": "pulsar", "params": { "topics": ["my-topic"], - "address": "http://localhost:6650" + "address": "pulsar://localhost:6650" } } "#; - load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) - .unwrap_err(); - // TODO: uncomment asserts once distributed indexing is activated for pulsar. - // assert_eq!(source_config.num_pipelines(), 3); - // assert_eq!(source_config.max_num_pipelines_per_indexer(), 3); + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); } } @@ -986,6 +988,24 @@ mod tests { .unwrap(); assert_eq!(source_config.num_pipelines.get(), 3); } + { + let content = r#" + { + "version": "0.8", + "source_id": "hdfs-logs-pulsar-source", + "num_pipelines": 3, + "source_type": "pulsar", + "params": { + "topics": ["my-topic"], + "address": "pulsar://localhost:6650" + } + } + "#; + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); + } } #[test] diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 3f580ee8fa3..1abbc596906 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -126,11 +126,12 @@ impl SourceConfigForSerialization { match &self.source_params { SourceParams::PubSub(_) | SourceParams::Kafka(_) + | SourceParams::Pulsar(_) | SourceParams::File(FileSourceParams::Notifications(_)) => {} _ => { if self.num_pipelines > 1 { bail!( - "Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types" + "Quickwit currently supports multiple pipelines only for GCP PubSub, Kafka or Pulsar sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types" ); } } diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 8b4db0844ad..2c7f36714f3 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -595,6 +595,7 @@ mod tests { source_config: SourceConfig, metastore_opt: Option, queues_dir_path_opt: Option, + pipeline_uid: PipelineUid, } impl SourceRuntimeBuilder { @@ -604,6 +605,7 @@ mod tests { source_config, metastore_opt: None, queues_dir_path_opt: None, + pipeline_uid: PipelineUid::for_test(0u128), } } @@ -622,7 +624,7 @@ mod tests { node_id: NodeId::from_str("test-node"), index_uid: self.index_uid, source_id: self.source_config.source_id.clone(), - pipeline_uid: PipelineUid::for_test(0u128), + pipeline_uid: self.pipeline_uid, }, metastore, ingester_pool: IngesterPool::default(), @@ -636,7 +638,11 @@ mod tests { #[cfg(all( test, - any(feature = "kafka-broker-tests", feature = "sqs-localstack-tests") + any( + feature = "kafka-broker-tests", + feature = "sqs-localstack-tests", + feature = "pulsar-broker-tests" + ) ))] pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self { self.metastore_opt = Some(metastore); @@ -656,6 +662,12 @@ mod tests { self } + #[cfg(all(test, feature = "pulsar-broker-tests"))] + pub fn with_pipeline_uid(mut self, pipeline_uid: PipelineUid) -> Self { + self.pipeline_uid = pipeline_uid; + self + } + fn setup_mock_metastore( &self, source_checkpoint_delta_opt: Option, diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 679234ad2f5..7904937ede9 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::time::{Duration, Instant}; @@ -30,7 +30,7 @@ use quickwit_actors::ActorExitStatus; use quickwit_config::{PulsarSourceAuth, PulsarSourceParams}; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint}; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::{IndexUid, Position}; +use quickwit_proto::types::{IndexUid, PipelineUid, Position}; use serde_json::{Value as JsonValue, json}; use tokio::time; use tracing::{debug, info, warn}; @@ -42,6 +42,12 @@ use crate::source::{ type PulsarConsumer = Consumer; +/// Minimum time without receiving any message from a partition before we treat a +/// new message for it as a Failover (re)gain and re-read its committed position +/// from the metastore. Large enough that continuously-owned partitions never +/// resync, small enough to recover quickly after a realistic rebalance. +const POSITION_RESYNC_GAP: Duration = Duration::from_secs(30); + pub struct PulsarSourceFactory; #[async_trait] @@ -75,7 +81,25 @@ pub struct PulsarSource { source_params: PulsarSourceParams, pulsar_consumer: PulsarConsumer, subscription_name: String, + // Effective Pulsar consumer name. For a distributed source it is the + // user-supplied `consumer_name` suffixed with this pipeline's `pipeline_uid`, + // so that several pipelines connect as distinct consumers under the shared + // Failover subscription and the broker distributes partitions across them. For + // a single-pipeline source it is the configured `consumer_name` unchanged. + consumer_name: String, + // Whether this source runs with more than one pipeline. Only then can a + // partition be handed between pipelines, so only then do we resync positions. + distributed: bool, current_positions: BTreeMap, + // Last time a message was received for each partition. Used to detect a + // Failover (re)gain: with `num_pipelines > 1` a pipeline can lose a partition + // to another pipeline and later regain it. While it did not own the + // partition, the other pipeline may have advanced the committed checkpoint, + // leaving this pipeline's `current_positions` entry stale. When we start + // receiving a partition again after a gap (or for the first time), we re-read + // the committed position so the next delta is contiguous with the metastore + // checkpoint — otherwise the publish is rejected and the partition wedges. + last_message_at: HashMap, state: PulsarSourceState, } @@ -97,11 +121,23 @@ impl PulsarSource { ) -> anyhow::Result { let subscription_name = subscription_name(source_runtime.index_uid(), source_runtime.source_id()); + let distributed = source_runtime.source_config.num_pipelines.get() > 1; + // Only distributed sources need a per-pipeline consumer name: under the + // shared Failover subscription each pipeline connects as a separate + // consumer and the broker assigns each partition to exactly one of them. A + // single-pipeline source keeps the configured consumer name as-is, to stay + // backward compatible with monitoring/automation keyed on it. + let consumer_name = if distributed { + consumer_name(&source_params.consumer_name, source_runtime.pipeline_uid()) + } else { + source_params.consumer_name.clone() + }; info!( index_id=%source_runtime.index_id(), source_id=%source_runtime.source_id(), topics=?source_params.topics, subscription_name=%subscription_name, + consumer_name=%consumer_name, "Create Pulsar source." ); let pulsar = connect_pulsar(&source_params).await?; @@ -125,6 +161,7 @@ impl PulsarSource { } let pulsar_consumer = create_pulsar_consumer( subscription_name.clone(), + consumer_name.clone(), source_params.clone(), pulsar, current_positions.clone(), @@ -136,7 +173,10 @@ impl PulsarSource { source_params, pulsar_consumer, subscription_name, + consumer_name, + distributed, current_positions, + last_message_at: HashMap::new(), state: PulsarSourceState::default(), }) } @@ -206,6 +246,152 @@ impl PulsarSource { } Ok(()) } + + /// Re-reads the committed position for a partition from the metastore when we + /// (re)start consuming it, so the next recorded delta is contiguous with the + /// metastore checkpoint. + /// + /// Required for distributed indexing (`num_pipelines > 1`): a pipeline can + /// lose a partition to another pipeline under the Failover subscription and + /// later regain it. While it did not own the partition, the other pipeline may + /// have advanced the committed checkpoint, so the local `current_positions` + /// entry is stale. We resync the first time we see a partition and whenever it + /// reappears after a gap longer than [`POSITION_RESYNC_GAP`] (a proxy for a + /// handoff, since the Pulsar client does not surface partition reassignments). + /// Continuously-owned partitions never resync (the committed checkpoint only + /// trails the positions we already recorded), and single-pipeline sources skip + /// it entirely since they can never hand a partition over. + /// + /// This is a best-effort optimization to avoid churn, not the correctness + /// guarantee. The per-partition single-writer invariant is ultimately enforced + /// by the metastore: `SourceCheckpoint::try_apply_delta` rejects any + /// overlapping or non-contiguous delta, so a stale position or a brief overlap + /// between the old and new active Failover consumers during a rebalance results + /// in a rejected publish and a pipeline restart (which resumes from the + /// committed checkpoint), never in lost or duplicated data. This matches how + /// the Kafka source relies on the metastore across consumer-group rebalances. + /// The time-gap heuristic cannot detect every reassignment (the Pulsar client + /// surfaces none), but any case it misses is caught by that backstop. + async fn sync_partition_position(&mut self, partition: &PartitionId) -> anyhow::Result<()> { + if !self.distributed { + return Ok(()); + } + let now = Instant::now(); + let resync = match self.last_message_at.get(partition) { + Some(last_message_at) => now.duration_since(*last_message_at) > POSITION_RESYNC_GAP, + None => true, + }; + self.last_message_at.insert(partition.clone(), now); + if !resync { + return Ok(()); + } + let committed = self.source_runtime.fetch_checkpoint().await?; + advance_position(&mut self.current_positions, partition, &committed); + // Catch the broker cursor up to the committed position. The pipeline that + // advanced the metastore checkpoint may have died before acking (it never + // processed `SuggestTruncate`), leaving a backlog the broker keeps + // replaying — messages we then skip, so on a quiet partition no batch is + // published and no ack is ever sent, keeping the subscription behind. We + // ack only up to the committed (durable) position; anything beyond it is + // genuinely unprocessed and must still be delivered. + if let Some(committed_position) = committed.position_for_partition(partition) + && let Some(msg_id) = msg_id_from_position(committed_position) + { + self.pulsar_consumer + .cumulative_ack_with_id(partition.0.as_ref(), msg_id) + .await?; + } + Ok(()) + } +} + +/// Advances the local position of `partition` to the committed position when the +/// committed checkpoint is ahead. It never moves a position backwards, so +/// uncommitted in-flight progress made by this pipeline is preserved and already +/// consumed messages are not re-emitted. +fn advance_position( + current_positions: &mut BTreeMap, + partition: &PartitionId, + committed: &SourceCheckpoint, +) { + let Some(committed_position) = committed.position_for_partition(partition) else { + return; + }; + let local_position = current_positions + .entry(partition.clone()) + .or_insert(Position::Beginning); + if *local_position < *committed_position { + *local_position = committed_position.clone(); + } +} + +#[cfg(test)] +mod position_alignment_tests { + use quickwit_metastore::checkpoint::SourceCheckpointDelta; + + use super::*; + + fn checkpoint_with(partition: &str, offset: u64) -> SourceCheckpoint { + let mut delta = SourceCheckpointDelta::default(); + delta + .record_partition_delta( + PartitionId::from(partition), + Position::Beginning, + Position::offset(offset), + ) + .unwrap(); + let mut checkpoint = SourceCheckpoint::default(); + checkpoint.try_apply_delta(delta).unwrap(); + checkpoint + } + + #[test] + fn advance_position_adopts_committed_when_ahead() { + // Failover (re)gain: another pipeline advanced the partition to offset 42 + // while this pipeline held a stale position (10). + let partition = PartitionId::from("topic-partition-0"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + current_positions.insert(partition.clone(), Position::offset(10u64)); + + advance_position(&mut current_positions, &partition, &committed); + + assert_eq!( + current_positions.get(&partition), + Some(&Position::offset(42u64)) + ); + } + + #[test] + fn advance_position_keeps_local_when_ahead_of_committed() { + // Local uncommitted progress (50) must not be moved back to the committed + // checkpoint (42), otherwise already-consumed messages would be re-emitted. + let partition = PartitionId::from("topic-partition-0"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + current_positions.insert(partition.clone(), Position::offset(50u64)); + + advance_position(&mut current_positions, &partition, &committed); + + assert_eq!( + current_positions.get(&partition), + Some(&Position::offset(50u64)) + ); + } + + #[test] + fn advance_position_noop_when_partition_uncommitted() { + // The committed checkpoint has nothing for this partition: leave the local + // entry untouched (a missing entry means the next delta starts at the + // beginning, which is correct when nothing was ever committed). + let partition = PartitionId::from("topic-partition-1"); + let committed = checkpoint_with("topic-partition-0", 42); + let mut current_positions = BTreeMap::new(); + + advance_position(&mut current_positions, &partition, &committed); + + assert_eq!(current_positions.get(&partition), None); + } } #[async_trait] @@ -230,6 +416,12 @@ impl Source for PulsarSource { .ok_or_else(|| ActorExitStatus::from(anyhow!("consumer was dropped")))? .map_err(|e| ActorExitStatus::from(anyhow!("failed to get message from consumer: {:?}", e)))?; + // Align with the committed checkpoint the first time we see a + // partition, so a Failover handoff from another pipeline does not + // record a delta from a stale startup position. + let partition = PartitionId::from(message.topic.clone()); + self.sync_partition_position(&partition).await.map_err(ActorExitStatus::from)?; + self.process_message(message, &mut batch_builder).map_err(ActorExitStatus::from)?; if batch_builder.num_bytes >= BATCH_NUM_BYTES_LIMIT { @@ -283,7 +475,7 @@ impl Source for PulsarSource { "source_id": self.source_runtime.source_id(), "topics": self.source_params.topics, "subscription_name": self.subscription_name, - "consumer_name": self.source_params.consumer_name, + "consumer_name": self.consumer_name, "num_bytes_processed": self.state.num_bytes_processed, "num_messages_processed": self.state.num_messages_processed, "num_invalid_messages": self.state.num_invalid_messages, @@ -306,6 +498,7 @@ impl DeserializeMessage for PulsarMessage { /// Creates a new pulsar consumer async fn create_pulsar_consumer( subscription_name: String, + consumer_name: String, params: PulsarSourceParams, pulsar: Pulsar, current_positions: BTreeMap, @@ -313,8 +506,13 @@ async fn create_pulsar_consumer( let mut consumer: Consumer = pulsar .consumer() .with_topics(¶ms.topics) - .with_consumer_name(¶ms.consumer_name) + .with_consumer_name(&consumer_name) .with_subscription(subscription_name) + // `Failover` (not `Shared`) is required for correctness: it assigns each + // topic partition to exactly one active consumer, so a given partition's + // checkpoint deltas always come from a single pipeline. `Shared` would + // spread a partition's messages across pipelines, producing conflicting, + // non-monotonic deltas that the metastore rejects. .with_subscription_type(SubType::Failover) .build() .await?; @@ -324,6 +522,10 @@ async fn create_pulsar_consumer( .into_iter() .map(|id| id.to_string()) .collect::>(); + // Every pipeline seeks all partitions to the positions read from the shared + // metastore checkpoint. The targets are identical across pipelines, so these + // seeks are idempotent even though each pipeline is only the active consumer + // for a subset of partitions under the Failover subscription. info!(positions = ?current_positions, "seeking to last checkpoint positions"); for (_, position) in current_positions { let seek_to = msg_id_from_position(&position); @@ -437,6 +639,16 @@ fn subscription_name(index_uid: &IndexUid, source_id: &str) -> String { format!("quickwit-{index_uid}-{source_id}") } +/// Builds the effective Pulsar consumer name for a pipeline. +/// +/// `base` is the user-supplied `consumer_name` (a prefix); the `pipeline_uid` +/// suffix makes the name unique per pipeline. This uniqueness lets several +/// pipelines of the same source connect as distinct consumers under the shared +/// Failover subscription, so the broker can distribute partitions across them. +fn consumer_name(base: &str, pipeline_uid: PipelineUid) -> String { + format!("{base}-{pipeline_uid}") +} + #[cfg(all(test, feature = "pulsar-broker-tests"))] mod pulsar_broker_tests { use std::collections::HashSet; @@ -638,15 +850,42 @@ mod pulsar_broker_tests { ); } + /// Default pipeline UID used by single-pipeline tests. + fn default_test_pipeline_uid() -> PipelineUid { + PipelineUid::for_test(0u128) + } + async fn create_source( universe: &Universe, - _metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, + index_uid: IndexUid, + source_config: SourceConfig, + start_checkpoint: SourceCheckpoint, + ) -> anyhow::Result<(ActorHandle, Inbox)> { + create_source_with_pipeline_uid( + universe, + metastore, + index_uid, + source_config, + start_checkpoint, + default_test_pipeline_uid(), + ) + .await + } + + async fn create_source_with_pipeline_uid( + universe: &Universe, + metastore: MetastoreServiceClient, index_uid: IndexUid, source_config: SourceConfig, _start_checkpoint: SourceCheckpoint, + pipeline_uid: PipelineUid, ) -> anyhow::Result<(ActorHandle, Inbox)> { let source_loader = quickwit_supported_sources(); - let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .with_pipeline_uid(pipeline_uid) + .build(); let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor::new(source, doc_processor_mailbox); @@ -1079,6 +1318,222 @@ mod pulsar_broker_tests { assert_eq!(exit_state2, expected_state); } + /// Proves the distributed-indexing contract: several pipelines of the same + /// Pulsar source (distinct `pipeline_uid`s, shared Failover subscription) + /// split the partitions of a partitioned topic with no loss, no duplicates, + /// and produce checkpoint deltas that merge into a single consistent + /// metastore checkpoint. + #[tokio::test] + async fn test_partitioned_topic_distributed_indexing() { + let universe = Universe::with_accelerated_time(); + let metastore = metastore_for_test(); + let topic = append_random_suffix("test-pulsar-source--distributed--topic"); + let index_id = append_random_suffix("test-pulsar-source--distributed--index"); + let (_source_id, mut source_config) = get_source_config([&topic]); + source_config.num_pipelines = NonZeroUsize::new(2).unwrap(); + + let num_partitions = 4; + let msgs_per_partition = 10; + let total_messages = num_partitions * msgs_per_partition; + + create_partitioned_topic(&topic, num_partitions).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; + + let partition_topics: Vec = (0..num_partitions) + .map(|partition| format!("{topic}-partition-{partition}")) + .collect(); + + // Two pipelines with distinct pipeline UIDs: the realistic distributed + // setup where the control plane spawns several pipelines for one source. + let (source_handle1, doc_processor_inbox1) = create_source_with_pipeline_uid( + &universe, + metastore.clone(), + index_uid.clone(), + source_config.clone(), + SourceCheckpoint::default(), + PipelineUid::for_test(1u128), + ) + .await + .expect("Create source 1"); + let (source_handle2, doc_processor_inbox2) = create_source_with_pipeline_uid( + &universe, + metastore.clone(), + index_uid.clone(), + source_config.clone(), + SourceCheckpoint::default(), + PipelineUid::for_test(2u128), + ) + .await + .expect("Create source 2"); + + let expected_docs = populate_topic( + partition_topics.iter().map(|topic| topic.as_str()), + 0..msgs_per_partition, + message_generator, + ) + .await + .unwrap(); + + // Wait until the two pipelines together have processed every message. + loop { + let processed1 = source_handle1 + .observe() + .await + .state + .get("num_messages_processed") + .unwrap() + .as_u64() + .unwrap(); + let processed2 = source_handle2 + .observe() + .await + .state + .get("num_messages_processed") + .unwrap() + .as_u64() + .unwrap(); + if processed1 + processed2 >= total_messages as u64 { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + source_handle1.quit().await; + source_handle2.quit().await; + + let batch1 = merge_doc_batches(doc_processor_inbox1.drain_for_test_typed()); + let batch2 = merge_doc_batches(doc_processor_inbox2.drain_for_test_typed()); + + // Each pipeline owns a non-empty, disjoint set of partitions. + let partitions1: HashSet = + batch1.checkpoint_delta.partitions().cloned().collect(); + let partitions2: HashSet = + batch2.checkpoint_delta.partitions().cloned().collect(); + assert!( + !partitions1.is_empty(), + "pipeline 1 should own at least one partition" + ); + assert!( + !partitions2.is_empty(), + "pipeline 2 should own at least one partition" + ); + assert!( + partitions1.is_disjoint(&partitions2), + "pipelines must not share partitions: {partitions1:?} vs {partitions2:?}" + ); + let owned_partitions: HashSet = + partitions1.union(&partitions2).cloned().collect(); + assert_eq!( + owned_partitions.len(), + num_partitions, + "every partition must be owned by exactly one pipeline" + ); + + // Every produced message is accounted for, with no loss and no + // duplicates across the pipelines. + let produced: usize = expected_docs + .iter() + .map(|topic_data| topic_data.len()) + .sum(); + assert_eq!(produced, total_messages, "test should produce all messages"); + let delta1 = batch1.checkpoint_delta.clone(); + let delta2 = batch2.checkpoint_delta.clone(); + let total_docs = batch1.docs.len() + batch2.docs.len(); + let batches = vec![batch1, batch2]; + assert_eq!(total_docs, total_messages, "no message should be dropped"); + assert_eq!( + count_unique_messages_in_batches(&batches), + total_messages, + "no message should be duplicated across pipelines" + ); + + // The two pipelines' deltas merge into a single checkpoint without + // conflict (the metastore-consistency proof) and every partition has + // advanced past the beginning. + let mut checkpoint = SourceCheckpoint::default(); + checkpoint + .try_apply_delta(delta1) + .expect("apply pipeline 1 delta"); + checkpoint + .try_apply_delta(delta2) + .expect("apply pipeline 2 delta"); + for partition_topic in &partition_topics { + let partition = PartitionId::from(partition_topic.as_str()); + let position = checkpoint.position_for_partition(&partition); + assert!( + matches!(position, Some(position) if *position != Position::Beginning), + "partition {partition_topic} must have an advanced checkpoint position, got \ + {position:?}" + ); + } + } + + /// Exercises the failover position resync against the real test metastore: a + /// distributed source whose partition was already advanced by "another + /// pipeline" (a committed checkpoint seeded in the metastore) must resume from + /// that committed position and not re-index the already-committed messages. + #[tokio::test] + async fn test_distributed_source_resumes_from_committed_checkpoint() { + let universe = Universe::with_accelerated_time(); + let metastore = metastore_for_test(); + let topic = append_random_suffix("test-pulsar-source--distributed-resume--topic"); + let index_id = append_random_suffix("test-pulsar-source--distributed-resume--index"); + let (_source_id, mut source_config) = get_source_config([&topic]); + // Multiple pipelines: enables the failover resync path. + source_config.num_pipelines = NonZeroUsize::new(2).unwrap(); + + // First half is "already indexed and committed by another pipeline". + let committed = populate_topic([&topic], 0..5, message_generator) + .await + .unwrap(); + // Second half is what this pipeline must index. + let to_index = populate_topic([&topic], 5..10, message_generator) + .await + .unwrap(); + + let partition = PartitionId::from(topic.clone()); + let index_uid = setup_index( + metastore.clone(), + &index_id, + &source_config, + &[( + partition.clone(), + Position::Beginning, + committed[0].expected_position.clone(), + )], + ) + .await; + + let (source_handle, doc_processor_inbox) = create_source_with_pipeline_uid( + &universe, + metastore, + index_uid, + source_config, + SourceCheckpoint::default(), + PipelineUid::for_test(7u128), + ) + .await + .expect("Create source"); + + let exit_state = wait_for_completion( + source_handle, + to_index[0].len(), + partition, + to_index[0].expected_position.clone(), + ) + .await; + + let messages: Vec = doc_processor_inbox.drain_for_test_typed(); + let batch = merge_doc_batches(messages); + // Only the uncommitted second half is indexed; the committed first half is + // skipped because the source resumed from the metastore checkpoint. + assert_eq!(batch.docs, to_index[0].messages); + assert_eq!( + exit_state.get("num_messages_processed").unwrap().as_u64(), + Some(to_index[0].len() as u64) + ); + } + #[tokio::test] async fn test_partitioned_topic_multi_consumer_ingestion_with_failover() { // We test successive failures of one source and observe pulsar failover mechanism. diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index bc4c8d9b105..1131411a152 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -1085,23 +1085,22 @@ mod tests { assert!(body.contains("invalid type: floating point `0.4`")); } { - // Invalid pulsar source config with number of pipelines > 1, not supported yet. + // Invalid kinesis source config with number of pipelines > 1, not supported yet. let resp = warp::test::request() .path("/indexes/my-index/sources") .method("POST") .json(&true) .body( - r#"{"version": "0.8", "source_id": "pulsar-source", - "num_pipelines": 2, "source_type": "pulsar", "params": {"topics": ["my-topic"], - "address": "pulsar://localhost:6650" }}"#, + r#"{"version": "0.8", "source_id": "kinesis-source", + "num_pipelines": 2, "source_type": "kinesis", "params": {"stream_name": "my-stream"}}"#, ) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 400); let body = std::str::from_utf8(resp.body()).unwrap(); assert!(body.contains( - "Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka \ - sources" + "Quickwit currently supports multiple pipelines only for GCP PubSub, Kafka or \ + Pulsar sources" )); } {