Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ mod test {
time_range: RangeInclusive<i64>,
size: u64,
) -> SplitMetadata {
let mut split_metadata = SplitMetadata::for_test(split_id.to_string());
let mut split_metadata = SplitMetadata::for_test(split_id.into());
split_metadata.num_docs = num_docs;
split_metadata.time_range = Some(time_range);
split_metadata.footer_offsets = (size - 10)..size;
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ impl SplitCliCommand {
let index_id = matches
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let split_id = matches
let split_id: SplitId = matches
.remove_one::<String>("split")
.expect("`split` should be a required arg.");
.expect("`split` should be a required arg.")
.into();
let client_args = ClientArgs::parse(&mut matches)?;
let verbose = matches.get_flag("verbose");

Expand Down Expand Up @@ -345,7 +346,7 @@ async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
.await
.expect("Failed to fetch splits.")
.into_iter()
.find(|split| split.split_id() == args.split_id)
.find(|split| split.split_id() == args.split_id.as_str())
.with_context(|| {
format!(
"could not find split metadata in metastore {}",
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ impl ToolCliCommand {
let index_id = matches
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let split_id = matches
let split_id: SplitId = matches
.remove_one::<String>("split")
.expect("`split` should be a required arg.");
.expect("`split` should be a required arg.")
.into();
let config_uri = matches
.remove_one::<String>("config")
.map(|uri_str| Uri::from_str(&uri_str))
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ async fn test_garbage_collect_index_cli() {
index_uid: Some(index_uid.clone()),
split_ids: splits_metadata
.into_iter()
.map(|split_metadata| split_metadata.split_id)
.map(|split_metadata| split_metadata.split_id.to_string())
.collect(),
})
.await
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-doc-mapper/src/query_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::convert::Infallible;
use std::ops::Bound;
use std::sync::Arc;

use quickwit_proto::types::SplitId;
use quickwit_query::query_ast::{
BuildTantivyAstContext, FieldPresenceQuery, FullTextQuery, PhrasePrefixQuery, QueryAst,
QueryAstTransformer, QueryAstVisitor, RangeQuery, RegexQuery, TermSetQuery, WildcardQuery,
Expand Down Expand Up @@ -158,7 +157,7 @@ impl<'a, 'f> QueryAstVisitor<'a> for ExistsQueryFastFields<'f> {
pub(crate) fn build_query(
query_ast: QueryAst,
context: &BuildTantivyAstContext,
cache_context: Option<(Arc<dyn quickwit_query::query_ast::PredicateCache>, SplitId)>,
cache_context: Option<(Arc<dyn quickwit_query::query_ast::PredicateCache>, String)>,
) -> Result<(Box<dyn Query>, WarmupInfo), QueryParserError> {
let mut fast_fields: HashSet<FastFieldWarmupInfo> = HashSet::new();

Expand Down
18 changes: 9 additions & 9 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ pub async fn delete_splits_from_storage_and_metastore(
let splits_to_delete: Vec<SplitToDelete> = split_infos
.values()
.map(|info| SplitToDelete {
split_id: info.split_id.clone(),
split_id: info.split_id.to_string(),
path: info.file_name.clone(),
size_bytes: info.file_size_bytes.as_u64(),
})
Expand Down Expand Up @@ -508,7 +508,7 @@ pub async fn delete_splits_from_storage_and_metastore(
}

if !successes.is_empty() {
let split_ids: Vec<SplitId> = successes
let split_ids: Vec<String> = successes
.iter()
.map(|split_info| split_info.split_id.to_string())
.collect();
Expand Down Expand Up @@ -597,7 +597,7 @@ mod tests {

let split_id = "test-run-gc--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
split_id: split_id.into(),
index_uid: index_uid.clone(),
..Default::default()
};
Expand Down Expand Up @@ -697,7 +697,7 @@ mod tests {

let split_id = "test-run-gc--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
split_id: split_id.into(),
index_uid: index_uid.clone(),
..Default::default()
};
Expand Down Expand Up @@ -827,7 +827,7 @@ mod tests {

let split_id = "test-delete-splits-happy--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
split_id: split_id.into(),
index_uid: IndexUid::new_with_random_ulid(index_id),
..Default::default()
};
Expand Down Expand Up @@ -932,13 +932,13 @@ mod tests {

let split_id_0 = "test-delete-splits-storage-error--split-0";
let split_metadata_0 = SplitMetadata {
split_id: split_id_0.to_string(),
split_id: split_id_0.into(),
index_uid: index_uid.clone(),
..Default::default()
};
let split_id_1 = "test-delete-splits-storage-error--split-1";
let split_metadata_1 = SplitMetadata {
split_id: split_id_1.to_string(),
split_id: split_id_1.into(),
index_uid: index_uid.clone(),
..Default::default()
};
Expand Down Expand Up @@ -1020,13 +1020,13 @@ mod tests {

let split_id_0 = "test-delete-splits-storage-error--split-0";
let split_metadata_0 = SplitMetadata {
split_id: split_id_0.to_string(),
split_id: split_id_0.into(),
index_uid: index_uid.clone(),
..Default::default()
};
let split_id_1 = "test-delete-splits-storage-error--split-1";
let split_metadata_1 = SplitMetadata {
split_id: split_id_1.to_string(),
split_id: split_id_1.into(),
index_uid: index_uid.clone(),
..Default::default()
};
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl IndexService {
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_states([SplitState::Staged, SplitState::Published]);
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?;
let split_ids: Vec<SplitId> = self
let split_ids = self
.metastore
.list_splits(list_splits_request)
.await?
Expand Down Expand Up @@ -477,7 +477,7 @@ impl IndexService {
.await?;
let split_ids: Vec<SplitId> = splits_metadata
.iter()
.map(|split| split.split_id.to_string())
.map(|split| split.split_id.clone())
.collect();
let mark_splits_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone());
Expand Down Expand Up @@ -771,7 +771,7 @@ mod tests {

let split_id = "test-split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
split_id: split_id.into(),
index_uid: index_uid.clone(),
..Default::default()
};
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ impl Indexer {
return Ok(());
}
let num_splits = splits.len() as u64;
let split_ids = splits.iter().map(|split| split.split_id()).join(",");
let split_ids: String = splits.iter().map(|split| split.split_id()).join(",");
info!(
index=%self.indexer_state.pipeline_id.index_uid,
source=self.indexer_state.pipeline_id.source_id.as_str(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2019,7 +2019,7 @@ mod tests {
})
.return_once(|_request| {
let splits = vec![Split {
split_metadata: SplitMetadata::for_test("test-split".to_string()),
split_metadata: SplitMetadata::for_test("test-split".into()),
split_state: SplitState::Published,
update_timestamp: 0,
publish_timestamp: Some(0),
Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ impl Handler<SplitsUpdate> for Publisher {
let index_checkpoint_delta_json_opt = serialize_checkpoint_delta(&checkpoint_delta_opt)?;
let split_ids: Vec<String> = new_splits
.iter()
.map(|split| split.split_id.clone())
.map(|split| split.split_id.to_string())
.collect();
if let Some(_guard) = publish_lock.acquire().await {
let publish_splits_request = PublishSplitsRequest {
index_uid: Some(index_uid),
staged_split_ids: split_ids.clone(),
replaced_split_ids: replaced_split_ids.clone(),
replaced_split_ids: replaced_split_ids.iter().map(String::from).collect(),
index_checkpoint_delta_json_opt,
publish_token_opt: publish_token_opt.clone(),
};
Expand Down Expand Up @@ -129,7 +129,7 @@ mod tests {
};
use quickwit_metastore::{PublishSplitsRequestExt, SplitMetadata};
use quickwit_proto::metastore::{EmptyResponse, MetastoreServiceClient, MockMetastoreService};
use quickwit_proto::types::{IndexUid, Position};
use quickwit_proto::types::{IndexUid, Position, SplitId};
use tracing::Span;

use super::PUBLISHER_NAME;
Expand Down Expand Up @@ -176,7 +176,7 @@ mod tests {
.send_message(SplitsUpdate {
index_uid: ref_index_uid.clone(),
new_splits: vec![SplitMetadata {
split_id: "split".to_string(),
split_id: "split".into(),
..Default::default()
}],
replaced_split_ids: Vec::new(),
Expand Down Expand Up @@ -320,10 +320,10 @@ mod tests {
.send_message(SplitsUpdate {
index_uid: ref_index_uid.clone(),
new_splits: vec![SplitMetadata {
split_id: "split3".to_string(),
split_id: "split3".into(),
..Default::default()
}],
replaced_split_ids: vec!["split1".to_string(), "split2".to_string()],
replaced_split_ids: vec![SplitId::from("split1"), SplitId::from("split2")],
checkpoint_delta_opt: None,
publish_lock: PublishLock::default(),
publish_token_opt: None,
Expand Down Expand Up @@ -365,7 +365,7 @@ mod tests {
publisher_mailbox
.send_message(SplitsUpdate {
index_uid: IndexUid::new_with_random_ulid("index"),
new_splits: vec![SplitMetadata::for_test("test-split".to_string())],
new_splits: vec![SplitMetadata::for_test("test-split".into())],
replaced_split_ids: Vec::new(),
checkpoint_delta_opt: None,
publish_lock,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub fn merge_split_attrs(
let num_docs = sum_num_docs(splits);
let replaced_split_ids: Vec<SplitId> = splits
.iter()
.map(|split| split.split_id().to_string())
.map(|split| split.split_id().clone())
.collect();
let delete_opstamp = splits
.iter()
Expand Down Expand Up @@ -434,7 +434,7 @@ impl MergeExecutor {
);
let mark_splits_for_deletion_request = MarkSplitsForDeletionRequest::new(
split.index_uid.clone(),
vec![split.split_id.clone()],
[split.split_id.clone()],
);
self.metastore
.mark_splits_for_deletion(mark_splits_for_deletion_request)
Expand Down Expand Up @@ -594,7 +594,7 @@ mod tests {

use super::*;
use crate::merge_policy::{MergeOperation, MergeTask};
use crate::{TestSandbox, get_tantivy_directory_from_split_bundle, new_split_id};
use crate::{TestSandbox, get_tantivy_directory_from_split_bundle};

#[tokio::test]
async fn test_merge_executor() -> anyhow::Result<()> {
Expand Down Expand Up @@ -755,7 +755,7 @@ mod tests {

// We want to test a delete on a split with num_merge_ops > 0.
let mut new_split_metadata = splits[0].split_metadata.clone();
new_split_metadata.split_id = new_split_id();
new_split_metadata.split_id = SplitId::new();
new_split_metadata.num_merge_ops = 1;
let stage_splits_request =
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), &new_split_metadata)
Expand Down
18 changes: 9 additions & 9 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::DocMappingUid;
use quickwit_proto::types::{DocMappingUid, SplitId};
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::{info, warn};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct MergePlanner {
///
/// We incrementally build this set, by adding new splits to it.
/// When it becomes too large, we entirely rebuild it.
known_split_ids: HashSet<String>,
known_split_ids: HashSet<SplitId>,
known_split_ids_recompute_attempt_id: usize,

merge_policy: Arc<dyn MergePolicy>,
Expand Down Expand Up @@ -207,19 +207,19 @@ impl MergePlanner {
merge_planner
}

fn rebuild_known_split_ids(&self) -> HashSet<String> {
let mut known_split_ids: HashSet<String> = HashSet::default();
fn rebuild_known_split_ids(&self) -> HashSet<SplitId> {
let mut known_split_ids: HashSet<SplitId> = HashSet::default();
// Add splits that in `partitioned_young_splits`.
for young_split_partition in self.partitioned_young_splits.values() {
for split in young_split_partition {
known_split_ids.insert(split.split_id().to_string());
known_split_ids.insert(split.split_id().clone());
}
}
let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list();
// Add splits that are known as in merge.
for merge_op in ongoing_merge_operations {
for split in &merge_op.splits {
known_split_ids.insert(split.split_id().to_string());
known_split_ids.insert(split.split_id().clone());
}
}
if known_split_ids.len() * 2 >= self.known_split_ids.len() {
Expand All @@ -235,11 +235,11 @@ impl MergePlanner {

/// Updates `known_split_ids` and return true if the split was not
/// previously known and should be recorded.
fn acknownledge_split(&mut self, split_id: &str) -> bool {
fn acknownledge_split(&mut self, split_id: &SplitId) -> bool {
if self.known_split_ids.contains(split_id) {
return false;
}
self.known_split_ids.insert(split_id.to_string());
self.known_split_ids.insert(split_id.clone());
true
}

Expand Down Expand Up @@ -389,7 +389,7 @@ mod tests {
num_merge_ops: usize,
) -> SplitMetadata {
SplitMetadata {
split_id: split_id.to_string(),
split_id: split_id.into(),
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl MergeSplitDownloader {
for split in splits {
if ctx.kill_switch().is_dead() {
debug!(
split_id = split.split_id(),
split_id = %split.split_id(),
"Kill switch was activated. Cancelling download."
);
return Err(ActorExitStatus::Killed);
Expand All @@ -114,7 +114,7 @@ impl MergeSplitDownloader {
let _protect_guard = ctx.protect_zone();
let tantivy_dir = self
.split_store
.fetch_and_open_split(split.split_id(), download_directory, &io_controls)
.fetch_and_open_split(split.split_id().clone(), download_directory, &io_controls)
.await
.map_err(|error| {
let split_id = split.split_id();
Expand All @@ -133,17 +133,17 @@ mod tests {

use quickwit_actors::Universe;
use quickwit_common::split_file;
use quickwit_proto::types::SplitId;
use quickwit_storage::{PutPayload, RamStorageBuilder, SplitPayloadBuilder};

use super::*;
use crate::merge_policy::MergeOperation;
use crate::new_split_id;

#[tokio::test]
async fn test_merge_split_downloader() -> anyhow::Result<()> {
let scratch_directory = TempDirectory::for_test();
let splits_to_merge: Vec<SplitMetadata> = iter::repeat_with(|| {
let split_id = new_split_id();
let split_id = SplitId::new();
SplitMetadata {
split_id,
..Default::default()
Expand Down
Loading
Loading