perf: optimize ingestion speed
hengfeiyang committed May 10, 2024
1 parent d8b4325 commit efc8885
Showing 23 changed files with 176 additions and 194 deletions.
3 changes: 1 addition & 2 deletions src/config/src/utils/record_batch_ext.rs

@@ -77,8 +77,7 @@ pub fn convert_json_to_record_batch(
    let mut record_keys = HashSet::with_capacity(max_keys);
    for record in data.iter() {
        record_keys.clear();
-        let record = record.as_object().unwrap();
-        for (k, v) in record {
+        for (k, v) in record.as_object().unwrap() {
            record_keys.insert(k);
            let res = builders.get_mut(k);
            // where the value is null, the key maybe not exists in the schema
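
A note on the hot loop above: convert_json_to_record_batch allocates the key set once, with capacity for the widest record, and clears it per record so the allocation is reused across the whole batch. A minimal sketch of that reuse pattern, using a hypothetical helper (not part of this commit):

use hashbrown::HashSet;
use serde_json::Value;

// Count distinct keys per record while reusing one allocation.
fn total_distinct_keys(records: &[Value], max_keys: usize) -> usize {
    let mut record_keys: HashSet<&str> = HashSet::with_capacity(max_keys);
    let mut total = 0;
    for record in records {
        record_keys.clear(); // drops the contents, keeps the capacity
        for (k, _v) in record.as_object().unwrap() {
            record_keys.insert(k.as_str());
        }
        total += record_keys.len();
    }
    total
}
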
2 changes: 1 addition & 1 deletion src/handler/http/request/search/mod.rs

@@ -981,7 +981,7 @@ async fn values_v1(
    let key = format!("{org_id}/{stream_type}/{stream_name}");
    let r = STREAM_SCHEMAS_LATEST.read().await;
    let schema = if let Some(schema) = r.get(&key) {
-        schema.clone()
+        schema.schema().clone()
    } else {
        arrow_schema::Schema::empty()
    };

2 changes: 1 addition & 1 deletion src/handler/http/request/status/mod.rs

@@ -327,7 +327,7 @@ async fn get_stream_schema_status() -> (usize, usize, usize) {
        stream_num += 1;
        stream_schema_num += 1;
        mem_size += key.len();
-        mem_size += schema.size();
+        mem_size += schema.schema().size();
    }
    drop(r);
    (stream_num, stream_schema_num, mem_size)

63 changes: 60 additions & 3 deletions src/infra/src/schema/mod.rs

@@ -13,10 +13,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

+use std::collections::HashMap;
+
use chrono::Utc;
use config::{
    meta::stream::{PartitionTimeLevel, StreamSettings, StreamType},
-    utils::json,
+    utils::{json, schema_ext::SchemaExt},
    RwAHashMap, CONFIG,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};

@@ -34,7 +36,8 @@ pub static STREAM_SCHEMAS: Lazy<RwAHashMap<String, Vec<(i64, Schema)>>> =
    Lazy::new(Default::default);
pub static STREAM_SCHEMAS_COMPRESSED: Lazy<RwAHashMap<String, Vec<(i64, bytes::Bytes)>>> =
    Lazy::new(Default::default);
-pub static STREAM_SCHEMAS_LATEST: Lazy<RwAHashMap<String, Schema>> = Lazy::new(Default::default);
+pub static STREAM_SCHEMAS_LATEST: Lazy<RwAHashMap<String, SchemaCache>> =
+    Lazy::new(Default::default);
pub static STREAM_SCHEMAS_FIELDS: Lazy<RwAHashMap<String, (i64, Vec<String>)>> =
    Lazy::new(Default::default);
pub static STREAM_SETTINGS: Lazy<RwAHashMap<String, StreamSettings>> = Lazy::new(Default::default);

@@ -54,13 +57,31 @@ pub async fn get(org_id: &str, stream_name: &str, stream_type: StreamType) -> Re

    let r = STREAM_SCHEMAS_LATEST.read().await;
    if let Some(schema) = r.get(cache_key) {
-        return Ok(schema.clone());
+        return Ok(schema.schema.clone());
    }
    drop(r);
    // if not found in cache, get from db
    get_from_db(org_id, stream_name, stream_type).await
}

+pub async fn get_cache(
+    org_id: &str,
+    stream_name: &str,
+    stream_type: StreamType,
+) -> Result<SchemaCache> {
+    let key = mk_key(org_id, stream_type, stream_name);
+    let cache_key = key.strip_prefix("/schema/").unwrap();
+
+    let r = STREAM_SCHEMAS_LATEST.read().await;
+    if let Some(schema) = r.get(cache_key) {
+        return Ok(schema.clone());
+    }
+    drop(r);
+    // if not found in cache, get from db
+    let schema = get_from_db(org_id, stream_name, stream_type).await?;
+    Ok(SchemaCache::new(schema))
+}
+
pub async fn get_from_db(
    org_id: &str,
    stream_name: &str,

@@ -617,6 +638,42 @@ pub fn get_merge_schema_changes(
    }
}

+#[derive(Clone, Debug)]
+pub struct SchemaCache {
+    schema: Schema,
+    fields_map: HashMap<String, usize>,
+    hash_key: String,
+}
+
+impl SchemaCache {
+    pub fn new(schema: Schema) -> Self {
+        let hash_key = schema.hash_key();
+        let fields_map = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, f)| (f.name().to_owned(), i))
+            .collect();
+        Self {
+            schema,
+            fields_map,
+            hash_key,
+        }
+    }
+
+    pub fn hash_key(&self) -> &str {
+        &self.hash_key
+    }
+
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    pub fn fields_map(&self) -> &HashMap<String, usize> {
+        &self.fields_map
+    }
+}
+
pub fn is_widening_conversion(from: &DataType, to: &DataType) -> bool {
    let allowed_type = match from {
        DataType::Boolean => vec![DataType::Utf8],
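
The SchemaCache type added above is the heart of this commit: the schema's hash key and a field-name-to-index map are computed once, when a schema version enters the cache, rather than on every ingested record. A minimal usage sketch (illustrative only; the field names are made up, and hash_key comes from the repo's SchemaExt trait, so it is omitted here):

use std::collections::HashMap;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Schema::new(vec![
        Field::new("log", DataType::Utf8, true),
        Field::new("_timestamp", DataType::Int64, false),
    ]);

    // What SchemaCache::new precomputes once per schema version.
    let fields_map: HashMap<String, usize> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, f)| (f.name().to_owned(), i))
        .collect();

    // Hot-path lookups become O(1) hash probes instead of the linear
    // scan over the field list that Schema::index_of performs.
    if let Some(&idx) = fields_map.get("_timestamp") {
        assert_eq!(schema.field(idx).name(), "_timestamp");
    }
}
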
3 changes: 1 addition & 2 deletions src/job/files/parquet.rs

@@ -37,7 +37,7 @@ use config::{
};
use datafusion::{arrow::json as arrow_json, datasource::MemTable, prelude::*};
use hashbrown::HashSet;
-use infra::{cache, storage};
+use infra::{cache, schema::SchemaCache, storage};
use ingester::WAL_PARQUET_METADATA;
use once_cell::sync::Lazy;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

@@ -51,7 +51,6 @@ use crate::{
    job::files::idx::write_to_disk,
    service::{
        db,
-        schema::SchemaCache,
        search::datafusion::{
            exec::merge_parquet_files, string_to_array_v2_udf::STRING_TO_ARRAY_V2_UDF,
        },

2 changes: 1 addition & 1 deletion src/service/compact/file_list.rs

@@ -57,7 +57,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
    offset = time_now.timestamp_micros();
    let r = STREAM_SCHEMAS_LATEST.read().await;
    for (key, val) in r.iter() {
-        if let Some(val) = val.metadata.get("created_at") {
+        if let Some(val) = val.schema().metadata.get("created_at") {
            let time_min = val.parse().unwrap();
            if time_min == 0 {
                log::info!(

17 changes: 12 additions & 5 deletions src/service/db/schema.rs

@@ -22,8 +22,8 @@ use hashbrown::{HashMap, HashSet};
use infra::{
    cache,
    schema::{
-        unwrap_stream_settings, STREAM_SCHEMAS, STREAM_SCHEMAS_COMPRESSED, STREAM_SCHEMAS_LATEST,
-        STREAM_SETTINGS,
+        unwrap_stream_settings, SchemaCache, STREAM_SCHEMAS, STREAM_SCHEMAS_COMPRESSED,
+        STREAM_SCHEMAS_LATEST, STREAM_SETTINGS,
    },
};
#[cfg(feature = "enterprise")]

@@ -170,7 +170,7 @@ async fn list_stream_schemas(
            stream_name,
            stream_type,
            schema: if fetch_schema {
-                val.clone()
+                val.schema().clone()
            } else {
                Schema::empty()
            },

@@ -277,6 +277,7 @@ pub async fn watch() -> Result<(), anyhow::Error> {
        let r = STREAM_SCHEMAS_LATEST.read().await;
        let prev_schema_start_dt = if let Some(schema) = r.get(&item_key.to_owned()) {
            schema
+                .schema()
                .metadata()
                .get("start_dt")
                .unwrap_or(&"0".to_string())

@@ -337,7 +338,10 @@ pub async fn watch() -> Result<(), anyhow::Error> {
        w.insert(item_key.to_string(), settings);
        drop(w);
        let mut w = STREAM_SCHEMAS_LATEST.write().await;
-        w.insert(item_key.to_string(), latest_schema.clone());
+        w.insert(
+            item_key.to_string(),
+            SchemaCache::new(latest_schema.clone()),
+        );
        drop(w);
        if CONFIG.common.schema_cache_compress_enabled {
            let schema_versions = schema_versions

@@ -485,7 +489,10 @@ pub async fn cache() -> Result<(), anyhow::Error> {
        w.insert(item_key.to_string(), settings);
        drop(w);
        let mut w = STREAM_SCHEMAS_LATEST.write().await;
-        w.insert(item_key.to_string(), latest_schema.clone());
+        w.insert(
+            item_key.to_string(),
+            SchemaCache::new(latest_schema.clone()),
+        );
        drop(w);
        if CONFIG.common.schema_cache_compress_enabled {
            let schema_versions = schema_versions

7 changes: 5 additions & 2 deletions src/service/enrichment_table/mod.rs

@@ -34,7 +34,10 @@ use config::{
use futures::{StreamExt, TryStreamExt};
use infra::{
    cache::stats,
-    schema::{STREAM_SCHEMAS, STREAM_SCHEMAS_COMPRESSED, STREAM_SCHEMAS_LATEST, STREAM_SETTINGS},
+    schema::{
+        SchemaCache, STREAM_SCHEMAS, STREAM_SCHEMAS_COMPRESSED, STREAM_SCHEMAS_LATEST,
+        STREAM_SETTINGS,
+    },
};

use crate::{

@@ -43,7 +46,7 @@ use crate::{
        compact::retention,
        db, format_stream_name,
        ingestion::write_file,
-        schema::{check_for_schema, stream_schema_exists, SchemaCache},
+        schema::{check_for_schema, stream_schema_exists},
        usage::report_request_usage_stats,
    },
};

6 changes: 2 additions & 4 deletions src/service/logs/bulk.rs

@@ -32,7 +32,7 @@ use config::{
    utils::{flatten, json, schema_ext::SchemaExt, time::parse_timestamp_micro_from_value},
    BLOCKED_STREAMS, CONFIG, DISTINCT_FIELDS,
};
-use infra::schema::unwrap_partition_time_level;
+use infra::schema::{unwrap_partition_time_level, SchemaCache};

use super::{add_record, cast_to_schema_v1, StreamMeta};
use crate::{

@@ -49,9 +49,7 @@ use crate::{
        db, format_stream_name,
        ingestion::{evaluate_trigger, write_file, TriggerAlertData},
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{
-            get_invalid_schema_start_dt, get_upto_discard_error, stream_schema_exists, SchemaCache,
-        },
+        schema::{get_invalid_schema_start_dt, get_upto_discard_error, stream_schema_exists},
        usage::report_request_usage_stats,
    },
};

5 changes: 3 additions & 2 deletions src/service/logs/ingest.rs

@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by

@@ -28,6 +28,7 @@ use config::{
    CONFIG, DISTINCT_FIELDS,
};
use flate2::read::GzDecoder;
+use infra::schema::SchemaCache;
use vrl::compiler::runtime::Runtime;

use crate::{

@@ -45,7 +46,7 @@ use crate::{
        ingestion::{check_ingestion_allowed, evaluate_trigger, write_file, TriggerAlertData},
        logs::StreamMeta,
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{get_upto_discard_error, SchemaCache},
+        schema::get_upto_discard_error,
        usage::report_request_usage_stats,
    },
};

9 changes: 3 additions & 6 deletions src/service/logs/mod.rs

@@ -29,15 +29,12 @@ use config::{
    },
    CONFIG,
};
-use infra::schema::unwrap_partition_time_level;
+use infra::schema::{unwrap_partition_time_level, SchemaCache};

use super::{ingestion::TriggerAlertData, schema::get_invalid_schema_start_dt};
use crate::{
    common::meta::{alerts::Alert, ingestion::RecordStatus, stream::SchemaRecords},
-    service::{
-        ingestion::get_wal_time_key,
-        schema::{check_for_schema, SchemaCache},
-    },
+    service::{ingestion::get_wal_time_key, schema::check_for_schema},
};

pub mod bulk;

@@ -467,7 +464,7 @@ pub fn refactor_map(
    defined_schema_keys: &HashSet<String>,
) -> Map<String, Value> {
    let mut new_map = Map::with_capacity(defined_schema_keys.len() + 2);
-    let mut non_schema_map = Vec::with_capacity(1024);
+    let mut non_schema_map = Vec::with_capacity(1024); // 1KB

    let mut has_elements = false;
    non_schema_map.write_all(b"{").unwrap();
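
For context on the `// 1KB` note above: refactor_map streams fields that fall outside the user-defined schema straight into one preallocated byte buffer instead of building a second serde_json map. A simplified sketch of that buffer pattern, with a hypothetical helper (real code must also escape keys and values properly):

use std::io::Write;

// Pack key/value pairs into a single JSON object string via one buffer.
fn pack_non_schema_fields(pairs: &[(&str, &str)]) -> String {
    let mut buf = Vec::with_capacity(1024); // 1KB up front, like refactor_map
    buf.write_all(b"{").unwrap();
    for (i, (k, v)) in pairs.iter().enumerate() {
        if i > 0 {
            buf.write_all(b",").unwrap();
        }
        write!(buf, "\"{k}\":\"{v}\"").unwrap();
    }
    buf.write_all(b"}").unwrap();
    String::from_utf8(buf).unwrap()
}
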
5 changes: 3 additions & 2 deletions src/service/logs/multi.rs

@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by

@@ -27,6 +27,7 @@ use config::{
    utils::{flatten, json, time::parse_timestamp_micro_from_value},
    CONFIG, DISTINCT_FIELDS,
};
+use infra::schema::SchemaCache;

use crate::{
    common::meta::{

@@ -39,7 +40,7 @@ use crate::{
        ingestion::{check_ingestion_allowed, evaluate_trigger, write_file, TriggerAlertData},
        logs::StreamMeta,
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{get_upto_discard_error, SchemaCache},
+        schema::get_upto_discard_error,
        usage::report_request_usage_stats,
    },
};

5 changes: 3 additions & 2 deletions src/service/logs/otlp_grpc.rs

@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by

@@ -26,6 +26,7 @@ use config::{
    utils::{flatten, json, time::parse_timestamp_micro_from_value},
    CONFIG, DISTINCT_FIELDS,
};
+use infra::schema::SchemaCache;
use opentelemetry::trace::{SpanId, TraceId};
use opentelemetry_proto::tonic::collector::logs::v1::{
    ExportLogsServiceRequest, ExportLogsServiceResponse,

@@ -48,7 +49,7 @@ use crate::{
            write_file, TriggerAlertData,
        },
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{get_upto_discard_error, stream_schema_exists, SchemaCache},
+        schema::{get_upto_discard_error, stream_schema_exists},
        usage::report_request_usage_stats,
    },
};

3 changes: 2 additions & 1 deletion src/service/logs/otlp_http.rs

@@ -25,6 +25,7 @@ use config::{
    utils::{flatten, json},
    CONFIG, DISTINCT_FIELDS,
};
+use infra::schema::SchemaCache;
use opentelemetry::trace::{SpanId, TraceId};
use opentelemetry_proto::tonic::collector::logs::v1::{
    ExportLogsPartialSuccess, ExportLogsServiceRequest, ExportLogsServiceResponse,

@@ -44,7 +45,7 @@ use crate::{
        db, get_formatted_stream_name,
        ingestion::{evaluate_trigger, get_val_for_attr, write_file, TriggerAlertData},
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{get_upto_discard_error, stream_schema_exists, SchemaCache},
+        schema::{get_upto_discard_error, stream_schema_exists},
        usage::report_request_usage_stats,
    },
};

5 changes: 3 additions & 2 deletions src/service/logs/syslog.rs

@@ -1,4 +1,4 @@
-// Copyright 2023 Zinc Labs Inc.
+// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by

@@ -25,6 +25,7 @@ use config::{
    utils::{flatten, json, time::parse_timestamp_micro_from_value},
    CONFIG, DISTINCT_FIELDS,
};
+use infra::schema::SchemaCache;
use syslog_loose::{Message, ProcId, Protocol};

use super::StreamMeta;

@@ -43,7 +44,7 @@ use crate::{
        db, get_formatted_stream_name,
        ingestion::{evaluate_trigger, write_file, TriggerAlertData},
        metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
-        schema::{get_upto_discard_error, SchemaCache},
+        schema::get_upto_discard_error,
    },
};
};

Expand Down

0 comments on commit efc8885

Please sign in to comment.