Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: message_as_jsonb to handle circle dep in protobuf #19935

Merged
merged 22 commits into from
Dec 31, 2024
26 changes: 17 additions & 9 deletions src/connector/codec/src/decoder/protobuf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod parser;
use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::LazyLock;

use parser::from_protobuf_value;
Expand All @@ -24,13 +25,17 @@ use thiserror_ext::AsReport;

use super::{uncategorized, Access, AccessResult};

pub struct ProtobufAccess {
/// Field accessor over a single protobuf `DynamicMessage`.
///
/// Besides the message itself it carries a borrowed set of message full names;
/// a nested message whose full name is in that set is converted to JSONB by
/// `from_protobuf_value`.
pub struct ProtobufAccess<'a> {
/// The message whose fields are read by `access`.
message: DynamicMessage,
/// Full names of message types to convert to JSONB. Borrowed, not owned.
struct_as_jsonb: &'a HashSet<String>,
}

impl ProtobufAccess {
pub fn new(message: DynamicMessage) -> Self {
Self { message }
impl<'a> ProtobufAccess<'a> {
/// Creates an accessor for `message`.
///
/// `struct_as_jsonb` is the set of message full names to convert to JSONB.
/// It is only borrowed, so it must outlive the returned accessor (`'a`).
pub fn new(message: DynamicMessage, struct_as_jsonb: &'a HashSet<String>) -> Self {
Self {
message,
struct_as_jsonb,
}
}

#[cfg(test)]
Expand All @@ -39,7 +44,7 @@ impl ProtobufAccess {
}
}

impl Access for ProtobufAccess {
impl Access for ProtobufAccess<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
debug_assert_eq!(1, path.len());
let field_desc = self
Expand All @@ -56,12 +61,15 @@ impl Access for ProtobufAccess {
})?;

match self.message.get_field(&field_desc) {
Cow::Borrowed(value) => from_protobuf_value(&field_desc, value, type_expected),
Cow::Borrowed(value) => {
from_protobuf_value(&field_desc, value, type_expected, self.struct_as_jsonb)
}

// `Owned` variant occurs only if there's no such field and the default value is returned.
Cow::Owned(value) => from_protobuf_value(&field_desc, &value, type_expected)
// enforce `Owned` variant to avoid returning a reference to a temporary value
.map(|d| d.to_owned_datum().into()),
Cow::Owned(value) => {
from_protobuf_value(&field_desc, &value, type_expected, self.struct_as_jsonb)
.map(|d| d.to_owned_datum().into())
}
}
}
}
24 changes: 20 additions & 4 deletions src/connector/codec/src/decoder/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use anyhow::Context;
use itertools::Itertools;
use prost_reflect::{Cardinality, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value};
Expand Down Expand Up @@ -106,6 +108,7 @@ pub fn from_protobuf_value<'a>(
field_desc: &FieldDescriptor,
value: &'a Value,
type_expected: &DataType,
struct_as_jsonb: &'a HashSet<String>,
) -> AccessResult<DatumCow<'a>> {
let kind = field_desc.kind();

Expand Down Expand Up @@ -136,7 +139,8 @@ pub fn from_protobuf_value<'a>(
ScalarImpl::Utf8(enum_symbol.name().into())
}
Value::Message(dyn_msg) => {
if dyn_msg.descriptor().full_name() == "google.protobuf.Any" {
let msg_full_name = dyn_msg.descriptor().full_name().to_owned();
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
if struct_as_jsonb.contains(&msg_full_name) {
ScalarImpl::Jsonb(JsonbVal::from(
serde_json::to_value(dyn_msg).map_err(AccessError::ProtobufAnyToJson)?,
))
Expand All @@ -159,8 +163,13 @@ pub fn from_protobuf_value<'a>(
};
let value = dyn_msg.get_field(&field_desc);
rw_values.push(
from_protobuf_value(&field_desc, &value, expected_field_type)?
.to_owned_datum(),
from_protobuf_value(
&field_desc,
&value,
expected_field_type,
struct_as_jsonb,
)?
.to_owned_datum(),
);
}
ScalarImpl::Struct(StructValue::new(rw_values))
Expand All @@ -176,7 +185,12 @@ pub fn from_protobuf_value<'a>(
};
let mut builder = element_type.create_array_builder(values.len());
for value in values {
builder.append(from_protobuf_value(field_desc, value, element_type)?);
builder.append(from_protobuf_value(
field_desc,
value,
element_type,
struct_as_jsonb,
)?);
}
ScalarImpl::List(ListValue::new(builder.finish()))
}
Expand Down Expand Up @@ -209,11 +223,13 @@ pub fn from_protobuf_value<'a>(
&map_desc.map_entry_key_field(),
&key.clone().into(),
map_type.key(),
struct_as_jsonb,
)?);
value_builder.append(from_protobuf_value(
&map_desc.map_entry_value_field(),
value,
map_type.value(),
struct_as_jsonb,
)?);
}
let keys = key_builder.finish();
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ pub struct ProtobufProperties {
pub topic: String,
pub key_message_name: Option<String>,
pub name_strategy: PbSchemaRegistryNameStrategy,
pub message_as_jsonb: Option<String>,
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Default, Clone, Copy)]
Expand Down
18 changes: 17 additions & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use anyhow::Context;
use prost_reflect::{DescriptorPool, DynamicMessage, FileDescriptor, MessageDescriptor};
use risingwave_common::{bail, try_match_expand};
Expand All @@ -30,6 +32,7 @@ use crate::schema::SchemaLoader;
/// Builds a `ProtobufAccess` from a raw payload by decoding it with
/// `message_descriptor`.
pub struct ProtobufAccessBuilder {
/// Copied from `ProtobufParserConfig::confluent_wire_type`.
confluent_wire_type: bool,
/// Descriptor used to decode a payload into a `DynamicMessage`.
message_descriptor: MessageDescriptor,
/// Message full names to convert to JSONB. Owned here and lent to every
/// `ProtobufAccess` that is built; the constructor always inserts
/// `google.protobuf.Any`.
struct_as_jsonb: HashSet<String>,
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
}

impl AccessBuilder for ProtobufAccessBuilder {
Expand All @@ -44,7 +47,10 @@ impl AccessBuilder for ProtobufAccessBuilder {
let message = DynamicMessage::decode(self.message_descriptor.clone(), payload)
.context("failed to parse message")?;

Ok(AccessImpl::Protobuf(ProtobufAccess::new(message)))
Ok(AccessImpl::Protobuf(ProtobufAccess::new(
message,
&self.struct_as_jsonb,
)))
}
}

Expand All @@ -53,11 +59,19 @@ impl ProtobufAccessBuilder {
let ProtobufParserConfig {
confluent_wire_type,
message_descriptor,
struct_as_jsonb_str,
} = config;

// Parse the comma-separated list of message full names. Entries are trimmed and
// empty entries are dropped, so an unset option (empty string) or input such as
// "a.B, c.D," does not leave "" or " c.D" in the set -- neither of which could
// ever match a descriptor's full name.
let mut struct_as_jsonb: HashSet<String> = struct_as_jsonb_str
    .split(',')
    .map(str::trim)
    .filter(|name| !name.is_empty())
    .map(str::to_owned)
    .collect();
// `google.protobuf.Any` is always converted to JSONB, regardless of configuration.
struct_as_jsonb.insert("google.protobuf.Any".to_owned());

Ok(Self {
confluent_wire_type,
message_descriptor,
struct_as_jsonb,
})
}
}
Expand All @@ -66,6 +80,7 @@ impl ProtobufAccessBuilder {
/// Configuration from which a `ProtobufAccessBuilder` is constructed.
pub struct ProtobufParserConfig {
/// Set from `use_schema_registry` of the protobuf properties.
confluent_wire_type: bool,
/// Descriptor of the message type to decode.
pub(crate) message_descriptor: MessageDescriptor,
/// Raw comma-separated list of message full names to convert to JSONB.
/// Empty when `message_as_jsonb` is not configured.
struct_as_jsonb_str: String,
}

impl ProtobufParserConfig {
Expand Down Expand Up @@ -110,6 +125,7 @@ impl ProtobufParserConfig {
Ok(Self {
message_descriptor,
confluent_wire_type: protobuf_config.use_schema_registry,
struct_as_jsonb_str: protobuf_config.message_as_jsonb.unwrap_or_default(),
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod util;
/// Wraps the format-specific accessor types behind a single enum, one variant
/// per format.
pub enum AccessImpl<'a> {
Avro(AvroAccess<'a>),
Bytes(BytesAccess<'a>),
Protobuf(ProtobufAccess),
Protobuf(ProtobufAccess<'a>),
Json(JsonAccess<'a>),
MongoJson(MongoJsonAccess<JsonAccess<'a>>),
}
Expand Down
Loading