From 4817b08bf49e8baa2a9ec8bfbd3fba2a0fd7e0bb Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 25 Dec 2024 15:55:30 +0800 Subject: [PATCH 01/20] add --- .../codec/src/decoder/protobuf/mod.rs | 26 ++++++++++++------- .../codec/src/decoder/protobuf/parser.rs | 24 ++++++++++++++--- src/connector/src/parser/config.rs | 1 + src/connector/src/parser/protobuf/parser.rs | 18 ++++++++++++- src/connector/src/parser/unified/mod.rs | 2 +- 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/connector/codec/src/decoder/protobuf/mod.rs b/src/connector/codec/src/decoder/protobuf/mod.rs index 7ad357fef50fb..58af64298570d 100644 --- a/src/connector/codec/src/decoder/protobuf/mod.rs +++ b/src/connector/codec/src/decoder/protobuf/mod.rs @@ -14,6 +14,7 @@ pub mod parser; use std::borrow::Cow; +use std::collections::HashSet; use std::sync::LazyLock; use parser::from_protobuf_value; @@ -24,13 +25,17 @@ use thiserror_ext::AsReport; use super::{uncategorized, Access, AccessResult}; -pub struct ProtobufAccess { +pub struct ProtobufAccess<'a> { message: DynamicMessage, + struct_as_jsonb: &'a HashSet, } -impl ProtobufAccess { - pub fn new(message: DynamicMessage) -> Self { - Self { message } +impl<'a> ProtobufAccess<'a> { + pub fn new(message: DynamicMessage, struct_as_jsonb: &'a HashSet) -> Self { + Self { + message, + struct_as_jsonb, + } } #[cfg(test)] @@ -39,7 +44,7 @@ impl ProtobufAccess { } } -impl Access for ProtobufAccess { +impl Access for ProtobufAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { debug_assert_eq!(1, path.len()); let field_desc = self @@ -56,12 +61,15 @@ impl Access for ProtobufAccess { })?; match self.message.get_field(&field_desc) { - Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected), + Cow::Borrowed(value) => { + from_protobuf_value(&field_desc, value, type_expected, self.struct_as_jsonb) + } // `Owned` variant occurs only if there's no such field and the default value is returned. - Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected) - // enforce `Owned` variant to avoid returning a reference to a temporary value - .map(|d| d.to_owned_datum().into()), + Cow::Owned(value) => { + from_protobuf_value(&field_desc, &value, type_expected, self.struct_as_jsonb) + .map(|d| d.to_owned_datum().into()) + } } } } diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 57a353fcf744b..fd5f81f80be07 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; + use anyhow::Context; use itertools::Itertools; use prost_reflect::{Cardinality, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value}; @@ -106,6 +108,7 @@ pub fn from_protobuf_value<'a>( field_desc: &FieldDescriptor, value: &'a Value, type_expected: &DataType, + struct_as_jsonb: &'a HashSet, ) -> AccessResult> { let kind = field_desc.kind(); @@ -136,7 +139,8 @@ pub fn from_protobuf_value<'a>( ScalarImpl::Utf8(enum_symbol.name().into()) } Value::Message(dyn_msg) => { - if dyn_msg.descriptor().full_name() == "google.protobuf.Any" { + let msg_full_name = dyn_msg.descriptor().full_name().to_owned(); + if struct_as_jsonb.contains(&msg_full_name) { ScalarImpl::Jsonb(JsonbVal::from( serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?, )) @@ -159,8 +163,13 @@ pub fn from_protobuf_value<'a>( }; let value = dyn_msg.get_field(&field_desc); rw_values.push( - from_protobuf_value(&field_desc, &value, expected_field_type)? - .to_owned_datum(), + from_protobuf_value( + &field_desc, + &value, + expected_field_type, + struct_as_jsonb, + )? + .to_owned_datum(), ); } ScalarImpl::Struct(StructValue::new(rw_values)) @@ -176,7 +185,12 @@ pub fn from_protobuf_value<'a>( }; let mut builder = element_type.create_array_builder(values.len()); for value in values { - builder.append(from_protobuf_value(field_desc, value, element_type)?); + builder.append(from_protobuf_value( + field_desc, + value, + element_type, + struct_as_jsonb, + )?); } ScalarImpl::List(ListValue::new(builder.finish())) } @@ -209,11 +223,13 @@ pub fn from_protobuf_value<'a>( &map_desc.map_entry_key_field(), &key.clone().into(), map_type.key(), + struct_as_jsonb, )?); value_builder.append(from_protobuf_value( &map_desc.map_entry_value_field(), value, map_type.value(), + struct_as_jsonb, )?); } let keys = key_builder.finish(); diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 52355ae058635..8fef61f2de37f 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -322,6 +322,7 @@ pub struct ProtobufProperties { pub topic: String, pub key_message_name: Option, pub name_strategy: PbSchemaRegistryNameStrategy, + pub message_as_jsonb: Option, } #[derive(Debug, Default, Clone, Copy)] diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index a82bed6b69e17..a45a59f24e4b9 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; + use anyhow::Context; use prost_reflect::{DescriptorPool, DynamicMessage, FileDescriptor, MessageDescriptor}; use risingwave_common::{bail, try_match_expand}; @@ -30,6 +32,7 @@ use crate::schema::SchemaLoader; pub struct ProtobufAccessBuilder { confluent_wire_type: bool, message_descriptor: MessageDescriptor, + struct_as_jsonb: HashSet, } impl AccessBuilder for ProtobufAccessBuilder { @@ -44,7 +47,10 @@ impl AccessBuilder for ProtobufAccessBuilder { let message = DynamicMessage::decode(self.message_descriptor.clone(), payload) .context("failed to parse message")?; - Ok(AccessImpl::Protobuf(ProtobufAccess::new(message))) + Ok(AccessImpl::Protobuf(ProtobufAccess::new( + message, + &self.struct_as_jsonb, + ))) } } @@ -53,11 +59,19 @@ impl ProtobufAccessBuilder { let ProtobufParserConfig { confluent_wire_type, message_descriptor, + struct_as_jsonb_str, } = config; + let mut struct_as_jsonb: HashSet = struct_as_jsonb_str + .split(',') + .map(|s| s.to_owned()) + .collect(); + struct_as_jsonb.insert("google.protobuf.Any".to_owned()); + Ok(Self { confluent_wire_type, message_descriptor, + struct_as_jsonb, }) } } @@ -66,6 +80,7 @@ impl ProtobufAccessBuilder { pub struct ProtobufParserConfig { confluent_wire_type: bool, pub(crate) message_descriptor: MessageDescriptor, + struct_as_jsonb_str: String, } impl ProtobufParserConfig { @@ -110,6 +125,7 @@ impl ProtobufParserConfig { Ok(Self { message_descriptor, confluent_wire_type: protobuf_config.use_schema_registry, + struct_as_jsonb_str: protobuf_config.message_as_jsonb.unwrap_or_default(), }) } diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index adf32df572307..6a7e1f6fd9371 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -35,7 +35,7 @@ pub mod util; pub enum AccessImpl<'a> { Avro(AvroAccess<'a>), Bytes(BytesAccess<'a>), - Protobuf(ProtobufAccess), + Protobuf(ProtobufAccess<'a>), Json(JsonAccess<'a>), MongoJson(MongoJsonAccess>), } From 725a7c821137e289161f58b8c47d793fb0c01ab1 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Sun, 29 Dec 2024 00:31:04 +0800 Subject: [PATCH 02/20] fix comments --- src/connector/codec/src/decoder/protobuf/mod.rs | 10 +++++----- src/connector/codec/src/decoder/protobuf/parser.rs | 13 ++++++------- .../codec/tests/integration_tests/protobuf.rs | 5 +++-- src/connector/src/parser/protobuf/parser.rs | 13 ++++++++----- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/connector/codec/src/decoder/protobuf/mod.rs b/src/connector/codec/src/decoder/protobuf/mod.rs index 58af64298570d..824f8131d6bcd 100644 --- a/src/connector/codec/src/decoder/protobuf/mod.rs +++ b/src/connector/codec/src/decoder/protobuf/mod.rs @@ -27,14 +27,14 @@ use super::{uncategorized, Access, AccessResult}; pub struct ProtobufAccess<'a> { message: DynamicMessage, - struct_as_jsonb: &'a HashSet, + messages_as_jsonb: &'a HashSet, } impl<'a> ProtobufAccess<'a> { - pub fn new(message: DynamicMessage, struct_as_jsonb: &'a HashSet) -> Self { + pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet) -> Self { Self { message, - struct_as_jsonb, + messages_as_jsonb, } } @@ -62,12 +62,12 @@ impl Access for ProtobufAccess<'_> { match self.message.get_field(&field_desc) { Cow::Borrowed(value) => { - from_protobuf_value(&field_desc, value, type_expected, self.struct_as_jsonb) + from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb) } // `Owned` variant occurs only if there's no such 
field and the default value is returned. Cow::Owned(value) => { - from_protobuf_value(&field_desc, &value, type_expected, self.struct_as_jsonb) + from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb) .map(|d| d.to_owned_datum().into()) } } diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index fd5f81f80be07..060b5b58cda9d 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -108,7 +108,7 @@ pub fn from_protobuf_value<'a>( field_desc: &FieldDescriptor, value: &'a Value, type_expected: &DataType, - struct_as_jsonb: &'a HashSet, + messages_as_jsonb: &'a HashSet, ) -> AccessResult> { let kind = field_desc.kind(); @@ -139,8 +139,7 @@ pub fn from_protobuf_value<'a>( ScalarImpl::Utf8(enum_symbol.name().into()) } Value::Message(dyn_msg) => { - let msg_full_name = dyn_msg.descriptor().full_name().to_owned(); - if struct_as_jsonb.contains(&msg_full_name) { + if messages_as_jsonb.contains(dyn_msg.descriptor().full_name()) { ScalarImpl::Jsonb(JsonbVal::from( serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?, )) @@ -167,7 +166,7 @@ pub fn from_protobuf_value<'a>( &field_desc, &value, expected_field_type, - struct_as_jsonb, + messages_as_jsonb, )? .to_owned_datum(), ); @@ -189,7 +188,7 @@ pub fn from_protobuf_value<'a>( field_desc, value, element_type, - struct_as_jsonb, + messages_as_jsonb, )?); } ScalarImpl::List(ListValue::new(builder.finish())) @@ -223,13 +222,13 @@ pub fn from_protobuf_value<'a>( &map_desc.map_entry_key_field(), &key.clone().into(), map_type.key(), - struct_as_jsonb, + messages_as_jsonb, )?); value_builder.append(from_protobuf_value( &map_desc.map_entry_value_field(), value, map_type.value(), - struct_as_jsonb, + messages_as_jsonb, )?); } let keys = key_builder.finish(); diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index aad81a8e7320e..32b899b7fffa7 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -19,7 +19,7 @@ mod recursive; #[allow(clippy::all)] mod all_types; use std::collections::HashMap; - +use std::collections::HashSet; use anyhow::Context; use prost::Message; use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor}; @@ -58,8 +58,9 @@ fn check( )); let mut data_str = vec![]; + let messages_as_jsonb = HashSet::from(["google.protobuf.Any".to_owned()]); for data in pb_data { - let access = ProtobufAccess::new(DynamicMessage::decode(pb_schema.clone(), *data).unwrap()); + let access = ProtobufAccess::new(DynamicMessage::decode(pb_schema.clone(), *data).unwrap(), &messages_as_jsonb); let mut row = vec![]; for col in &rw_schema { let rw_data = access.access(&[&col.name], &col.data_type); diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index a45a59f24e4b9..c857c7b272762 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -32,7 +32,10 @@ use crate::schema::SchemaLoader; pub struct ProtobufAccessBuilder { confluent_wire_type: bool, message_descriptor: MessageDescriptor, - struct_as_jsonb: HashSet, + + // A HashSet containing protobuf message type full names (e.g. 
"google.protobuf.Any") + // that should be mapped to JSONB type when storing in RisingWave + messages_as_jsonb: HashSet, } impl AccessBuilder for ProtobufAccessBuilder { @@ -49,7 +52,7 @@ impl AccessBuilder for ProtobufAccessBuilder { Ok(AccessImpl::Protobuf(ProtobufAccess::new( message, - &self.struct_as_jsonb, + &self.messages_as_jsonb, ))) } } @@ -62,16 +65,16 @@ impl ProtobufAccessBuilder { struct_as_jsonb_str, } = config; - let mut struct_as_jsonb: HashSet = struct_as_jsonb_str + let mut messages_as_jsonb: HashSet = struct_as_jsonb_str .split(',') .map(|s| s.to_owned()) .collect(); - struct_as_jsonb.insert("google.protobuf.Any".to_owned()); + messages_as_jsonb.insert("google.protobuf.Any".to_owned()); Ok(Self { confluent_wire_type, message_descriptor, - struct_as_jsonb, + messages_as_jsonb, }) } } From bc44be4f4aa0e1b43b4b25f910f14bb558ab74e4 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 11:39:14 +0800 Subject: [PATCH 03/20] format --- src/connector/codec/tests/integration_tests/protobuf.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index 32b899b7fffa7..fb16769925b34 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -18,8 +18,8 @@ mod recursive; #[rustfmt::skip] #[allow(clippy::all)] mod all_types; -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; + use anyhow::Context; use prost::Message; use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor}; @@ -60,7 +60,10 @@ fn check( let mut data_str = vec![]; let messages_as_jsonb = HashSet::from(["google.protobuf.Any".to_owned()]); for data in pb_data { - let access = ProtobufAccess::new(DynamicMessage::decode(pb_schema.clone(), *data).unwrap(), &messages_as_jsonb); + let access = ProtobufAccess::new( + DynamicMessage::decode(pb_schema.clone(), *data).unwrap(), + &messages_as_jsonb, + ); let mut row = vec![]; for col in &rw_schema { let rw_data = access.access(&[&col.name], &col.data_type); From e1fe3dff80406042980a50f0bef077f1b994f327 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 12:09:19 +0800 Subject: [PATCH 04/20] load from format_encode_options --- src/connector/src/parser/config.rs | 14 +++++++++++++- src/connector/src/parser/protobuf/parser.rs | 12 +++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 8fef61f2de37f..ba845b1a55f8e 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; + use risingwave_common::bail; use risingwave_common::secret::LocalSecretManager; use risingwave_connector_codec::decoder::avro::MapHandling; @@ -21,6 +23,7 @@ use super::utils::get_kafka_topic; use super::{DebeziumProps, TimestamptzHandling}; use crate::connector_common::AwsAuthProps; use crate::error::ConnectorResult; +use crate::parser::PROTOBUF_MESSAGES_AS_JSONB; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY; use crate::source::{extract_source_struct, SourceColumnDesc, SourceEncode, SourceFormat}; @@ -186,6 +189,14 @@ impl SpecificParserConfig { if info.row_schema_location.is_empty() { bail!("protobuf file location not provided"); } + let messages_as_jsonb = if let Some(messages_as_jsonb) = + format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB) + { + messages_as_jsonb.split(',').map(|s| s.to_owned()).collect() + } else { + HashSet::new() + }; + let mut config = ProtobufProperties { message_name: info.proto_message_name.clone(), use_schema_registry: info.use_schema_registry, @@ -193,6 +204,7 @@ impl SpecificParserConfig { name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), key_message_name: info.key_message_name.clone(), + messages_as_jsonb, ..Default::default() }; if format == SourceFormat::Upsert { @@ -322,7 +334,7 @@ pub struct ProtobufProperties { pub topic: String, pub key_message_name: Option, pub name_strategy: PbSchemaRegistryNameStrategy, - pub message_as_jsonb: Option, + pub messages_as_jsonb: HashSet, } #[derive(Debug, Default, Clone, Copy)] diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index c857c7b272762..f8bfff9ef2683 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -28,6 +28,8 @@ use crate::parser::{AccessBuilder, EncodingProperties}; use crate::schema::schema_registry::{extract_schema_id, handle_sr_list, Client, WireFormatError}; use crate::schema::SchemaLoader; +pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb"; + #[derive(Debug)] pub struct ProtobufAccessBuilder { confluent_wire_type: bool, @@ -62,13 +64,9 @@ impl ProtobufAccessBuilder { let ProtobufParserConfig { confluent_wire_type, message_descriptor, - struct_as_jsonb_str, + mut messages_as_jsonb, } = config; - let mut messages_as_jsonb: HashSet = struct_as_jsonb_str - .split(',') - .map(|s| s.to_owned()) - .collect(); messages_as_jsonb.insert("google.protobuf.Any".to_owned()); Ok(Self { @@ -83,7 +81,7 @@ impl ProtobufAccessBuilder { pub struct ProtobufParserConfig { confluent_wire_type: bool, pub(crate) message_descriptor: MessageDescriptor, - struct_as_jsonb_str: String, + messages_as_jsonb: HashSet, } impl ProtobufParserConfig { @@ -128,7 +126,7 @@ impl ProtobufParserConfig { Ok(Self { message_descriptor, confluent_wire_type: protobuf_config.use_schema_registry, - struct_as_jsonb_str: protobuf_config.message_as_jsonb.unwrap_or_default(), + messages_as_jsonb: protobuf_config.messages_as_jsonb, }) } From 2a6f68c39e751ac1346d33a1e4cdc54ffda027ce Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 13:01:50 +0800 Subject: [PATCH 05/20] circular detect error message --- src/connector/codec/src/decoder/protobuf/parser.rs | 6 +++++- src/connector/codec/tests/integration_tests/protobuf.rs | 2 +- src/connector/src/parser/protobuf/parser.rs | 4 +--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git 
a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 060b5b58cda9d..2f48338e03231 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -28,6 +28,8 @@ use thiserror_ext::Macro; use crate::decoder::{uncategorized, AccessError, AccessResult}; +pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb"; + pub fn pb_schema_to_column_descs( message_descriptor: &MessageDescriptor, ) -> anyhow::Result> { @@ -94,10 +96,12 @@ fn detect_loop_and_push( let identifier = format!("{}({})", fd.name(), fd.full_name()); if trace.iter().any(|s| s == identifier.as_str()) { bail_protobuf_type_error!( - "circular reference detected: {}, conflict with {}, kind {:?}", + "circular reference detected: {}, conflict with {}, kind {:?}. Adding {:?} to {:?} may help.", trace.iter().format("->"), identifier, fd.kind(), + fd.full_name(), + PROTOBUF_MESSAGES_AS_JSONB, ); } trace.push(identifier); diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index fb16769925b34..9f107c75c3a20 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -714,7 +714,7 @@ fn test_recursive() -> anyhow::Result<()> { failed to map protobuf type Caused by: - circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent + circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent. Adding "recursive.ComplexRecursiveMessage.parent" to "messages_as_jsonb" may help. 
"#]], expect![""], ); diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index f8bfff9ef2683..5a8b52204d930 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -17,7 +17,7 @@ use std::collections::HashSet; use anyhow::Context; use prost_reflect::{DescriptorPool, DynamicMessage, FileDescriptor, MessageDescriptor}; use risingwave_common::{bail, try_match_expand}; -pub use risingwave_connector_codec::decoder::protobuf::parser::*; +pub use risingwave_connector_codec::decoder::protobuf::parser::{PROTOBUF_MESSAGES_AS_JSONB, *}; use risingwave_connector_codec::decoder::protobuf::ProtobufAccess; use risingwave_pb::plan_common::ColumnDesc; @@ -28,8 +28,6 @@ use crate::parser::{AccessBuilder, EncodingProperties}; use crate::schema::schema_registry::{extract_schema_id, handle_sr_list, Client, WireFormatError}; use crate::schema::SchemaLoader; -pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb"; - #[derive(Debug)] pub struct ProtobufAccessBuilder { confluent_wire_type: bool, From 11ed16fff14bdc45b28c33d3abc8a46c1d395be2 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 14:43:43 +0800 Subject: [PATCH 06/20] fix --- .../codec/src/decoder/protobuf/parser.rs | 37 ++++++-- .../codec/tests/integration_tests/protobuf.rs | 4 +- .../test_data/opentelemetry_common.proto | 81 ++++++++++++++++++ .../tests/test_data/opentelemetry_test.pb | Bin 0 -> 1369 bytes .../tests/test_data/opentelemetry_test.proto | 11 +++ src/connector/src/parser/config.rs | 3 +- src/connector/src/parser/protobuf/parser.rs | 7 +- .../create_source/external_schema/protobuf.rs | 3 + 8 files changed, 131 insertions(+), 15 deletions(-) create mode 100644 src/connector/codec/tests/test_data/opentelemetry_common.proto create mode 100644 src/connector/codec/tests/test_data/opentelemetry_test.pb create mode 100644 src/connector/codec/tests/test_data/opentelemetry_test.proto diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 2f48338e03231..0d9096351b9d8 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -32,12 +32,18 @@ pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb"; pub fn pb_schema_to_column_descs( message_descriptor: &MessageDescriptor, + messages_as_jsonb: &HashSet, ) -> anyhow::Result> { let mut columns = Vec::with_capacity(message_descriptor.fields().len()); let mut index = 0; let mut parse_trace: Vec = vec![]; for field in message_descriptor.fields() { - columns.push(pb_field_to_col_desc(&field, &mut index, &mut parse_trace)?); + columns.push(pb_field_to_col_desc( + &field, + &mut index, + &mut parse_trace, + messages_as_jsonb, + )?); } Ok(columns) @@ -48,15 +54,16 @@ fn pb_field_to_col_desc( field_descriptor: &FieldDescriptor, index: &mut i32, parse_trace: &mut Vec, + messages_as_jsonb: &HashSet, ) -> anyhow::Result { - let field_type = protobuf_type_mapping(field_descriptor, parse_trace) + let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb) .context("failed to map protobuf type")?; if let Kind::Message(m) = field_descriptor.kind() { let field_descs = if let DataType::List { .. } = field_type { vec![] } else { m.fields() - .map(|f| pb_field_to_col_desc(&f, index, parse_trace)) + .map(|f| pb_field_to_col_desc(&f, index, parse_trace, messages_as_jsonb)) .try_collect()? 
}; *index += 1; @@ -100,7 +107,7 @@ fn detect_loop_and_push( trace.iter().format("->"), identifier, fd.kind(), - fd.full_name(), + fd.kind(), PROTOBUF_MESSAGES_AS_JSONB, ); } @@ -250,6 +257,7 @@ pub fn from_protobuf_value<'a>( fn protobuf_type_mapping( field_descriptor: &FieldDescriptor, parse_trace: &mut Vec, + messages_as_jsonb: &HashSet, ) -> std::result::Result { detect_loop_and_push(parse_trace, field_descriptor)?; let mut t = match field_descriptor.kind() { @@ -264,20 +272,33 @@ fn protobuf_type_mapping( Kind::Uint64 | Kind::Fixed64 => DataType::Decimal, Kind::String => DataType::Varchar, Kind::Message(m) => { - if m.full_name() == "google.protobuf.Any" { + if messages_as_jsonb.contains(m.full_name()) { // Well-Known Types are identified by their full name DataType::Jsonb } else if m.is_map_entry() { // Map is equivalent to `repeated MapFieldEntry map_field = N;` debug_assert!(field_descriptor.is_map()); - let key = protobuf_type_mapping(&m.map_entry_key_field(), parse_trace)?; - let value = protobuf_type_mapping(&m.map_entry_value_field(), parse_trace)?; + let key = protobuf_type_mapping( + &m.map_entry_key_field(), + parse_trace, + messages_as_jsonb, + )?; + let value = protobuf_type_mapping( + &m.map_entry_value_field(), + parse_trace, + messages_as_jsonb, + )?; _ = parse_trace.pop(); return Ok(DataType::Map(MapType::from_kv(key, value))); } else { let fields = m .fields() - .map(|f| Ok((f.name().to_owned(), protobuf_type_mapping(&f, parse_trace)?))) + .map(|f| { + Ok(( + f.name().to_owned(), + protobuf_type_mapping(&f, parse_trace, messages_as_jsonb)?, + )) + }) .try_collect::<_, Vec<_>, _>()?; StructType::new(fields).into() } diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index 9f107c75c3a20..2f87a64d3e583 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -39,7 +39,7 @@ fn check( expected_risingwave_schema: expect_test::Expect, expected_risingwave_data: expect_test::Expect, ) { - let rw_schema = pb_schema_to_column_descs(&pb_schema); + let rw_schema = pb_schema_to_column_descs(&pb_schema, &HashSet::new()); if let Err(e) = rw_schema { expected_risingwave_schema.assert_eq(&e.to_report_string_pretty()); @@ -714,7 +714,7 @@ fn test_recursive() -> anyhow::Result<()> { failed to map protobuf type Caused by: - circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent. Adding "recursive.ComplexRecursiveMessage.parent" to "messages_as_jsonb" may help. + circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent. Adding "recursive.ComplexRecursiveMessage" to "messages_as_jsonb" may help. 
"#]], expect![""], ); diff --git a/src/connector/codec/tests/test_data/opentelemetry_common.proto b/src/connector/codec/tests/test_data/opentelemetry_common.proto new file mode 100644 index 0000000000000..356ac1293ca29 --- /dev/null +++ b/src/connector/codec/tests/test_data/opentelemetry_common.proto @@ -0,0 +1,81 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +option csharp_namespace = "OpenTelemetry.Proto.Common.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.common.v1"; +option java_outer_classname = "CommonProto"; +option go_package = "go.opentelemetry.io/proto/otlp/common/v1"; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // An empty instrumentation scope name means the name is unknown. + string name = 1; + string version = 2; + + // Additional attributes that describe the scope. [Optional]. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). 
+ repeated KeyValue attributes = 3; + uint32 dropped_attributes_count = 4; +} \ No newline at end of file diff --git a/src/connector/codec/tests/test_data/opentelemetry_test.pb b/src/connector/codec/tests/test_data/opentelemetry_test.pb new file mode 100644 index 0000000000000000000000000000000000000000..8d146c5873b4e053a3268e3021bc4f9d240c1591 GIT binary patch literal 1369 zcmb7E&2G~`5XN?!IFsYuQ&Lb$Zr+)1zMY-70fg?Sr8t4ro=8OBs2b`W8ROH@>W#?TJ(Jg78g7cuSQBX{Fl+@YwD^+*S@ zUa?J~b5+JsQ8(3)bkA?0dq)-77z;&zib63yfp%DNTIl|oNZ1jV%Akadny_1nDfZ3* zt~(rJ5ZXJ=az*2OErPhi(E@&K$WTB*f*;*-44&v3-1 zw`f;C{<{HtZ&TFIGC_$3JYwmGOI#x%u#*Of)|e;1i381&<^^KpZ7r`plGgwMM`?yj z9J*uu*W~I)p%hl7eI~6p z70#O)9}gEd3nJE+w(K(=XMJVEemVGNb|+}zS#4ogGPsJ(y%>ZIS#<(GO>ovs;Nv{F zwmSJcv%lfj(EDn_{mE=Rogu|JEs9`4$t!3DX>B{<|1DN-W2Lgkd$@<%QReW4@w9rj qv7FaI3P~pnNIt{EXkABsA%uit5KW$J-i-S%4SIVw7^O1nS^oyID$Sz+ literal 0 HcmV?d00001 diff --git a/src/connector/codec/tests/test_data/opentelemetry_test.proto b/src/connector/codec/tests/test_data/opentelemetry_test.proto new file mode 100644 index 0000000000000..85d3f2a52c27a --- /dev/null +++ b/src/connector/codec/tests/test_data/opentelemetry_test.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package opentelemetry_test; + +import "opentelemetry_common.proto"; + +message OTLPTestMessage { + opentelemetry.proto.common.v1.AnyValue any_value = 1; + opentelemetry.proto.common.v1.KeyValueList key_value_list = 2; + opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3; +} diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index ba845b1a55f8e..eed05d71e4b20 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -189,13 +189,14 @@ impl SpecificParserConfig { if info.row_schema_location.is_empty() { bail!("protobuf file location not provided"); } - let messages_as_jsonb = if let Some(messages_as_jsonb) = + let mut messages_as_jsonb = if let Some(messages_as_jsonb) = format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB) { messages_as_jsonb.split(',').map(|s| s.to_owned()).collect() } else { HashSet::new() }; + messages_as_jsonb.insert("google.protobuf.Any".to_owned()); let mut config = ProtobufProperties { message_name: info.proto_message_name.clone(), diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index 5a8b52204d930..1e9589c15a3c6 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -62,11 +62,9 @@ impl ProtobufAccessBuilder { let ProtobufParserConfig { confluent_wire_type, message_descriptor, - mut messages_as_jsonb, + messages_as_jsonb, } = config; - messages_as_jsonb.insert("google.protobuf.Any".to_owned()); - Ok(Self { confluent_wire_type, message_descriptor, @@ -130,7 +128,8 @@ impl ProtobufParserConfig { /// Maps the protobuf schema to relational schema. 
pub fn map_to_columns(&self) -> ConnectorResult> { - pb_schema_to_column_descs(&self.message_descriptor).map_err(|e| e.into()) + pb_schema_to_column_descs(&self.message_descriptor, &self.messages_as_jsonb) + .map_err(|e| e.into()) } } diff --git a/src/frontend/src/handler/create_source/external_schema/protobuf.rs b/src/frontend/src/handler/create_source/external_schema/protobuf.rs index 7d3f8d2692c27..21f89df824371 100644 --- a/src/frontend/src/handler/create_source/external_schema/protobuf.rs +++ b/src/frontend/src/handler/create_source/external_schema/protobuf.rs @@ -14,6 +14,8 @@ use super::*; +use risingwave_connector::parser::PROTOBUF_MESSAGES_AS_JSONB; + /// Map a protobuf schema to a relational schema. pub async fn extract_protobuf_table_schema( schema: &ProtobufSchema, @@ -32,6 +34,7 @@ pub async fn extract_protobuf_table_schema( let parser_config = SpecificParserConfig::new(&info, with_properties)?; try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); + try_consume_string_from_options(format_encode_options, PROTOBUF_MESSAGES_AS_JSONB); consume_aws_config_from_options(format_encode_options); let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?; From e13ad5370565e6eca362579b2bd6f92981d0403b Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 15:43:50 +0800 Subject: [PATCH 07/20] fix --- .../codec/tests/integration_tests/protobuf.rs | 14 +++++++------- .../create_source/external_schema/protobuf.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index 2f87a64d3e583..a38db7cb6e8c4 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -39,7 +39,7 @@ fn check( expected_risingwave_schema: expect_test::Expect, expected_risingwave_data: expect_test::Expect, ) { - let rw_schema = pb_schema_to_column_descs(&pb_schema, &HashSet::new()); + let rw_schema = pb_schema_to_column_descs(&pb_schema, &HashSet::from(["google.protobuf.Any".to_owned()])); if let Err(e) = rw_schema { expected_risingwave_schema.assert_eq(&e.to_report_string_pretty()); @@ -239,11 +239,11 @@ fn test_any_schema() -> anyhow::Result<()> { // } static ANY_DATA_3: &[u8] = b"\x08\xb9\x60\x12\x32\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x56\x61\x6c\x75\x65\x12\x0a\x0a\x08\x4a\x6f\x68\x6e\x20\x44\x6f\x65"; - // // id: 12345 - // // any_value: { - // // type_url: "type.googleapis.com/test.StringXalue" - // // value: "\n\010John Doe" - // // } + // id: 12345 + // any_value: { + // type_url: "type.googleapis.com/test.StringXalue" + // value: "\n\010John Doe" + // } static ANY_DATA_INVALID: &[u8] = b"\x08\xb9\x60\x12\x32\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x58\x61\x6c\x75\x65\x12\x0a\x0a\x08\x4a\x6f\x68\x6e\x20\x44\x6f\x65"; // validate the binary data is correct @@ -714,7 +714,7 @@ fn test_recursive() -> anyhow::Result<()> { failed to map protobuf type Caused by: - circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind 
recursive.ComplexRecursiveMessage.Parent. Adding "recursive.ComplexRecursiveMessage" to "messages_as_jsonb" may help. + circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent. Adding recursive.ComplexRecursiveMessage.Parent to "messages_as_jsonb" may help. "#]], expect![""], ); diff --git a/src/frontend/src/handler/create_source/external_schema/protobuf.rs b/src/frontend/src/handler/create_source/external_schema/protobuf.rs index 21f89df824371..b22179f89b88a 100644 --- a/src/frontend/src/handler/create_source/external_schema/protobuf.rs +++ b/src/frontend/src/handler/create_source/external_schema/protobuf.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::*; - use risingwave_connector::parser::PROTOBUF_MESSAGES_AS_JSONB; +use super::*; + /// Map a protobuf schema to a relational schema. pub async fn extract_protobuf_table_schema( schema: &ProtobufSchema, From 1ae513abb306da5d281b4de60a3f906713f42138 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 18:03:35 +0800 Subject: [PATCH 08/20] test --- .../kafka/protobuf/opentelemetry.slt | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 e2e_test/source_inline/kafka/protobuf/opentelemetry.slt diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt new file mode 100644 index 0000000000000..e72a4610c8e9f --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -0,0 +1,35 @@ +system ok +rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto" + +system ok +rpk registry schema create "opentelemetry_test-value" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_test.proto" --references opentelemetry_common.proto:opentelemetry_common.proto:1 + +system ok +echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" + +statement ok +create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test') format plain encode protobuf (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, message = 'opentelemetry_test.OTLPTestMessage')) ; + +statement ok +flush; + +sleep 1s + +query T +select count(*) from opentelemetry_test; +---- +1 + +# ==== clean up ==== + +statement ok +drop table opentelemetry_test; + +system ok +rpk topic delete opentelemetry_test; + +system ok +rpk registry subject delete "opentelemetry_test-value" + +system ok +rpk registry subject delete "opentelemetry_common.proto" From e89de72562e53d699a2582f91b5b3e40b5f58feb Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 20:43:11 +0800 Subject: [PATCH 09/20] fix test --- .../kafka/protobuf/opentelemetry.slt | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index e72a4610c8e9f..82617ea3e04b7 100644 --- 
a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -5,10 +5,17 @@ system ok rpk registry schema create "opentelemetry_test-value" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_test.proto" --references opentelemetry_common.proto:opentelemetry_common.proto:1 system ok -echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" +echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation statement ok -create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test') format plain encode protobuf (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, message = 'opentelemetry_test.OTLPTestMessage')) ; +create table opentelemetry_test with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'opentelemetry_test') +format plain encode protobuf ( + schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, + message = 'opentelemetry_test.OTLPTestMessage', + messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList' +) ; statement ok flush; @@ -20,6 +27,21 @@ select count(*) from opentelemetry_test; ---- 1 +query TTIITTT +select (any_value).string_value, (any_value).bool_value, (any_value).int_value, (any_value).double_value, (any_value).array_value, (any_value).kvlist_value, (any_value).bytes_value from opentelemetry_test ; +---- +example f 0 0 {} {} \x + +query T +select key_value_list from opentelemetry_test; +---- +{"values": [{"key": "key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} + +query TTTI +select (instrumentation_scope).name, (instrumentation_scope).version, (instrumentation_scope).attributes, (instrumentation_scope).dropped_attributes_count from opentelemetry_test; +---- +test-scope 1.0 {} 0 + # ==== clean up ==== statement ok From 7bd3d3f3e40158d47f40364082545a73632ef478 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 21:25:03 +0800 Subject: [PATCH 10/20] fix --- e2e_test/source_inline/kafka/protobuf/opentelemetry.slt | 3 +-- src/connector/codec/tests/integration_tests/protobuf.rs | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index 82617ea3e04b7..6541d21de7a4b 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -14,8 +14,7 @@ create table opentelemetry_test with ( format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, message = 'opentelemetry_test.OTLPTestMessage', - messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList' -) ; + messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue') statement ok 
flush; diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index a38db7cb6e8c4..b4dc10f7c94dd 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -39,7 +39,10 @@ fn check( expected_risingwave_schema: expect_test::Expect, expected_risingwave_data: expect_test::Expect, ) { - let rw_schema = pb_schema_to_column_descs(&pb_schema, &HashSet::from(["google.protobuf.Any".to_owned()])); + let rw_schema = pb_schema_to_column_descs( + &pb_schema, + &HashSet::from(["google.protobuf.Any".to_owned()]), + ); if let Err(e) = rw_schema { expected_risingwave_schema.assert_eq(&e.to_report_string_pretty()); From 35768e5d1654671083854ae6ec3cb53cb618f5a8 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 21:47:20 +0800 Subject: [PATCH 11/20] fix --- e2e_test/source_inline/kafka/protobuf/opentelemetry.slt | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index 6541d21de7a4b..b8b416804921c 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -8,13 +8,7 @@ system ok echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation statement ok -create table opentelemetry_test with ( - ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, - topic = 'opentelemetry_test') -format plain encode protobuf ( - schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, - message = 'opentelemetry_test.OTLPTestMessage', - messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue') +create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue'); statement ok flush; From cf83eb3eca467755a60caf992ffc1a128c357a60 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 21:49:36 +0800 Subject: [PATCH 12/20] fix --- e2e_test/source_inline/kafka/protobuf/opentelemetry.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index b8b416804921c..08ac38ec7f2f2 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -8,7 +8,7 @@ system ok echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation statement ok -create table opentelemetry_test with ( 
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}, message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue'); +create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue'); statement ok flush; From 666da5f766c24719f47fbcfbc504bdafa01326ed Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 30 Dec 2024 22:53:40 +0800 Subject: [PATCH 13/20] fix: enable control substitution --- e2e_test/source_inline/kafka/protobuf/opentelemetry.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index 08ac38ec7f2f2..0698ef65b1ae7 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -1,3 +1,5 @@ +control substitution on + system ok rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto" From f165b1dc3d13aa654fedc5c381840140f2652c8a Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 14:16:32 +0800 Subject: [PATCH 14/20] fix --- .../kafka/protobuf/opentelemetry.slt | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index 0698ef65b1ae7..31e34c21ecdc1 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -22,20 +22,10 @@ select count(*) from opentelemetry_test; ---- 1 -query TTIITTT -select (any_value).string_value, (any_value).bool_value, (any_value).int_value, (any_value).double_value, (any_value).array_value, (any_value).kvlist_value, (any_value).bytes_value from opentelemetry_test ; +query TTT +select any_value, key_value_list, instrumentation_scope from opentelemetry_test; ---- -example f 0 0 {} {} \x - -query T -select key_value_list from opentelemetry_test; ----- -{"values": [{"key": "key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} - -query TTTI -select (instrumentation_scope).name, (instrumentation_scope).version, (instrumentation_scope).attributes, (instrumentation_scope).dropped_attributes_count from opentelemetry_test; ----- -test-scope 1.0 {} 0 +{"stringValue": "example"} {"values": [{"key": "key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} (test-scope,1.0,{},0) # ==== clean up ==== From 4aa9d1bdbb47b05003297de1a2cf33bbcce69ffc Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 15:03:49 +0800 Subject: [PATCH 15/20] handle stack overflow --- .../kafka/protobuf/recursive_overflow.slt | 22 +++++++++++++++++++ .../codec/src/decoder/protobuf/parser.rs | 2 +- .../tests/test_data/recursive_complex.proto | 21 ++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 
e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt create mode 100644 src/connector/codec/tests/test_data/recursive_complex.proto diff --git a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt new file mode 100644 index 0000000000000..19d9f32275a1c --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt @@ -0,0 +1,22 @@ +control substitution on + +system ok +rpk registry schema create "recursive_complex-value" --schema "/risingwave/src/connector/codec/tests/test_data/recursive_complex.proto" + +system ok +echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation + +# the test just make sure the table can finish create process +statement ok +create table recursive_complex with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'recursive_complex' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'recursive_complex.AnyValue', messages_as_jsonb = 'recursive_complex.AnyValue,recursive_complex.ArrayValue'); + +# ==== clean up ==== + +statement ok +drop table recursive_complex; + +system ok +rpk topic delete recursive_complex; + +system ok +rpk registry subject delete "recursive_complex-value" diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 0d9096351b9d8..a99a2b08db64e 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -58,7 +58,7 @@ fn pb_field_to_col_desc( ) -> anyhow::Result { let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb) .context("failed to map protobuf type")?; - if let Kind::Message(m) = field_descriptor.kind() { + if let Kind::Message(m) = field_descriptor.kind() && !messages_as_jsonb.contains(m.full_name()) { let field_descs = if let DataType::List { .. 
} = field_type { vec![] } else { diff --git a/src/connector/codec/tests/test_data/recursive_complex.proto b/src/connector/codec/tests/test_data/recursive_complex.proto new file mode 100644 index 0000000000000..4e0d964214020 --- /dev/null +++ b/src/connector/codec/tests/test_data/recursive_complex.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +// a recursive complex type can cause stack overflow in the frontend when inferring the schema + +package recursive_complex; + +message AnyValue { + oneof value { + string string_value = 1; + int32 int_value = 2; + double double_value = 3; + bool bool_value = 4; + ArrayValue array_value = 5; + } +} + +message ArrayValue { + AnyValue value1 = 1; + AnyValue value2 = 2; + ArrayValue array_value = 3; +} From b931440dcf1a95cb2f8d21579be391d5817b89e2 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 15:04:18 +0800 Subject: [PATCH 16/20] remove pb --- .../codec/tests/test_data/opentelemetry_test.pb | Bin 1369 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 src/connector/codec/tests/test_data/opentelemetry_test.pb diff --git a/src/connector/codec/tests/test_data/opentelemetry_test.pb b/src/connector/codec/tests/test_data/opentelemetry_test.pb deleted file mode 100644 index 8d146c5873b4e053a3268e3021bc4f9d240c1591..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1369 zcmb7E&2G~`5XN?!IFsYuQ&Lb$Zr+)1zMY-70fg?Sr8t4ro=8OBs2b`W8ROH@>W#?TJ(Jg78g7cuSQBX{Fl+@YwD^+*S@ zUa?J~b5+JsQ8(3)bkA?0dq)-77z;&zib63yfp%DNTIl|oNZ1jV%Akadny_1nDfZ3* zt~(rJ5ZXJ=az*2OErPhi(E@&K$WTB*f*;*-44&v3-1 zw`f;C{<{HtZ&TFIGC_$3JYwmGOI#x%u#*Of)|e;1i381&<^^KpZ7r`plGgwMM`?yj z9J*uu*W~I)p%hl7eI~6p z70#O)9}gEd3nJE+w(K(=XMJVEemVGNb|+}zS#4ogGPsJ(y%>ZIS#<(GO>ovs;Nv{F zwmSJcv%lfj(EDn_{mE=Rogu|JEs9`4$t!3DX>B{<|1DN-W2Lgkd$@<%QReW4@w9rj qv7FaI3P~pnNIt{EXkABsA%uit5KW$J-i-S%4SIVw7^O1nS^oyID$Sz+ From ebaba06e8da968c42ae0033ac3581642e3139ec7 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 15:14:01 +0800 Subject: [PATCH 17/20] format --- src/connector/codec/src/decoder/protobuf/parser.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index a99a2b08db64e..3fc8a511dc163 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -58,7 +58,9 @@ fn pb_field_to_col_desc( ) -> anyhow::Result { let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb) .context("failed to map protobuf type")?; - if let Kind::Message(m) = field_descriptor.kind() && !messages_as_jsonb.contains(m.full_name()) { + if let Kind::Message(m) = field_descriptor.kind() + && !messages_as_jsonb.contains(m.full_name()) + { let field_descs = if let DataType::List { .. 
} = field_type { vec![] } else { From 4735203cf151159df628fed48607a8b81edacf98 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 15:39:06 +0800 Subject: [PATCH 18/20] update testcase --- .../codec/tests/integration_tests/protobuf.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index b4dc10f7c94dd..62a5c5af3ddb6 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -456,7 +456,7 @@ fn test_any_schema() -> anyhow::Result<()> { expect![[r#" [ id(#1): Int32, - any_value(#4): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#2): Varchar, value(#3): Bytea], + any_value(#2): Jsonb, ]"#]], expect![[r#" Owned(Int32(12345)) @@ -607,14 +607,14 @@ fn test_all_types() -> anyhow::Result<()> { seconds: Int64, nanos: Int32, }, type_name: google.protobuf.Duration, field_descs: [seconds(#30): Int64, nanos(#31): Int32], - any_field(#35): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#33): Varchar, value(#34): Bytea], - int32_value_field(#37): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#36): Int32], - string_value_field(#39): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#38): Varchar], - map_struct_field(#44): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#40): Varchar, value(#43): Struct { + any_field(#33): Jsonb, + int32_value_field(#35): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#34): Int32], + string_value_field(#37): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#36): Varchar], + map_struct_field(#42): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#38): Varchar, value(#41): Struct { id: Int32, name: Varchar, - }, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#41): Int32, name(#42): Varchar]], - map_enum_field(#47): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#45): Int32, value(#46): Varchar], + }, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#39): Int32, name(#40): Varchar]], + map_enum_field(#45): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#43): Int32, value(#44): Varchar], ]"#]], expect![[r#" Owned(Float64(OrderedFloat(1.2345))) From e20d4fc7c606115ccc01d8c5a7e0790616376be7 Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 16:03:23 +0800 Subject: [PATCH 19/20] move proto to inline --- .../kafka/protobuf/opentelemetry.slt | 14 ++++++++++- .../kafka/protobuf/recursive_overflow.slt | 25 ++++++++++++++++++- .../tests/test_data/opentelemetry_test.proto | 11 -------- .../tests/test_data/recursive_complex.proto | 21 ---------------- 4 files changed, 37 insertions(+), 34 deletions(-) delete mode 100644 src/connector/codec/tests/test_data/opentelemetry_test.proto delete mode 100644 src/connector/codec/tests/test_data/recursive_complex.proto diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index 31e34c21ecdc1..c3297327c3f85 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ 
b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -4,7 +4,19 @@ system ok rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto" system ok -rpk registry schema create "opentelemetry_test-value" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_test.proto" --references opentelemetry_common.proto:opentelemetry_common.proto:1 +rpk registry schema create "opentelemetry_test-value" --schema "/dev/stdin" --references opentelemetry_common.proto:opentelemetry_common.proto:1 --type protobuf << EOF +syntax = "proto3"; + +package opentelemetry_test; + +import "opentelemetry_common.proto"; + +message OTLPTestMessage { + opentelemetry.proto.common.v1.AnyValue any_value = 1; + opentelemetry.proto.common.v1.KeyValueList key_value_list = 2; + opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3; +} +EOF system ok echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation diff --git a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt index 19d9f32275a1c..b4256db41582f 100644 --- a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt +++ b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt @@ -1,7 +1,30 @@ control substitution on system ok -rpk registry schema create "recursive_complex-value" --schema "/risingwave/src/connector/codec/tests/test_data/recursive_complex.proto" +rpk registry schema create "recursive_complex-value" --schema "/dev/stdin" --type protobuf << EOF +syntax = "proto3"; + +// a recursive complex type can cause stack overflow in the frontend when inferring the schema + +package recursive_complex; + +message AnyValue { + oneof value { + string string_value = 1; + int32 int_value = 2; + double double_value = 3; + bool bool_value = 4; + ArrayValue array_value = 5; + } +} + +message ArrayValue { + AnyValue value1 = 1; + AnyValue value2 = 2; + ArrayValue array_value = 3; +} + +EOF system ok echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation diff --git a/src/connector/codec/tests/test_data/opentelemetry_test.proto b/src/connector/codec/tests/test_data/opentelemetry_test.proto deleted file mode 100644 index 85d3f2a52c27a..0000000000000 --- a/src/connector/codec/tests/test_data/opentelemetry_test.proto +++ /dev/null @@ -1,11 +0,0 @@ -syntax = "proto3"; - -package opentelemetry_test; - -import "opentelemetry_common.proto"; - -message OTLPTestMessage { - opentelemetry.proto.common.v1.AnyValue any_value = 1; - opentelemetry.proto.common.v1.KeyValueList key_value_list = 2; - opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3; -} diff --git a/src/connector/codec/tests/test_data/recursive_complex.proto b/src/connector/codec/tests/test_data/recursive_complex.proto deleted file mode 100644 index 
4e0d964214020..0000000000000 --- a/src/connector/codec/tests/test_data/recursive_complex.proto +++ /dev/null @@ -1,21 +0,0 @@ -syntax = "proto3"; - -// a recursive complex type can cause stack overflow in the frontend when inferring the schema - -package recursive_complex; - -message AnyValue { - oneof value { - string string_value = 1; - int32 int_value = 2; - double double_value = 3; - bool bool_value = 4; - ArrayValue array_value = 5; - } -} - -message ArrayValue { - AnyValue value1 = 1; - AnyValue value2 = 2; - ArrayValue array_value = 3; -} From 2d14a7cd0c35309a5e6696a1dea3654089d50c3f Mon Sep 17 00:00:00 2001 From: tabversion Date: Tue, 31 Dec 2024 16:17:44 +0800 Subject: [PATCH 20/20] fix --- e2e_test/source_inline/kafka/protobuf/opentelemetry.slt | 4 +--- .../source_inline/kafka/protobuf/recursive_overflow.slt | 6 +----- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt index c3297327c3f85..f95b840b4054a 100644 --- a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -6,11 +6,8 @@ rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/sr system ok rpk registry schema create "opentelemetry_test-value" --schema "/dev/stdin" --references opentelemetry_common.proto:opentelemetry_common.proto:1 --type protobuf << EOF syntax = "proto3"; - package opentelemetry_test; - import "opentelemetry_common.proto"; - message OTLPTestMessage { opentelemetry.proto.common.v1.AnyValue any_value = 1; opentelemetry.proto.common.v1.KeyValueList key_value_list = 2; @@ -18,6 +15,7 @@ message OTLPTestMessage { } EOF + system ok echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation diff --git a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt index b4256db41582f..eb137ca965a92 100644 --- a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt +++ b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt @@ -3,11 +3,8 @@ control substitution on system ok rpk registry schema create "recursive_complex-value" --schema "/dev/stdin" --type protobuf << EOF syntax = "proto3"; - // a recursive complex type can cause stack overflow in the frontend when inferring the schema - package recursive_complex; - message AnyValue { oneof value { string string_value = 1; @@ -17,15 +14,14 @@ message AnyValue { ArrayValue array_value = 5; } } - message ArrayValue { AnyValue value1 = 1; AnyValue value2 = 2; ArrayValue array_value = 3; } - EOF + system ok echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation
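
Illustrative sketch (not part of the patch series): the hunk in PATCH 17/20 gates struct expansion on `!messages_as_jsonb.contains(m.full_name())`. The snippet below shows, with a hypothetical `ColumnType` and `column_type_for_field` standing in for the real column-descriptor machinery, what that check decides: a message field whose full name is in the set becomes a single jsonb column and its children are never visited, while any other message field is expanded into nested columns. Stopping the expansion is also what lets a recursive type such as `recursive_complex.ArrayValue` be ingested without the schema-inference stack overflow mentioned in the proto comment; the exact default contents of the set (the updated expect-tests still show `google.protobuf.Any` as Jsonb) are an assumption here.

use std::collections::HashSet;

use prost_reflect::{FieldDescriptor, Kind};

/// Hypothetical, simplified stand-in for the decoder's real column type.
#[derive(Debug)]
enum ColumnType {
    /// The whole message is ingested as one jsonb value; no nested columns.
    Jsonb,
    /// The message is expanded into nested columns, one per protobuf field.
    Struct(Vec<String>),
    /// Any non-message kind, reduced to a label for brevity.
    Scalar(&'static str),
}

/// Sketch of the decision the let-chain in the column-descriptor code encodes:
/// expand a message field into struct columns only when its full name is not
/// listed in `messages_as_jsonb`.
fn column_type_for_field(
    field: &FieldDescriptor,
    messages_as_jsonb: &HashSet<String>,
) -> ColumnType {
    match field.kind() {
        Kind::Message(m) if !messages_as_jsonb.contains(m.full_name()) => {
            // Not configured as jsonb: descend into the message's own fields
            // (only one level here, for illustration).
            ColumnType::Struct(m.fields().map(|f| f.name().to_owned()).collect())
        }
        // Configured as jsonb: stop here and emit a single opaque column.
        Kind::Message(_) => ColumnType::Jsonb,
        Kind::String => ColumnType::Scalar("varchar"),
        Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => ColumnType::Scalar("int"),
        _ => ColumnType::Scalar("other"),
    }
}

Keying the lookup by the message's package-qualified full name matches how the inlined schemas in the slt tests above are referenced (e.g. `recursive_complex.AnyValue`, `opentelemetry_test.OTLPTestMessage`).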