diff --git a/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt new file mode 100644 index 0000000000000..f95b840b4054a --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/opentelemetry.slt @@ -0,0 +1,52 @@ +control substitution on + +system ok +rpk registry schema create "opentelemetry_common.proto" --schema "/risingwave/src/connector/codec/tests/test_data/opentelemetry_common.proto" + +system ok +rpk registry schema create "opentelemetry_test-value" --schema "/dev/stdin" --references opentelemetry_common.proto:opentelemetry_common.proto:1 --type protobuf << EOF +syntax = "proto3"; +package opentelemetry_test; +import "opentelemetry_common.proto"; +message OTLPTestMessage { + opentelemetry.proto.common.v1.AnyValue any_value = 1; + opentelemetry.proto.common.v1.KeyValueList key_value_list = 2; + opentelemetry.proto.common.v1.InstrumentationScope instrumentation_scope = 3; +} +EOF + + +system ok +echo '{"any_value":{"string_value":"example"},"key_value_list":{"values":[{"key":"key1","value":{"string_value":"value1"}},{"key":"key2","value":{"int_value":42}}]},"instrumentation_scope":{"name":"test-scope","version":"1.0"}}' | rpk topic produce "opentelemetry_test" --schema-id=topic --schema-type="opentelemetry_test.OTLPTestMessage" --allow-auto-topic-creation + +statement ok +create table opentelemetry_test with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'opentelemetry_test' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'opentelemetry_test.OTLPTestMessage', messages_as_jsonb = 'opentelemetry.proto.common.v1.ArrayValue,opentelemetry.proto.common.v1.KeyValueList,opentelemetry.proto.common.v1.AnyValue'); + +statement ok +flush; + +sleep 1s + +query T +select count(*) from opentelemetry_test; +---- +1 + +query TTT +select any_value, key_value_list, instrumentation_scope from opentelemetry_test; +---- +{"stringValue": "example"} {"values": [{"key": 
"key1", "value": {"stringValue": "value1"}}, {"key": "key2", "value": {"intValue": "42"}}]} (test-scope,1.0,{},0) + +# ==== clean up ==== + +statement ok +drop table opentelemetry_test; + +system ok +rpk topic delete opentelemetry_test; + +system ok +rpk registry subject delete "opentelemetry_test-value" + +system ok +rpk registry subject delete "opentelemetry_common.proto" diff --git a/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt new file mode 100644 index 0000000000000..eb137ca965a92 --- /dev/null +++ b/e2e_test/source_inline/kafka/protobuf/recursive_overflow.slt @@ -0,0 +1,41 @@ +control substitution on + +system ok +rpk registry schema create "recursive_complex-value" --schema "/dev/stdin" --type protobuf << EOF +syntax = "proto3"; +// a recursive complex type can cause stack overflow in the frontend when inferring the schema +package recursive_complex; +message AnyValue { + oneof value { + string string_value = 1; + int32 int_value = 2; + double double_value = 3; + bool bool_value = 4; + ArrayValue array_value = 5; + } +} +message ArrayValue { + AnyValue value1 = 1; + AnyValue value2 = 2; + ArrayValue array_value = 3; +} +EOF + + +system ok +echo '{"array_value":{"value1":{"string_value":"This is a string value"},"value2":{"int_value":42},"array_value":{"value1":{"double_value":3.14159},"value2":{"bool_value":true},"array_value":{"value1":{"string_value":"Deeply nested string"},"value2":{"int_value":100}}}}}' | rpk topic produce "recursive_complex" --schema-id=topic --schema-type="recursive_complex.AnyValue" --allow-auto-topic-creation + +# this test only makes sure that the CREATE TABLE statement completes (i.e. schema inference does not overflow the stack) +statement ok +create table recursive_complex with ( ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, topic = 'recursive_complex' ) format plain encode protobuf ( schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}', message = 'recursive_complex.AnyValue', messages_as_jsonb = 
'recursive_complex.AnyValue,recursive_complex.ArrayValue'); + +# ==== clean up ==== + +statement ok +drop table recursive_complex; + +system ok +rpk topic delete recursive_complex; + +system ok +rpk registry subject delete "recursive_complex-value" diff --git a/src/connector/codec/src/decoder/protobuf/mod.rs b/src/connector/codec/src/decoder/protobuf/mod.rs index 7ad357fef50fb..824f8131d6bcd 100644 --- a/src/connector/codec/src/decoder/protobuf/mod.rs +++ b/src/connector/codec/src/decoder/protobuf/mod.rs @@ -14,6 +14,7 @@ pub mod parser; use std::borrow::Cow; +use std::collections::HashSet; use std::sync::LazyLock; use parser::from_protobuf_value; @@ -24,13 +25,17 @@ use thiserror_ext::AsReport; use super::{uncategorized, Access, AccessResult}; -pub struct ProtobufAccess { +pub struct ProtobufAccess<'a> { message: DynamicMessage, + messages_as_jsonb: &'a HashSet, } -impl ProtobufAccess { - pub fn new(message: DynamicMessage) -> Self { - Self { message } +impl<'a> ProtobufAccess<'a> { + pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet) -> Self { + Self { + message, + messages_as_jsonb, + } } #[cfg(test)] @@ -39,7 +44,7 @@ impl ProtobufAccess { } } -impl Access for ProtobufAccess { +impl Access for ProtobufAccess<'_> { fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult> { debug_assert_eq!(1, path.len()); let field_desc = self @@ -56,12 +61,15 @@ impl Access for ProtobufAccess { })?; match self.message.get_field(&field_desc) { - Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected), + Cow::Borrowed(value) => { + from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb) + } // `Owned` variant occurs only if there's no such field and the default value is returned. 
- Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected) - // enforce `Owned` variant to avoid returning a reference to a temporary value - .map(|d| d.to_owned_datum().into()), + Cow::Owned(value) => { + from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb) + .map(|d| d.to_owned_datum().into()) + } } } } diff --git a/src/connector/codec/src/decoder/protobuf/parser.rs b/src/connector/codec/src/decoder/protobuf/parser.rs index 852fa9cca48d6..35cda3a5b8de1 100644 --- a/src/connector/codec/src/decoder/protobuf/parser.rs +++ b/src/connector/codec/src/decoder/protobuf/parser.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use anyhow::Context; use itertools::Itertools; use prost_reflect::{Cardinality, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value}; @@ -25,14 +27,22 @@ use thiserror_ext::Macro; use crate::decoder::{uncategorized, AccessError, AccessResult}; +pub const PROTOBUF_MESSAGES_AS_JSONB: &str = "messages_as_jsonb"; + pub fn pb_schema_to_column_descs( message_descriptor: &MessageDescriptor, + messages_as_jsonb: &HashSet, ) -> anyhow::Result> { let mut columns = Vec::with_capacity(message_descriptor.fields().len()); let mut index = 0; let mut parse_trace: Vec = vec![]; for field in message_descriptor.fields() { - columns.push(pb_field_to_col_desc(&field, &mut index, &mut parse_trace)?); + columns.push(pb_field_to_col_desc( + &field, + &mut index, + &mut parse_trace, + messages_as_jsonb, + )?); } Ok(columns) @@ -43,15 +53,18 @@ fn pb_field_to_col_desc( field_descriptor: &FieldDescriptor, index: &mut i32, parse_trace: &mut Vec, + messages_as_jsonb: &HashSet, ) -> anyhow::Result { - let field_type = protobuf_type_mapping(field_descriptor, parse_trace) + let field_type = protobuf_type_mapping(field_descriptor, parse_trace, messages_as_jsonb) .context("failed to map protobuf type")?; - if let 
Kind::Message(m) = field_descriptor.kind() { + if let Kind::Message(m) = field_descriptor.kind() + && !messages_as_jsonb.contains(m.full_name()) + { let field_descs = if let DataType::List { .. } = field_type { vec![] } else { m.fields() - .map(|f| pb_field_to_col_desc(&f, index, parse_trace)) + .map(|f| pb_field_to_col_desc(&f, index, parse_trace, messages_as_jsonb)) .try_collect()? }; *index += 1; @@ -91,10 +104,12 @@ fn detect_loop_and_push( let identifier = format!("{}({})", fd.name(), fd.full_name()); if trace.iter().any(|s| s == identifier.as_str()) { bail_protobuf_type_error!( - "circular reference detected: {}, conflict with {}, kind {:?}", + "circular reference detected: {}, conflict with {}, kind {:?}. Adding {:?} to {:?} may help.", trace.iter().format("->"), identifier, fd.kind(), + fd.kind(), + PROTOBUF_MESSAGES_AS_JSONB, ); } trace.push(identifier); @@ -105,6 +120,7 @@ pub fn from_protobuf_value<'a>( field_desc: &FieldDescriptor, value: &'a Value, type_expected: &DataType, + messages_as_jsonb: &'a HashSet, ) -> AccessResult> { let kind = field_desc.kind(); @@ -135,7 +151,7 @@ pub fn from_protobuf_value<'a>( ScalarImpl::Utf8(enum_symbol.name().into()) } Value::Message(dyn_msg) => { - if dyn_msg.descriptor().full_name() == "google.protobuf.Any" { + if messages_as_jsonb.contains(dyn_msg.descriptor().full_name()) { ScalarImpl::Jsonb(JsonbVal::from( serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?, )) @@ -158,8 +174,13 @@ pub fn from_protobuf_value<'a>( }; let value = dyn_msg.get_field(&field_desc); rw_values.push( - from_protobuf_value(&field_desc, &value, expected_field_type)? - .to_owned_datum(), + from_protobuf_value( + &field_desc, + &value, + expected_field_type, + messages_as_jsonb, + )? 
+ .to_owned_datum(), ); } ScalarImpl::Struct(StructValue::new(rw_values)) @@ -175,7 +196,12 @@ pub fn from_protobuf_value<'a>( }; let mut builder = element_type.create_array_builder(values.len()); for value in values { - builder.append(from_protobuf_value(field_desc, value, element_type)?); + builder.append(from_protobuf_value( + field_desc, + value, + element_type, + messages_as_jsonb, + )?); } ScalarImpl::List(ListValue::new(builder.finish())) } @@ -208,11 +234,13 @@ pub fn from_protobuf_value<'a>( &map_desc.map_entry_key_field(), &key.clone().into(), map_type.key(), + messages_as_jsonb, )?); value_builder.append(from_protobuf_value( &map_desc.map_entry_value_field(), value, map_type.value(), + messages_as_jsonb, )?); } let keys = key_builder.finish(); @@ -230,6 +258,7 @@ pub fn from_protobuf_value<'a>( fn protobuf_type_mapping( field_descriptor: &FieldDescriptor, parse_trace: &mut Vec, + messages_as_jsonb: &HashSet, ) -> std::result::Result { detect_loop_and_push(parse_trace, field_descriptor)?; let mut t = match field_descriptor.kind() { @@ -244,20 +273,28 @@ fn protobuf_type_mapping( Kind::Uint64 | Kind::Fixed64 => DataType::Decimal, Kind::String => DataType::Varchar, Kind::Message(m) => { - if m.full_name() == "google.protobuf.Any" { + if messages_as_jsonb.contains(m.full_name()) { // Well-Known Types are identified by their full name DataType::Jsonb } else if m.is_map_entry() { // Map is equivalent to `repeated MapFieldEntry map_field = N;` debug_assert!(field_descriptor.is_map()); - let key = protobuf_type_mapping(&m.map_entry_key_field(), parse_trace)?; - let value = protobuf_type_mapping(&m.map_entry_value_field(), parse_trace)?; + let key = protobuf_type_mapping( + &m.map_entry_key_field(), + parse_trace, + messages_as_jsonb, + )?; + let value = protobuf_type_mapping( + &m.map_entry_value_field(), + parse_trace, + messages_as_jsonb, + )?; _ = parse_trace.pop(); return Ok(DataType::Map(MapType::from_kv(key, value))); } else { let fields = m .fields() - 
.map(|f| protobuf_type_mapping(&f, parse_trace)) + .map(|f| protobuf_type_mapping(&f, parse_trace, messages_as_jsonb)) .try_collect()?; let field_names = m.fields().map(|f| f.name().to_string()).collect_vec(); DataType::new_struct(fields, field_names) diff --git a/src/connector/codec/tests/integration_tests/protobuf.rs b/src/connector/codec/tests/integration_tests/protobuf.rs index 0b022f603a46d..5cee6110c287c 100644 --- a/src/connector/codec/tests/integration_tests/protobuf.rs +++ b/src/connector/codec/tests/integration_tests/protobuf.rs @@ -18,7 +18,7 @@ mod recursive; #[rustfmt::skip] #[allow(clippy::all)] mod all_types; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use anyhow::Context; use prost::Message; @@ -39,7 +39,10 @@ fn check( expected_risingwave_schema: expect_test::Expect, expected_risingwave_data: expect_test::Expect, ) { - let rw_schema = pb_schema_to_column_descs(&pb_schema); + let rw_schema = pb_schema_to_column_descs( + &pb_schema, + &HashSet::from(["google.protobuf.Any".to_owned()]), + ); if let Err(e) = rw_schema { expected_risingwave_schema.assert_eq(&e.to_report_string_pretty()); @@ -58,8 +61,12 @@ fn check( )); let mut data_str = vec![]; + let messages_as_jsonb = HashSet::from(["google.protobuf.Any".to_owned()]); for data in pb_data { - let access = ProtobufAccess::new(DynamicMessage::decode(pb_schema.clone(), *data).unwrap()); + let access = ProtobufAccess::new( + DynamicMessage::decode(pb_schema.clone(), *data).unwrap(), + &messages_as_jsonb, + ); let mut row = vec![]; for col in &rw_schema { let rw_data = access.access(&[&col.name], &col.data_type); @@ -235,11 +242,11 @@ fn test_any_schema() -> anyhow::Result<()> { // } static ANY_DATA_3: &[u8] = b"\x08\xb9\x60\x12\x32\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x56\x61\x6c\x75\x65\x12\x0a\x0a\x08\x4a\x6f\x68\x6e\x20\x44\x6f\x65"; - // // id: 12345 - // // any_value: { - // 
// type_url: "type.googleapis.com/test.StringXalue" - // // value: "\n\010John Doe" - // // } + // id: 12345 + // any_value: { + // type_url: "type.googleapis.com/test.StringXalue" + // value: "\n\010John Doe" + // } static ANY_DATA_INVALID: &[u8] = b"\x08\xb9\x60\x12\x32\x0a\x24\x74\x79\x70\x65\x2e\x67\x6f\x6f\x67\x6c\x65\x61\x70\x69\x73\x2e\x63\x6f\x6d\x2f\x74\x65\x73\x74\x2e\x53\x74\x72\x69\x6e\x67\x58\x61\x6c\x75\x65\x12\x0a\x0a\x08\x4a\x6f\x68\x6e\x20\x44\x6f\x65"; // validate the binary data is correct @@ -449,7 +456,7 @@ fn test_any_schema() -> anyhow::Result<()> { expect![[r#" [ id(#1): Int32, - any_value(#4): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#2): Varchar, value(#3): Bytea], + any_value(#2): Jsonb, ]"#]], expect![[r#" Owned(Int32(12345)) @@ -600,14 +607,14 @@ fn test_all_types() -> anyhow::Result<()> { seconds: Int64, nanos: Int32, }, type_name: google.protobuf.Duration, field_descs: [seconds(#30): Int64, nanos(#31): Int32], - any_field(#35): Jsonb, type_name: google.protobuf.Any, field_descs: [type_url(#33): Varchar, value(#34): Bytea], - int32_value_field(#37): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#36): Int32], - string_value_field(#39): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#38): Varchar], - map_struct_field(#44): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#40): Varchar, value(#43): Struct { + any_field(#33): Jsonb, + int32_value_field(#35): Struct { value: Int32 }, type_name: google.protobuf.Int32Value, field_descs: [value(#34): Int32], + string_value_field(#37): Struct { value: Varchar }, type_name: google.protobuf.StringValue, field_descs: [value(#36): Varchar], + map_struct_field(#42): Map(Varchar,Struct { id: Int32, name: Varchar }), type_name: all_types.AllTypes.MapStructFieldEntry, field_descs: [key(#38): Varchar, value(#41): Struct { id: Int32, 
name: Varchar, - }, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#41): Int32, name(#42): Varchar]], - map_enum_field(#47): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#45): Int32, value(#46): Varchar], + }, type_name: all_types.AllTypes.NestedMessage, field_descs: [id(#39): Int32, name(#40): Varchar]], + map_enum_field(#45): Map(Int32,Varchar), type_name: all_types.AllTypes.MapEnumFieldEntry, field_descs: [key(#43): Int32, value(#44): Varchar], ]"#]], expect![[r#" Owned(Float64(OrderedFloat(1.2345))) @@ -710,7 +717,7 @@ fn test_recursive() -> anyhow::Result<()> { failed to map protobuf type Caused by: - circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent + circular reference detected: parent(recursive.ComplexRecursiveMessage.parent)->siblings(recursive.ComplexRecursiveMessage.Parent.siblings), conflict with parent(recursive.ComplexRecursiveMessage.parent), kind recursive.ComplexRecursiveMessage.Parent. Adding recursive.ComplexRecursiveMessage.Parent to "messages_as_jsonb" may help. "#]], expect![""], ); diff --git a/src/connector/codec/tests/test_data/opentelemetry_common.proto b/src/connector/codec/tests/test_data/opentelemetry_common.proto new file mode 100644 index 0000000000000..356ac1293ca29 --- /dev/null +++ b/src/connector/codec/tests/test_data/opentelemetry_common.proto @@ -0,0 +1,81 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +option csharp_namespace = "OpenTelemetry.Proto.Common.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.common.v1"; +option java_outer_classname = "CommonProto"; +option go_package = "go.opentelemetry.io/proto/otlp/common/v1"; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). 
The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // An empty instrumentation scope name means the name is unknown. + string name = 1; + string version = 2; + + // Additional attributes that describe the scope. [Optional]. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated KeyValue attributes = 3; + uint32 dropped_attributes_count = 4; +} \ No newline at end of file diff --git a/src/connector/src/parser/config.rs b/src/connector/src/parser/config.rs index 52355ae058635..eed05d71e4b20 100644 --- a/src/connector/src/parser/config.rs +++ b/src/connector/src/parser/config.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashSet; + use risingwave_common::bail; use risingwave_common::secret::LocalSecretManager; use risingwave_connector_codec::decoder::avro::MapHandling; @@ -21,6 +23,7 @@ use super::utils::get_kafka_topic; use super::{DebeziumProps, TimestamptzHandling}; use crate::connector_common::AwsAuthProps; use crate::error::ConnectorResult; +use crate::parser::PROTOBUF_MESSAGES_AS_JSONB; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY; use crate::source::{extract_source_struct, SourceColumnDesc, SourceEncode, SourceFormat}; @@ -186,6 +189,15 @@ impl SpecificParserConfig { if info.row_schema_location.is_empty() { bail!("protobuf file location not provided"); } + let mut messages_as_jsonb = if let Some(messages_as_jsonb) = + format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB) + { + messages_as_jsonb.split(',').map(|s| s.to_owned()).collect() + } else { + HashSet::new() + }; + messages_as_jsonb.insert("google.protobuf.Any".to_owned()); + let mut config = ProtobufProperties { message_name: info.proto_message_name.clone(), use_schema_registry: info.use_schema_registry, @@ -193,6 +205,7 @@ impl SpecificParserConfig { name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy) .unwrap(), key_message_name: info.key_message_name.clone(), + messages_as_jsonb, ..Default::default() }; if format == SourceFormat::Upsert { @@ -322,6 +335,7 @@ pub struct ProtobufProperties { pub topic: String, pub key_message_name: Option, pub name_strategy: PbSchemaRegistryNameStrategy, + pub messages_as_jsonb: HashSet, } #[derive(Debug, Default, Clone, Copy)] diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index a82bed6b69e17..1e9589c15a3c6 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations 
under the License. +use std::collections::HashSet; + use anyhow::Context; use prost_reflect::{DescriptorPool, DynamicMessage, FileDescriptor, MessageDescriptor}; use risingwave_common::{bail, try_match_expand}; -pub use risingwave_connector_codec::decoder::protobuf::parser::*; +pub use risingwave_connector_codec::decoder::protobuf::parser::{PROTOBUF_MESSAGES_AS_JSONB, *}; use risingwave_connector_codec::decoder::protobuf::ProtobufAccess; use risingwave_pb::plan_common::ColumnDesc; @@ -30,6 +32,10 @@ use crate::schema::SchemaLoader; pub struct ProtobufAccessBuilder { confluent_wire_type: bool, message_descriptor: MessageDescriptor, + + // A HashSet containing protobuf message type full names (e.g. "google.protobuf.Any") + // that should be mapped to JSONB type when storing in RisingWave + messages_as_jsonb: HashSet, } impl AccessBuilder for ProtobufAccessBuilder { @@ -44,7 +50,10 @@ impl AccessBuilder for ProtobufAccessBuilder { let message = DynamicMessage::decode(self.message_descriptor.clone(), payload) .context("failed to parse message")?; - Ok(AccessImpl::Protobuf(ProtobufAccess::new(message))) + Ok(AccessImpl::Protobuf(ProtobufAccess::new( + message, + &self.messages_as_jsonb, + ))) } } @@ -53,11 +62,13 @@ impl ProtobufAccessBuilder { let ProtobufParserConfig { confluent_wire_type, message_descriptor, + messages_as_jsonb, } = config; Ok(Self { confluent_wire_type, message_descriptor, + messages_as_jsonb, }) } } @@ -66,6 +77,7 @@ impl ProtobufAccessBuilder { pub struct ProtobufParserConfig { confluent_wire_type: bool, pub(crate) message_descriptor: MessageDescriptor, + messages_as_jsonb: HashSet, } impl ProtobufParserConfig { @@ -110,12 +122,14 @@ impl ProtobufParserConfig { Ok(Self { message_descriptor, confluent_wire_type: protobuf_config.use_schema_registry, + messages_as_jsonb: protobuf_config.messages_as_jsonb, }) } /// Maps the protobuf schema to relational schema. 
pub fn map_to_columns(&self) -> ConnectorResult> { - pb_schema_to_column_descs(&self.message_descriptor).map_err(|e| e.into()) + pb_schema_to_column_descs(&self.message_descriptor, &self.messages_as_jsonb) + .map_err(|e| e.into()) } } diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index adf32df572307..6a7e1f6fd9371 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -35,7 +35,7 @@ pub mod util; pub enum AccessImpl<'a> { Avro(AvroAccess<'a>), Bytes(BytesAccess<'a>), - Protobuf(ProtobufAccess), + Protobuf(ProtobufAccess<'a>), Json(JsonAccess<'a>), MongoJson(MongoJsonAccess>), } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2fd87cc4dcae6..4d1e085d96ad7 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -38,7 +38,7 @@ use risingwave_connector::parser::additional_columns::{ use risingwave_connector::parser::{ fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig, SchemaLocation, SpecificParserConfig, TimestamptzHandling, - DEBEZIUM_IGNORE_KEY, + DEBEZIUM_IGNORE_KEY, PROTOBUF_MESSAGES_AS_JSONB, }; use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, @@ -155,6 +155,7 @@ async fn extract_avro_table_schema( let parser_config = SpecificParserConfig::new(info, with_properties)?; try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME); try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD); + try_consume_string_from_options(format_encode_options, PROTOBUF_MESSAGES_AS_JSONB); consume_aws_config_from_options(format_encode_options); let vec_column_desc = if is_debezium {