Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/source-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ will be decided by the control plane.

:::info

Note that distributing the indexing load of partitioned sources like Kafka is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`.
Note that distributing the indexing load of partitioned sources like Kafka and Pulsar is done by assigning the different partitions to different pipelines. As a result, it is important to ensure that the number of partitions is a multiple of `num_pipelines`. For Pulsar specifically, the topic(s) must be partitioned: a non-partitioned Pulsar topic is served by a single active consumer, so extra pipelines stay idle and provide no parallelism.

Also, assuming you are only indexing a single Kafka source in your Quickwit cluster, you should set the number of pipelines to a multiple of the number of indexers. Finally, if your indexing throughput is high, you should provision between 2 and 4 vCPUs per pipeline.

Expand Down
23 changes: 23 additions & 0 deletions docs/ingest-data/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,29 @@ As soon as the Pulsar source is created, Quickwit control plane will ask an inde
INFO spawn_pipeline{index=stackoverflow gen=0}:pulsar-consumer{subscription_name="quickwit-stackoverflow-pulsar-source" params=PulsarSourceParams { topics: ["stackoverflow"], address: "pulsar://localhost:6650", consumer_name: "quickwit", authentication: None } current_positions={}}: quickwit_indexing::source::pulsar_source: Seeking to last checkpoint positions. positions={}
```

### Distributed indexing

To parallelize indexing across several pipelines (and indexers), set `num_pipelines` on the source config to the number of pipelines you want:

```yaml
version: 0.8
source_id: pulsar-source
source_type: pulsar
num_pipelines: 4
params:
topics:
- stackoverflow
address: pulsar://localhost:6650
```

The pipelines share a single Pulsar `Failover` subscription, and the broker assigns each topic partition to exactly one active pipeline — so each partition is indexed by a single pipeline, with no duplicates and no loss. When a pipeline dies, Pulsar promotes another and it resumes from the subscription cursor.

:::info

Distribution happens **per partition**, so the topic(s) must be partitioned. A non-partitioned topic is served by a single active consumer: extra pipelines stay idle as standbys and provide no parallelism. For balanced load, make the number of partitions a multiple of `num_pipelines`.

:::

## Create and populate a Pulsar topic

We will use the Pulsar's default tenant/namespace `public/default`. To populate the topic, we will use a python script:
Expand Down
34 changes: 27 additions & 7 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,10 @@ pub struct PulsarSourceParams {
pub address: String,
#[schema(default = "quickwit")]
#[serde(default = "default_consumer_name")]
/// The name to register with the pulsar source.
/// The consumer name prefix to register with the pulsar source. With
/// distributed indexing (`num_pipelines > 1`), each pipeline appends its
/// pipeline UID to this prefix so it connects as a distinct consumer under
/// the shared subscription.
pub consumer_name: String,
// Serde yaml has some specific behaviour when deserializing
// enums (see https://github.com/dtolnay/serde-yaml/issues/342)
Expand Down Expand Up @@ -955,15 +958,14 @@ mod tests {
"source_type": "pulsar",
"params": {
"topics": ["my-topic"],
"address": "http://localhost:6650"
"address": "pulsar://localhost:6650"
}
}
"#;
load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes())
.unwrap_err();
// TODO: uncomment asserts once distributed indexing is activated for pulsar.
// assert_eq!(source_config.num_pipelines(), 3);
// assert_eq!(source_config.max_num_pipelines_per_indexer(), 3);
let source_config =
load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes())
.unwrap();
assert_eq!(source_config.num_pipelines.get(), 3);
}
}

Expand All @@ -986,6 +988,24 @@ mod tests {
.unwrap();
assert_eq!(source_config.num_pipelines.get(), 3);
}
{
let content = r#"
{
"version": "0.8",
"source_id": "hdfs-logs-pulsar-source",
"num_pipelines": 3,
"source_type": "pulsar",
"params": {
"topics": ["my-topic"],
"address": "pulsar://localhost:6650"
}
}
"#;
let source_config =
load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes())
.unwrap();
assert_eq!(source_config.num_pipelines.get(), 3);
}
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ impl SourceConfigForSerialization {
match &self.source_params {
SourceParams::PubSub(_)
| SourceParams::Kafka(_)
| SourceParams::Pulsar(_)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Update the REST bad-config expectation

Allowing Pulsar in this multi-pipeline allow-list changes the /indexes/{index}/sources path so the existing test_create_source_with_bad_config case in quickwit-serve/src/index_api/rest_handler.rs no longer fails config validation for num_pipelines: 2; it proceeds to the metastore lookup for the missing my-index instead, so that test still asserting a 400 with the old Kafka/PubSub-only error will fail. Please update or remove that bad-config case with this feature enabled.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 97b03e0. Updated test_create_source_with_bad_config to assert the multi-pipeline rejection using a kinesis source (still gated) plus the new error-message wording.

| SourceParams::File(FileSourceParams::Notifications(_)) => {}
_ => {
if self.num_pipelines > 1 {
bail!(
"Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types"
"Quickwit currently supports multiple pipelines only for GCP PubSub, Kafka or Pulsar sources. open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types"
);
}
}
Expand Down
16 changes: 14 additions & 2 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ mod tests {
source_config: SourceConfig,
metastore_opt: Option<MetastoreServiceClient>,
queues_dir_path_opt: Option<PathBuf>,
pipeline_uid: PipelineUid,
}

impl SourceRuntimeBuilder {
Expand All @@ -604,6 +605,7 @@ mod tests {
source_config,
metastore_opt: None,
queues_dir_path_opt: None,
pipeline_uid: PipelineUid::for_test(0u128),
}
}

Expand All @@ -622,7 +624,7 @@ mod tests {
node_id: NodeId::from_str("test-node"),
index_uid: self.index_uid,
source_id: self.source_config.source_id.clone(),
pipeline_uid: PipelineUid::for_test(0u128),
pipeline_uid: self.pipeline_uid,
},
metastore,
ingester_pool: IngesterPool::default(),
Expand All @@ -636,7 +638,11 @@ mod tests {

#[cfg(all(
test,
any(feature = "kafka-broker-tests", feature = "sqs-localstack-tests")
any(
feature = "kafka-broker-tests",
feature = "sqs-localstack-tests",
feature = "pulsar-broker-tests"
)
))]
pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self {
self.metastore_opt = Some(metastore);
Expand All @@ -656,6 +662,12 @@ mod tests {
self
}

#[cfg(all(test, feature = "pulsar-broker-tests"))]
pub fn with_pipeline_uid(mut self, pipeline_uid: PipelineUid) -> Self {
self.pipeline_uid = pipeline_uid;
self
}

fn setup_mock_metastore(
&self,
source_checkpoint_delta_opt: Option<SourceCheckpointDelta>,
Expand Down
Loading