allow updating of doc mapper #4928

Closed · wants to merge 4 commits
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
@@ -31,6 +31,7 @@ serde_with = { workspace = true }
serde_yaml = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }
vrl = { workspace = true, optional = true }

7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
@@ -39,6 +39,7 @@ use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;
use ulid::Ulid;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
@@ -93,6 +94,10 @@ pub struct DocMapping {
/// Record document length
#[serde(default)]
pub document_length: bool,
/// Version of the doc mapper
#[serde(default)]
#[schema(value_type = String)]
pub version: Ulid,
}

#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
@@ -458,6 +463,7 @@ impl TestableForRegression for IndexConfig {
timestamp_field: Some("timestamp".to_string()),
tokenizers: vec![tokenizer],
document_length: false,
version: Ulid::nil(),
};
let retention_policy = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
@@ -536,6 +542,7 @@ pub fn build_doc_mapper(
max_num_partitions: doc_mapping.max_num_partitions,
tokenizers: doc_mapping.tokenizers.clone(),
document_length: doc_mapping.document_length,
version: doc_mapping.version,
};
Ok(Arc::new(builder.try_build()?))
}
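The new `version` field is marked `#[serde(default)]`, so index configs written before this change still deserialize: the fallback relies on `Ulid::default()` being the nil ULID, the same value the regression fixture above pins with `Ulid::nil()`. A minimal sketch of that fallback, assuming the `ulid` crate's `serde` feature (`DocMappingSketch` is an illustrative stand-in, not the real `DocMapping`):

use serde::Deserialize;
use ulid::Ulid;

// Illustrative stand-in for the real `DocMapping` struct.
#[derive(Debug, Deserialize)]
struct DocMappingSketch {
    #[serde(default)]
    version: Ulid,
}

fn main() {
    // A config written before this PR carries no `version` field and
    // falls back to `Ulid::default()`, i.e. the nil ULID.
    let old: DocMappingSketch = serde_json::from_str("{}").unwrap();
    assert_eq!(old.version, Ulid::nil());

    // A newer config can carry an explicit version (any valid ULID string).
    let new: DocMappingSketch =
        serde_json::from_str(r#"{"version": "01ARZ3NDEKTSV4RRFFQ69G5FAV"}"#).unwrap();
    assert_ne!(new.version, Ulid::nil());
}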
1 change: 1 addition & 0 deletions quickwit/quickwit-doc-mapper/Cargo.toml
@@ -28,6 +28,7 @@ tantivy = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
typetag = { workspace = true }
ulid = { workspace = true }
utoipa = { workspace = true }

quickwit-common = { workspace = true }
@@ -33,6 +33,7 @@ use tantivy::schema::{
Field, FieldType, FieldValue, OwnedValue as TantivyValue, Schema, INDEXED, STORED,
};
use tantivy::TantivyDocument as Document;
use ulid::Ulid;

use super::field_mapping_entry::RAW_TOKENIZER_NAME;
use super::DefaultDocMapperBuilder;
@@ -82,6 +83,8 @@ pub struct DefaultDocMapper {
concatenate_dynamic_fields: Vec<Field>,
/// Schema generated by the store source and field mappings parameters.
schema: Schema,
/// Version of the doc mapper
version: Ulid,
/// List of field names used for tagging.
tag_field_names: BTreeSet<String>,
/// The partition key is a DSL used to route documents
@@ -186,6 +189,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
};

let schema = schema_builder.build();
let version = builder.version;

let tokenizer_manager = create_default_quickwit_tokenizer_manager();
let mut custom_tokenizer_names = HashSet::new();
@@ -267,6 +271,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
let required_fields = Vec::new();
Ok(DefaultDocMapper {
schema,
version,
index_field_presence: builder.index_field_presence,
source_field,
dynamic_field,
@@ -388,6 +393,7 @@ impl From<DefaultDocMapper> for DefaultDocMapperBuilder {
max_num_partitions: default_doc_mapper.max_num_partitions,
tokenizers: default_doc_mapper.tokenizer_entries,
document_length: false,
version: default_doc_mapper.version,
}
}
}
@@ -712,6 +718,10 @@ impl DocMapper for DefaultDocMapper {
self.schema.clone()
}

fn version(&self) -> Ulid {
self.version
}

fn timestamp_field_name(&self) -> Option<&str> {
self.timestamp_field_name.as_deref()
}
@@ -20,6 +20,7 @@
use std::num::NonZeroU32;

use serde::{Deserialize, Serialize};
use ulid::Ulid;

use super::tokenizer_entry::TokenizerEntry;
use super::FieldMappingEntry;
@@ -30,7 +31,7 @@ use crate::DefaultDocMapper;
/// to create a valid DocMapper.
///
/// It is also used to serialize/deserialize a DocMapper.
- /// note that this is not the way is the DocMapping is deserialized
/// note that this is not the way the DocMapping is deserialized
/// from the configuration.
#[quickwit_macros::serde_multikey]
#[derive(Serialize, Deserialize, Clone)]
@@ -83,6 +84,9 @@ pub struct DefaultDocMapperBuilder {
/// Record document length
#[serde(default)]
pub document_length: bool,
/// Version of the doc mapper
#[serde(default)]
pub version: Ulid,
}

/// Defines how an unmapped field should be handled.
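Because the `From<DefaultDocMapper> for DefaultDocMapperBuilder` impl earlier in this diff copies `version` back into the builder, a mapper that is serialized through its builder and rebuilt keeps its version. A toy round-trip sketch of that property (the `Mapper`/`Builder` types here are illustrative, not the real ones):

use ulid::Ulid;

#[derive(Clone)]
struct Mapper {
    version: Ulid,
}

struct Builder {
    version: Ulid,
}

// Mirrors `From<DefaultDocMapper> for DefaultDocMapperBuilder`: the
// version is carried back into the builder.
impl From<Mapper> for Builder {
    fn from(mapper: Mapper) -> Builder {
        Builder { version: mapper.version }
    }
}

impl Builder {
    // Mirrors `try_build` on the real builder, minus all other fields.
    fn try_build(self) -> anyhow::Result<Mapper> {
        Ok(Mapper { version: self.version })
    }
}

fn main() -> anyhow::Result<()> {
    let original = Mapper { version: Ulid::new() };
    let rebuilt = Builder::from(original.clone()).try_build()?;
    assert_eq!(original.version, rebuilt.version);
    Ok(())
}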
6 changes: 6 additions & 0 deletions quickwit/quickwit-doc-mapper/src/doc_mapper.rs
@@ -30,6 +30,7 @@ use serde_json::Value as JsonValue;
use tantivy::query::Query;
use tantivy::schema::{Field, FieldType, OwnedValue as Value, Schema};
use tantivy::{TantivyDocument as Document, Term};
use ulid::Ulid;

pub type Partition = u64;

@@ -99,6 +100,11 @@ pub trait DocMapper: Send + Sync + Debug + DynClone + 'static {
/// over time. The schema returned here represents the most up-to-date schema of the index.
fn schema(&self) -> Schema;

/// Returns the version of the doc mapper
///
/// Splits with the same doc mapper version should use the same schema
fn version(&self) -> Ulid;

/// Returns the query.
///
/// Considering schema evolution, splits within an index can have different schema
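The contract stated on `version()` is that version equality implies schema compatibility. A consumer can therefore bucket splits by doc mapper version and open each bucket with a single schema. A hypothetical sketch (`SplitInfo` and `group_by_doc_mapper_version` are illustrative helpers, not part of this PR):

use std::collections::HashMap;

use ulid::Ulid;

// Illustrative split record; the real type would be `SplitMetadata`.
#[allow(dead_code)]
struct SplitInfo {
    split_id: String,
    doc_mapper_version: Ulid,
}

// Splits sharing a doc mapper version are guaranteed to share a schema,
// so each bucket can be handled with one schema instance.
fn group_by_doc_mapper_version(splits: Vec<SplitInfo>) -> HashMap<Ulid, Vec<SplitInfo>> {
    let mut groups: HashMap<Ulid, Vec<SplitInfo>> = HashMap::new();
    for split in splits {
        groups.entry(split.doc_mapper_version).or_default().push(split);
    }
    groups
}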
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -98,6 +98,7 @@ struct IndexerState {
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
schema: Schema,
doc_mapper_version: Ulid,
tokenizer_manager: TokenizerManager,
max_num_partitions: NonZeroU32,
index_settings: IndexSettings,
@@ -130,6 +131,7 @@ impl IndexerState {
self.pipeline_id.clone(),
partition_id,
last_delete_opstamp,
self.doc_mapper_version,
self.indexing_directory.clone(),
index_builder,
io_controls,
@@ -537,6 +539,7 @@ impl Indexer {
index_serializer_mailbox: Mailbox<IndexSerializer>,
) -> Self {
let schema = doc_mapper.schema();
let doc_mapper_version = doc_mapper.version();
let tokenizer_manager = doc_mapper.tokenizer_manager().clone();
let docstore_compression = Compressor::Zstd(ZstdCompressor {
compression_level: Some(indexing_settings.docstore_compression_level),
@@ -564,6 +567,7 @@
publish_lock: PublishLock::default(),
publish_token_opt: None,
schema,
doc_mapper_version,
tokenizer_manager: tokenizer_manager.tantivy_manager().clone(),
index_settings,
max_num_partitions: doc_mapper.max_num_partitions(),
20 changes: 16 additions & 4 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
@@ -236,7 +236,7 @@ pub fn merge_split_attrs(
pipeline_id: MergePipelineId,
merge_split_id: SplitId,
splits: &[SplitMetadata],
- ) -> SplitAttrs {
) -> anyhow::Result<SplitAttrs> {
let partition_id = combine_partition_ids_aux(splits.iter().map(|split| split.partition_id));
let time_range: Option<RangeInclusive<DateTime>> = merge_time_range(splits);
let uncompressed_docs_size_in_bytes = sum_doc_sizes_in_bytes(splits);
@@ -250,7 +250,17 @@
.map(|split| split.delete_opstamp)
.min()
.unwrap_or(0);
- SplitAttrs {
let doc_mapper_version = splits
.first()
.ok_or_else(|| anyhow::anyhow!("attempted to merge zero splits"))?
.doc_mapper_version;
if splits
.iter()
.any(|split| split.doc_mapper_version != doc_mapper_version)
{
anyhow::bail!("attempted to merge splits with different doc mapper version");
}
Ok(SplitAttrs {
node_id: pipeline_id.node_id.clone(),
index_uid: pipeline_id.index_uid.clone(),
source_id: pipeline_id.source_id.clone(),
Expand All @@ -262,7 +272,8 @@ pub fn merge_split_attrs(
uncompressed_docs_size_in_bytes,
delete_opstamp,
num_merge_ops: max_merge_ops(splits) + 1,
- }
doc_mapper_version,
})
}

fn max_merge_ops(splits: &[SplitMetadata]) -> usize {
@@ -324,7 +335,7 @@ impl MergeExecutor {
)?;
ctx.record_progress();

- let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits);
let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits)?;
Ok(IndexedSplit {
split_attrs,
index: merged_index,
@@ -444,6 +455,7 @@ impl MergeExecutor {
uncompressed_docs_size_in_bytes,
delete_opstamp: last_delete_opstamp,
num_merge_ops: split.num_merge_ops,
doc_mapper_version: split.doc_mapper_version,
},
index: merged_index,
split_scratch_directory: merge_scratch_directory,
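Extracted from the hunk above, the guard in `merge_split_attrs` reduces to: take the first split's doc mapper version, then refuse the merge if any sibling split disagrees. A standalone sketch of that check (the function name is illustrative):

use ulid::Ulid;

// Returns the shared doc mapper version, or an error if the slice is
// empty or mixes versions, mirroring the check in `merge_split_attrs`.
fn check_same_doc_mapper_version(versions: &[Ulid]) -> anyhow::Result<Ulid> {
    let first = *versions
        .first()
        .ok_or_else(|| anyhow::anyhow!("attempted to merge zero splits"))?;
    if versions.iter().any(|&version| version != first) {
        anyhow::bail!("attempted to merge splits with different doc mapper version");
    }
    Ok(first)
}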