feat: only write defined schema fields to parquet #3454

Status: merged on May 11, 2024 (35 commits; changes shown from all commits)

Commits
f3ea817  debug: only write define schema fields to file (oasisk, May 9, 2024)
89a2cc9  clippy fix (oasisk, May 9, 2024)
ce361a3  test: schema only the defined fields (hengfeiyang, May 10, 2024)
450fd26  add a check (oasisk, May 10, 2024)
f1f0c32  removed print (oasisk, May 10, 2024)
f66d849  change cast to schema v1 (oasisk, May 10, 2024)
c9678f5  changes (oasisk, May 10, 2024)
5c3e059  add if condition (oasisk, May 10, 2024)
c435ac6  perf: improve factor_map (hengfeiyang, May 10, 2024)
0c968f6  perf: optimize cast schema v1 (hengfeiyang, May 10, 2024)
918af3f  debug: only write define schema fields to file (oasisk, May 9, 2024)
31cb10b  removed print (oasisk, May 10, 2024)
0d58c4b  perf: convert json to recordBatch (hengfeiyang, May 10, 2024)
d7fdb54  perf: optimize ingestion speed (hengfeiyang, May 10, 2024)
189ca55  debug: only write define schema fields to file (oasisk, May 9, 2024)
9835565  clippy fix (oasisk, May 9, 2024)
23e3059  test: schema only the defined fields (hengfeiyang, May 10, 2024)
c5f62c7  add a check (oasisk, May 10, 2024)
4e4d504  removed print (oasisk, May 10, 2024)
4861d9e  change cast to schema v1 (oasisk, May 10, 2024)
bfcfda2  changes (oasisk, May 10, 2024)
08a4ffa  add if condition (oasisk, May 10, 2024)
f7b95a3  perf: improve factor_map (hengfeiyang, May 10, 2024)
6c9a5de  perf: optimize cast schema v1 (hengfeiyang, May 10, 2024)
9fe8fec  debug: only write define schema fields to file (oasisk, May 9, 2024)
76774b3  removed print (oasisk, May 10, 2024)
ccfbc83  perf: convert json to recordBatch (hengfeiyang, May 10, 2024)
251c02a  perf: optimize ingestion speed (hengfeiyang, May 10, 2024)
3eff757  Merge branch 'debug-only-write-define-schema-fields-to-file' of https… (oasisk, May 10, 2024)
e3317f8  updating index changes (oasisk, May 11, 2024)
156931c  updated for inverted index (oasisk, May 11, 2024)
6a05761  Merge branch 'main' into debug-only-write-define-schema-fields-to-file (hengfeiyang, May 11, 2024)
f066520  perf: improve vertical partiton ingestion (hengfeiyang, May 11, 2024)
ec1de2b  feat: add memtable ttl check job (hengfeiyang, May 11, 2024)
74963a9  fix: code format (hengfeiyang, May 11, 2024)
30 changes: 30 additions & 0 deletions src/config/src/utils/json.rs
@@ -21,6 +21,13 @@ pub fn get_float_value(val: &Value) -> f64 {
match val {
Value::String(v) => v.parse::<f64>().unwrap_or(0.0),
Value::Number(v) => v.as_f64().unwrap_or(0.0),
Value::Bool(v) => {
if *v {
1.0
} else {
0.0
}
}
_ => 0.0,
}
}
@@ -29,6 +36,13 @@ pub fn get_int_value(val: &Value) -> i64 {
match val {
Value::String(v) => v.parse::<i64>().unwrap_or(0),
Value::Number(v) => v.as_i64().unwrap_or(0),
Value::Bool(v) => {
if *v {
1
} else {
0
}
}
_ => 0,
}
}
@@ -37,6 +51,13 @@ pub fn get_uint_value(val: &Value) -> u64 {
match val {
Value::String(v) => v.parse::<u64>().unwrap_or(0),
Value::Number(v) => v.as_u64().unwrap_or(0),
Value::Bool(v) => {
if *v {
1
} else {
0
}
}
_ => 0,
}
}
@@ -57,6 +78,15 @@ pub fn get_string_value(value: &Value) -> String {
}
}

pub fn pickup_string_value(val: Value) -> String {
match val {
Value::String(v) => v,
Value::Number(v) => v.to_string(),
Value::Bool(v) => v.to_string(),
_ => val.to_string(),
}
}

pub fn estimate_json_bytes(val: &Value) -> usize {
let mut size = 0;
match val {
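For context, here is a minimal sketch (not part of this diff) of how the new `Bool` arms and `pickup_string_value` behave; the `config::utils::json` import path is inferred from the file location above and may differ from the crate's actual re-exports.

```rust
// Illustrative usage only; the import path is an assumption.
use config::utils::json::{get_float_value, get_int_value, pickup_string_value};
use serde_json::json;

fn main() {
    // Booleans now coerce to 1 / 0 instead of falling through to the `_ => 0` arm.
    assert_eq!(get_int_value(&json!(true)), 1);
    assert_eq!(get_float_value(&json!(false)), 0.0);

    // `pickup_string_value` takes the Value by value, so string payloads move
    // out without the extra allocation a `&Value -> String` helper would need.
    assert_eq!(pickup_string_value(json!("hello")), "hello");
    assert_eq!(pickup_string_value(json!(42)), "42");
    assert_eq!(pickup_string_value(json!(true)), "true");
}
```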
17 changes: 12 additions & 5 deletions src/config/src/utils/record_batch_ext.rs
@@ -47,29 +47,36 @@ pub fn convert_json_to_record_batch(
data: &[Arc<serde_json::Value>],
) -> Result<RecordBatch, ArrowError> {
// collect all keys from the json data
let mut keys = HashSet::new();
let mut keys = HashSet::with_capacity(schema.fields().len());
let mut max_keys = 0;
for record in data.iter() {
for (k, _) in record.as_object().unwrap() {
let record = record.as_object().unwrap();
if record.len() > max_keys {
max_keys = record.len();
}
for (k, _) in record {
keys.insert(k);
}
}

// create builders for each key
let records_len = data.len();
let mut builders: FxIndexMap<&String, (&DataType, Box<dyn ArrayBuilder>)> = schema
.fields()
.iter()
.filter(|f| keys.contains(f.name()))
.map(|f| {
(
f.name(),
(f.data_type(), make_builder(f.data_type(), data.len())),
(f.data_type(), make_builder(f.data_type(), records_len)),
)
})
.collect();

// fill builders with data
let mut record_keys = HashSet::with_capacity(max_keys);
for record in data.iter() {
let mut record_keys = HashSet::new();
record_keys.clear();
for (k, v) in record.as_object().unwrap() {
record_keys.insert(k);
let res = builders.get_mut(k);
@@ -200,7 +207,7 @@ pub fn convert_json_to_record_batch(
if let Some((_, builder)) = builders.get_mut(field.name()) {
cols.push(builder.finish());
} else {
cols.push(new_null_array(field.data_type(), data.len()))
cols.push(new_null_array(field.data_type(), records_len))
}
}

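The hunk above is mostly an allocation optimization. A standalone sketch of the same pattern, with made-up names, sizes the scratch set once from the widest record and clears it between iterations instead of building a fresh `HashSet` per record:

```rust
// A generic sketch (not the PR's code) of the scratch-set reuse shown above.
use std::collections::HashSet;

fn count_missing_keys(records: &[Vec<&str>], schema_keys: &[&str]) -> usize {
    let max_keys = records.iter().map(|r| r.len()).max().unwrap_or(0);
    let mut record_keys: HashSet<&str> = HashSet::with_capacity(max_keys);
    let mut missing = 0;
    for record in records {
        record_keys.clear(); // reuse the previous iteration's allocation
        record_keys.extend(record.iter().copied());
        // in the real code, schema fields absent from a record get a null
        // appended to their builder; here we only count them
        missing += schema_keys
            .iter()
            .filter(|k| !record_keys.contains(**k))
            .count();
    }
    missing
}

fn main() {
    let records = vec![vec!["a", "b"], vec!["a"]];
    // record 1 is missing "c"; record 2 is missing "b" and "c"
    assert_eq!(count_missing_keys(&records, &["a", "b", "c"]), 3);
}
```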
28 changes: 13 additions & 15 deletions src/config/src/utils/schema_ext.rs
@@ -38,21 +38,19 @@ impl SchemaExt for Schema {

// ensure schema is compatible
fn cloned_from(&self, schema: &Schema) -> Schema {
let mut schema_latest_map = HashMap::with_capacity(schema.fields().len());
for field in schema.fields() {
schema_latest_map.insert(field.name(), field.clone());
}
let mut fields = Vec::with_capacity(self.fields().len());
for field in self.fields() {
match schema_latest_map.get(field.name()) {
Some(f) => {
fields.push(f.clone());
}
None => {
fields.push(field.clone());
}
}
}
let schema_latest_map: HashMap<_, _> = schema
.fields()
.iter()
.map(|field| (field.name(), field))
.collect();
let fields = self
.fields()
.iter()
.map(|f| match schema_latest_map.get(f.name()) {
Some(f) => (*f).clone(),
None => f.clone(),
})
.collect::<Vec<_>>();
Schema::new(fields)
}

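For reference, a hedged sketch of what `cloned_from` does (the trait path is assumed from the `schema_ext::SchemaExt` import elsewhere in this PR): the receiver keeps its own field set and order, while any field that also exists in the latest schema takes that schema's definition.

```rust
// Illustrative only; the trait path and the arrow crate alias are assumptions
// based on other files in this PR.
use arrow_schema::{DataType, Field, Schema};
use config::utils::schema_ext::SchemaExt;

fn main() {
    let local = Schema::new(vec![
        Field::new("msg", DataType::Int64, true),
        Field::new("count", DataType::Int64, true),
    ]);
    let latest = Schema::new(vec![Field::new("msg", DataType::Utf8, true)]);

    let merged = local.cloned_from(&latest);
    assert_eq!(merged.field(0).data_type(), &DataType::Utf8); // taken from `latest`
    assert_eq!(merged.field(1).data_type(), &DataType::Int64); // kept from `local`
    assert_eq!(merged.fields().len(), 2); // field set and order come from `self`
}
```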
2 changes: 1 addition & 1 deletion src/handler/http/request/search/mod.rs
@@ -981,7 +981,7 @@ async fn values_v1(
let key = format!("{org_id}/{stream_type}/{stream_name}");
let r = STREAM_SCHEMAS_LATEST.read().await;
let schema = if let Some(schema) = r.get(&key) {
schema.clone()
schema.schema().clone()
} else {
arrow_schema::Schema::empty()
};
2 changes: 1 addition & 1 deletion src/handler/http/request/status/mod.rs
@@ -331,7 +331,7 @@ async fn get_stream_schema_status() -> (usize, usize, usize) {
stream_num += 1;
stream_schema_num += 1;
mem_size += key.len();
mem_size += schema.size();
mem_size += schema.schema().size();
}
drop(r);
(stream_num, stream_schema_num, mem_size)
63 changes: 60 additions & 3 deletions src/infra/src/schema/mod.rs
@@ -13,10 +13,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;

use chrono::Utc;
use config::{
meta::stream::{PartitionTimeLevel, StreamSettings, StreamType},
utils::json,
utils::{json, schema_ext::SchemaExt},
RwAHashMap, CONFIG,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
@@ -34,7 +36,8 @@ pub static STREAM_SCHEMAS: Lazy<RwAHashMap<String, Vec<(i64, Schema)>>> =
Lazy::new(Default::default);
pub static STREAM_SCHEMAS_COMPRESSED: Lazy<RwAHashMap<String, Vec<(i64, bytes::Bytes)>>> =
Lazy::new(Default::default);
pub static STREAM_SCHEMAS_LATEST: Lazy<RwAHashMap<String, Schema>> = Lazy::new(Default::default);
pub static STREAM_SCHEMAS_LATEST: Lazy<RwAHashMap<String, SchemaCache>> =
Lazy::new(Default::default);
pub static STREAM_SCHEMAS_FIELDS: Lazy<RwAHashMap<String, (i64, Vec<String>)>> =
Lazy::new(Default::default);
pub static STREAM_SETTINGS: Lazy<RwAHashMap<String, StreamSettings>> = Lazy::new(Default::default);
@@ -54,13 +57,31 @@ pub async fn get(org_id: &str, stream_name: &str, stream_type: StreamType) -> Re

let r = STREAM_SCHEMAS_LATEST.read().await;
if let Some(schema) = r.get(cache_key) {
return Ok(schema.clone());
return Ok(schema.schema.clone());
}
drop(r);
// if not found in cache, get from db
get_from_db(org_id, stream_name, stream_type).await
}

pub async fn get_cache(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
) -> Result<SchemaCache> {
let key = mk_key(org_id, stream_type, stream_name);
let cache_key = key.strip_prefix("/schema/").unwrap();

let r = STREAM_SCHEMAS_LATEST.read().await;
if let Some(schema) = r.get(cache_key) {
return Ok(schema.clone());
}
drop(r);
// if not found in cache, get from db
let schema = get_from_db(org_id, stream_name, stream_type).await?;
Ok(SchemaCache::new(schema))
}

pub async fn get_from_db(
org_id: &str,
stream_name: &str,
@@ -617,6 +638,42 @@ pub fn get_merge_schema_changes(
}
}

#[derive(Clone, Debug)]
pub struct SchemaCache {
schema: Schema,
fields_map: HashMap<String, usize>,
hash_key: String,
}

impl SchemaCache {
pub fn new(schema: Schema) -> Self {
let hash_key = schema.hash_key();
let fields_map = schema
.fields()
.iter()
.enumerate()
.map(|(i, f)| (f.name().to_owned(), i))
.collect();
Self {
schema,
fields_map,
hash_key,
}
}

pub fn hash_key(&self) -> &str {
&self.hash_key
}

pub fn schema(&self) -> &Schema {
&self.schema
}

pub fn fields_map(&self) -> &HashMap<String, usize> {
&self.fields_map
}
}

pub fn is_widening_conversion(from: &DataType, to: &DataType) -> bool {
let allowed_type = match from {
DataType::Boolean => vec![DataType::Utf8],
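A short, illustrative sketch of how the new `SchemaCache` wrapper is meant to be consumed; only the accessors defined in this hunk are used, and the arrow import mirrors the path this module already uses. The name-to-index map and the hash key are computed once in `new`, so hot paths avoid repeated linear scans and re-hashing.

```rust
// Illustrative only; `infra::schema::SchemaCache` matches this diff, and the
// arrow types are imported the same way this module already imports them.
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use infra::schema::SchemaCache;

fn main() {
    let schema = Schema::new(vec![
        Field::new("_timestamp", DataType::Int64, false),
        Field::new("message", DataType::Utf8, true),
    ]);
    let cache = SchemaCache::new(schema);

    // O(1) lookup by field name instead of scanning schema.fields()
    if let Some(idx) = cache.fields_map().get("message") {
        assert_eq!(cache.schema().field(*idx).data_type(), &DataType::Utf8);
    }

    // precomputed once in `new`, so callers do not re-hash the schema
    let _hash = cache.hash_key();
}
```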
15 changes: 15 additions & 0 deletions src/ingester/src/entry.rs
@@ -38,6 +38,15 @@ pub struct Entry {
}

impl Entry {
pub fn new() -> Self {
Self {
stream: "".into(),
schema_key: "".into(),
partition_key: "".into(),
data: Vec::new(),
data_size: 0,
}
}
pub fn into_bytes(&mut self) -> Result<Vec<u8>> {
let mut buf = Vec::with_capacity(4096);
let stream = self.stream.as_bytes();
@@ -103,6 +112,12 @@ impl Entry {
}
}

impl Default for Entry {
fn default() -> Self {
Self::new()
}
}

pub struct RecordBatchEntry {
pub data: RecordBatch,
pub data_json_size: usize,
17 changes: 16 additions & 1 deletion src/ingester/src/lib.rs
@@ -49,13 +49,28 @@ pub async fn init() -> errors::Result<()> {
loop {
// persist immutable data to disk
if let Err(e) = immutable::persist().await {
log::error!("persist error: {}", e);
log::error!("immutable persist error: {}", e);
}
// shrink metadata cache
WAL_PARQUET_METADATA.write().await.shrink_to_fit();
interval.tick().await;
}
});

// start a job to flush memtable to immutable
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
config::CONFIG.limit.max_file_retention_time,
));
interval.tick().await; // the first tick is immediate
loop {
// check memtable ttl
if let Err(e) = writer::check_ttl().await {
log::error!("memtable check ttl error: {}", e);
}
interval.tick().await;
}
});

Ok(())
}
38 changes: 31 additions & 7 deletions src/ingester/src/writer.rs
@@ -107,6 +107,18 @@ pub async fn read_from_memtable(
Ok(batches)
}

pub async fn check_ttl() -> Result<()> {
for w in WRITERS.iter() {
let w = w.read().await;
for r in w.values() {
// check writer
r.write(Arc::new(Schema::empty()), Entry::default(), true)
.await?;
}
}
Ok(())
}

pub async fn flush_all() -> Result<()> {
for w in WRITERS.iter() {
let mut w = w.write().await;
@@ -156,11 +168,21 @@ impl Writer {
}
}

pub async fn write(&self, schema: Arc<Schema>, mut entry: Entry) -> Result<()> {
if entry.data.is_empty() {
// check_ttl is used to check if the memtable has expired
pub async fn write(
&self,
schema: Arc<Schema>,
mut entry: Entry,
check_ttl: bool,
) -> Result<()> {
if entry.data.is_empty() && !check_ttl {
return Ok(());
}
let entry_bytes = entry.into_bytes()?;
let entry_bytes = if !check_ttl {
entry.into_bytes()?
} else {
Vec::new()
};
let mut wal = self.wal.lock().await;
let mut mem = self.memtable.write().await;
if self.check_wal_threshold(wal.size(), entry_bytes.len())
@@ -211,11 +233,13 @@ impl Writer {
});
}

// write into wal
wal.write(&entry_bytes, false).context(WalSnafu)?;
if !check_ttl {
// write into wal
wal.write(&entry_bytes, false).context(WalSnafu)?;
// write into memtable
mem.write(schema, entry).await?;
}

// write into memtable
mem.write(schema, entry).await?;
Ok(())
}

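A generic sketch of the TTL-ping pattern introduced here (this is not OpenObserve's actual `Writer`; the struct, names, and thresholds are invented): a single `write` entry point serves both real ingestion and the background check, and when `check_ttl` is set it only evaluates rotation without appending to the WAL or memtable.

```rust
// Standalone illustration of the check_ttl pattern; everything here is made up
// for the example.
use std::time::{Duration, Instant};

struct ToyWriter {
    rows: Vec<String>,
    created_at: Instant,
    max_age: Duration,
}

impl ToyWriter {
    fn write(&mut self, row: Option<String>, check_ttl: bool) {
        if row.is_none() && !check_ttl {
            return; // nothing to write and not a TTL ping
        }
        // the rotation check runs for both real writes and TTL pings
        if self.created_at.elapsed() >= self.max_age && !self.rows.is_empty() {
            self.rotate();
        }
        if !check_ttl {
            if let Some(r) = row {
                self.rows.push(r); // the real writer appends to WAL and memtable here
            }
        }
    }

    fn rotate(&mut self) {
        // the real writer rotates the WAL file and moves the memtable to immutable
        self.rows.clear();
        self.created_at = Instant::now();
    }
}

fn main() {
    let mut w = ToyWriter {
        rows: Vec::new(),
        created_at: Instant::now(),
        max_age: Duration::from_secs(600),
    };
    w.write(Some("log line".into()), false); // normal ingestion path
    w.write(None, true); // background TTL ping: checks rotation, writes nothing
    assert_eq!(w.rows.len(), 1);
}
```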
3 changes: 1 addition & 2 deletions src/job/files/parquet.rs
@@ -37,7 +37,7 @@ use config::{
};
use datafusion::{arrow::json as arrow_json, datasource::MemTable, prelude::*};
use hashbrown::HashSet;
use infra::{cache, storage};
use infra::{cache, schema::SchemaCache, storage};
use ingester::WAL_PARQUET_METADATA;
use once_cell::sync::Lazy;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
@@ -51,7 +51,6 @@ use crate::{
job::files::idx::write_to_disk,
service::{
db,
schema::SchemaCache,
search::datafusion::{
exec::merge_parquet_files, string_to_array_v2_udf::STRING_TO_ARRAY_V2_UDF,
},