Skip to content

Commit

Permalink
add missing aggregation part 2
Browse files Browse the repository at this point in the history
Add missing support for:
- Mixed types columns
- Key of type string on numerical fields

The special aggregation is slower than the integrated one in TermsAggregation and therefore not
chosen by default, although it can cover all use cases.
  • Loading branch information
PSeitz committed Aug 29, 2023
1 parent 73cb717 commit 76f304d
Show file tree
Hide file tree
Showing 9 changed files with 630 additions and 137 deletions.
9 changes: 8 additions & 1 deletion columnar/src/column/dictionary_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ impl fmt::Debug for BytesColumn {
}

impl BytesColumn {
/// Creates an empty `BytesColumn`: an empty dictionary paired with a
/// term-ordinal column built for 0 documents.
///
/// Useful as a neutral fallback when a segment has no column for a field
/// but callers still need a valid `BytesColumn` to operate on.
pub fn empty() -> BytesColumn {
BytesColumn {
dictionary: Arc::new(Dictionary::empty()),
term_ord_column: Column::build_empty_column(0),
}
}

/// Fills the given `output` buffer with the term associated to the ordinal `ord`.
///
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the
Expand Down Expand Up @@ -77,7 +84,7 @@ impl From<StrColumn> for BytesColumn {
}

impl StrColumn {
pub(crate) fn wrap(bytes_column: BytesColumn) -> StrColumn {
pub fn wrap(bytes_column: BytesColumn) -> StrColumn {
StrColumn(bytes_column)
}

Expand Down
9 changes: 6 additions & 3 deletions columnar/src/dynamic_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl DynamicColumnHandle {
}

/// Returns the `u64` fast field reader associated with `fields` of types
/// Str, u64, i64, f64, or datetime.
/// Str, u64, i64, f64, bool, or datetime.
///
/// If not, the fastfield reader will return the u64-value associated with the original
/// FastValue.
Expand All @@ -258,9 +258,12 @@ impl DynamicColumnHandle {
let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?;
Ok(Some(column.term_ord_column))
}
ColumnType::Bool => Ok(None),
ColumnType::IpAddr => Ok(None),
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => {
ColumnType::Bool
| ColumnType::I64
| ColumnType::U64
| ColumnType::F64
| ColumnType::DateTime => {
let column = crate::column::open_column_u64::<u64>(column_bytes)?;
Ok(Some(column))
}
Expand Down
164 changes: 110 additions & 54 deletions src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ pub struct AggregationWithAccessor {
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: ResourceLimitGuard,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
/// Used for the missing term aggregation, which checks all columns for existence.
/// By convention the missing aggregation is chosen when this property is set
/// (instead of being set in `agg`).
/// If this needs to be used by other aggregations, we need to refactor this.
pub(crate) accessors: Vec<Column<u64>>,
pub(crate) agg: Aggregation,
}

Expand All @@ -54,38 +59,68 @@ impl AggregationWithAccessor {
reader: &SegmentReader,
limits: AggregationLimits,
) -> crate::Result<Vec<AggregationWithAccessor>> {
let mut missing_value_term_agg = None;
let mut str_dict_column = None;
let get_agg_with_accessor = |aggs: &mut Vec<AggregationWithAccessor>,
accessor,
column_type,
str_dict_column: Option<StrColumn>,
missing_value_term_agg: Option<Key>|
-> crate::Result<()> {
let missing_value_for_accessor = if let Some(missing) = missing_value_term_agg.as_ref()
{
get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
} else {
None
};

let res = AggregationWithAccessor {
missing_value_for_accessor,
accessor,
accessors: Vec::new(),
field_type: column_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
};
aggs.push(res);
Ok(())
};

let mut res: Vec<AggregationWithAccessor> = Vec::new();
use AggregationVariants::*;
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
match &agg.agg {
Range(RangeAggregation {
field: field_name, ..
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
}) => {
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
get_agg_with_accessor(&mut res, accessor, column_type, None, None)?;
}
Histogram(HistogramAggregation {
field: field_name, ..
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
}) => {
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
get_agg_with_accessor(&mut res, accessor, column_type, None, None)?;
}
DateHistogram(DateHistogramAggregationReq {
field: field_name, ..
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
}) => {
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
get_agg_with_accessor(&mut res, accessor, column_type, None, None)?;
}
Terms(TermsAggregation {
field: field_name,
missing,
..
}) => {
missing_value_term_agg = missing.clone();
str_dict_column = reader.fast_fields().str(field_name)?;
let str_dict_column = reader.fast_fields().str(field_name)?;
let allowed_column_types = [
ColumnType::I64,
ColumnType::U64,
Expand All @@ -105,12 +140,60 @@ impl AggregationWithAccessor {
Key::F64(_) => ColumnType::F64,
})
.unwrap_or(ColumnType::U64);
get_all_ff_reader_or_empty(
let column_and_types = get_all_ff_reader_or_empty(
reader,
field_name,
Some(&allowed_column_types),
fallback_type,
)?
)?;
let missing_and_more_than_one_col = column_and_types.len() > 1 && missing.is_some();
let text_on_non_text_col = column_and_types.len() == 1
&& column_and_types[0].1.numerical_type().is_some()
&& missing
.as_ref()
.map(|m| matches!(m, Key::Str(_)))
.unwrap_or(false);

let use_special_missing_agg = missing_and_more_than_one_col || text_on_non_text_col;
if use_special_missing_agg {
let column_and_types =
get_all_ff_reader_or_empty(reader, field_name, None, fallback_type)?;

let accessors: Vec<Column> =
column_and_types.iter().map(|(a, _)| a.clone()).collect();
let agg_wit_acc = AggregationWithAccessor {
missing_value_for_accessor: None,
accessor: accessors[0].clone(),
accessors,
field_type: ColumnType::U64,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
};
res.push(agg_wit_acc);
}

for (accessor, column_type) in column_and_types {
let missing_value_term_agg = if use_special_missing_agg {
None
} else {
missing.clone()
};

get_agg_with_accessor(
&mut res,
accessor,
column_type,
str_dict_column.clone(),
missing_value_term_agg,
)?;
}
}
Average(AverageAggregation {
field: field_name, ..
Expand All @@ -130,48 +213,21 @@ impl AggregationWithAccessor {
| Sum(SumAggregation {
field: field_name, ..
}) => {
let (accessor, field_type) =
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;

vec![(accessor, field_type)]
get_agg_with_accessor(&mut res, accessor, column_type, None, None)?;
}
Percentiles(percentiles) => {
let (accessor, field_type) = get_ff_reader(
let (accessor, column_type) = get_ff_reader(
reader,
percentiles.field_name(),
Some(get_numeric_or_date_column_types()),
)?;
vec![(accessor, field_type)]
get_agg_with_accessor(&mut res, accessor, column_type, None, None)?;
}
};

let aggs: Vec<AggregationWithAccessor> = acc_field_types
.into_iter()
.map(|(accessor, column_type)| {
let missing_value_for_accessor =
if let Some(missing) = missing_value_term_agg.as_ref() {
get_missing_val(column_type, missing, agg.agg.get_fast_field_name())?
} else {
None
};

Ok(AggregationWithAccessor {
missing_value_for_accessor,
accessor,
field_type: column_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
})
})
.collect::<crate::Result<_>>()?;
Ok(aggs)
Ok(res)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/aggregation/bucket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
mod histogram;
mod range;
mod term_agg;
mod term_missing_agg;

use std::collections::HashMap;

Expand All @@ -34,6 +35,7 @@ pub(crate) use range::SegmentRangeCollector;
pub use range::*;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
pub use term_agg::*;
pub use term_missing_agg::*;

/// Order for buckets in a bucket aggregation.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize, Default)]
Expand Down
71 changes: 3 additions & 68 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;

use columnar::ColumnType;
use columnar::{BytesColumn, ColumnType, StrColumn};
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -470,8 +470,8 @@ impl SegmentTermCollector {
let term_dict = agg_with_accessor
.str_dict_column
.as_ref()
.expect("internal error: term dictionary not found for term aggregation");

.cloned()
.unwrap_or_else(|| StrColumn::wrap(BytesColumn::empty()));
let mut buffer = String::new();
for (term_id, doc_count) in entries {
let intermediate_entry = into_intermediate_bucket_entry(term_id, doc_count)?;
Expand Down Expand Up @@ -1811,69 +1811,4 @@ mod tests {

Ok(())
}

#[test]
#[ignore]
// TODO: This is not yet implemented
// Exercises the `missing` parameter of a terms aggregation over a JSON fast
// field whose values are of mixed types (f64, string, bool) spread across
// three segments: all-numeric, all-text, and mixed. The trailing `// WRONG`
// assertions pin the currently incorrect doc counts until support lands.
fn terms_aggregation_missing_mixed_type() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let json = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests().unwrap();
// => Segment with all values numeric
index_writer
.add_document(doc!(json => json!({"mixed_type": 10.0})))
.unwrap();
index_writer.add_document(doc!())?;
index_writer.commit().unwrap();
// => Segment with all values text
index_writer
.add_document(doc!(json => json!({"mixed_type": "blue"})))
.unwrap();
index_writer.add_document(doc!())?;
index_writer.commit().unwrap();

// => Segment with mixed values
index_writer
.add_document(doc!(json => json!({"mixed_type": "red"})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": -20.5})))
.unwrap();
index_writer
.add_document(doc!(json => json!({"mixed_type": true})))
.unwrap();
index_writer.add_document(doc!())?;

index_writer.commit().unwrap();

// Two terms aggregations on the same field: one replacing missing values
// with a string key, one replacing them with a numeric key.
let agg_req: Aggregations = serde_json::from_value(json!({
"replace_null": {
"terms": {
"field": "json.mixed_type",
"missing": "NULL"
},
},
"replace_num": {
"terms": {
"field": "json.mixed_type",
"missing": 1337
},
},
}))
.unwrap();

let res = exec_request_with_query(agg_req, &index, None)?;

// text field
assert_eq!(res["replace_null"]["buckets"][0]["key"], "NULL");
assert_eq!(res["replace_null"]["buckets"][0]["doc_count"], 4); // WRONG should be 3
assert_eq!(res["replace_num"]["buckets"][0]["key"], 1337.0);
assert_eq!(res["replace_num"]["buckets"][0]["doc_count"], 5); // WRONG should be 3
assert_eq!(res["replace_null"]["sum_other_doc_count"], 0);
assert_eq!(res["replace_null"]["doc_count_error_upper_bound"], 0);

Ok(())
}
}
Loading

0 comments on commit 76f304d

Please sign in to comment.