From 16a0d4417d644cf5e16847228f417c900a4fa8f7 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 27 Jan 2023 17:24:41 +0100 Subject: [PATCH] ref(replay): More efficient deserialization (#1782) After deploying https://github.com/getsentry/relay/pull/1678, we saw a rise in memory consumption. We narrowed down the reason to deserialization of replay recordings, so this PR attempts to replace those deserializers with more efficient versions that do not parse an entire `serde_json::Value` to get the tag (`type`, `source`) of the enum. A custom deserializer is necessary because serde does not support [integer tags for internally tagged enums](https://github.com/serde-rs/serde/issues/745). - [x] Custom deserializer for `NodeVariant`, based on serde's own `derive(Deserialize)` of internally tagged enums. - [x] Custom deserializer for `recording::Event`, based on serde's own `derive(Deserialize)` of internally tagged enums. - [x] Custom deserializer for `IncrementalSourceDataVariant`, based on serde's own `derive(Deserialize)` of internally tagged enums. - [x] Box all enum variants. ### Benchmark comparison Ran a criterion benchmark on `rrweb.json`. It does not tell us anything about memory consumption, but the reduced cpu usage points to simpler deserialization: #### Before ``` rrweb/1 time: [142.37 ms 148.17 ms 155.61 ms] ``` #### After ``` rrweb/1 time: [31.474 ms 31.801 ms 32.137 ms] ``` #skip-changelog --------- Co-authored-by: Colton Allen Co-authored-by: Oleksandr <1931331+olksdr@users.noreply.github.com> --- Cargo.lock | 131 ++++++---- relay-general/Cargo.toml | 2 +- relay-metrics/Cargo.toml | 2 +- relay-replays/Cargo.toml | 5 + relay-replays/benches/benchmarks.rs | 14 + .../src/{recording.rs => recording/mod.rs} | 198 ++++---------- relay-replays/src/recording/serialization.rs | 246 ++++++++++++++++++ relay-server/src/actors/processor.rs | 4 +- tests/integration/test_replay_recordings.py | 14 +- 9 files changed, 411 insertions(+), 205 deletions(-) create mode 100644 relay-replays/benches/benchmarks.rs rename relay-replays/src/{recording.rs => recording/mod.rs} (76%) create mode 100644 relay-replays/src/recording/serialization.rs diff --git a/Cargo.lock b/Cargo.lock index d1f9eb1452..a602dc86b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -169,6 +169,12 @@ dependencies = [ "serde", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "ansi_term" version = "0.12.1" @@ -307,7 +313,7 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "clap", + "clap 2.34.0", "env_logger 0.9.0", "lazy_static", "lazycell", @@ -389,7 +395,6 @@ dependencies = [ "lazy_static", "memchr", "regex-automata", - "serde", ] [[package]] @@ -450,12 +455,9 @@ dependencies = [ [[package]] name = "cast" -version = "0.2.7" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" -dependencies = [ - "rustc_version 0.4.0", -] +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" @@ -501,6 +503,33 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ciborium" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c137568cc60b904a7724001b35ce2630fd00d5d84805fbb608ab89509d788f" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "346de753af073cc87b52b2083a506b38ac176a44cfb05497b622e27be899b369" + +[[package]] +name = "ciborium-ll" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213030a2b5a4e0c0892b6652260cf6ccac84827b83a85a534e178e3906c4cf1b" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.3.3" @@ -523,11 +552,32 @@ dependencies = [ "bitflags", "strsim", "term_size", - "textwrap", + "textwrap 0.11.0", "unicode-width", "vec_map", ] +[[package]] +name = "clap" +version = "3.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" +dependencies = [ + "bitflags", + "clap_lex", + "indexmap", + "textwrap 0.16.0", +] + +[[package]] +name = "clap_lex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "clear_on_drop" version = "0.2.5" @@ -667,15 +717,16 @@ dependencies = [ [[package]] name = "criterion" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10" +checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" dependencies = [ + "anes", "atty", "cast", - "clap", + "ciborium", + "clap 3.2.23", "criterion-plot", - "csv", "itertools 0.10.3", "lazy_static", "num-traits 0.2.15", @@ -684,7 +735,6 @@ dependencies = [ "rayon", "regex", "serde", - "serde_cbor", "serde_derive", "serde_json", "tinytemplate", @@ -693,9 +743,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.4" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", "itertools 0.10.3", @@ -823,28 +873,6 @@ dependencies = [ "subtle 1.0.0", ] -[[package]] -name = "csv" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" -dependencies = [ - "bstr", - "csv-core", - "itoa 0.4.8", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" -dependencies = [ - "memchr", -] - [[package]] name = "curve25519-dalek" version = "1.2.6" @@ -2427,6 +2455,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "6.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" + [[package]] name = "owning_ref" version = "0.4.1" @@ -3150,7 +3184,7 @@ name = "relay" version = "23.1.1" dependencies = [ "anyhow", - "clap", + "clap 2.34.0", "console 0.10.3", "dialoguer", "futures 0.1.31", @@ -3434,6 +3468,7 @@ name = "relay-replays" version = "23.1.1" dependencies = [ "assert-json-diff", + "criterion", "flate2", "insta", "rand 0.8.5", @@ -3474,7 +3509,7 @@ dependencies = [ "brotli2", "bytes 0.4.12", "chrono", - "clap", + "clap 2.34.0", "data-encoding", "failure", "flate2", @@ -3977,16 +4012,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde_cbor" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" -dependencies = [ - "half", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.137" @@ -4242,7 +4267,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap", + "clap 2.34.0", "lazy_static", "paw", "structopt-derive", @@ -4412,6 +4437,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "textwrap" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" + [[package]] name = "thiserror" version = "1.0.37" diff --git a/relay-general/Cargo.toml b/relay-general/Cargo.toml index ee31f7d113..536ed282a8 100644 --- a/relay-general/Cargo.toml +++ b/relay-general/Cargo.toml @@ -42,7 +42,7 @@ utf16string = "0.2.0" uuid = { version = "0.8.1", features = ["v4", "serde"] } [dev-dependencies] -criterion = "0.3" +criterion = "0.4" insta = { version = "1.19.0", features = ["json", "redactions", "ron", "yaml"] } pretty-hex = "0.2.0" similar-asserts = "1.4.2" diff --git a/relay-metrics/Cargo.toml b/relay-metrics/Cargo.toml index 5edd461a7d..3382f94be2 100644 --- a/relay-metrics/Cargo.toml +++ b/relay-metrics/Cargo.toml @@ -23,7 +23,7 @@ thiserror = "1.0.20" tokio = { version = "1.0", features = ["macros", "time"] } [dev-dependencies] -criterion = "0.3" +criterion = "0.4" insta = "1.19.0" relay-statsd = { path = "../relay-statsd", features = ["test"] } relay-test = { path = "../relay-test" } diff --git a/relay-replays/Cargo.toml b/relay-replays/Cargo.toml index 27c59f7b5a..acc4ed3011 100644 --- a/relay-replays/Cargo.toml +++ b/relay-replays/Cargo.toml @@ -22,5 +22,10 @@ unicase = "2.6.0" flate2 = "1.0.19" [dev-dependencies] +criterion = "0.4" insta = { version = "1.1.0", features = ["ron"] } assert-json-diff = "2.0.2" + +[[bench]] +name = "benchmarks" +harness = false diff --git a/relay-replays/benches/benchmarks.rs b/relay-replays/benches/benchmarks.rs new file mode 100644 index 0000000000..de7514fb3d --- /dev/null +++ b/relay-replays/benches/benchmarks.rs @@ -0,0 +1,14 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; + +use relay_replays::recording::_deserialize_event; + +fn bench_recording(c: &mut Criterion) { + let payload = include_bytes!("../tests/fixtures/rrweb.json"); + + c.bench_with_input(BenchmarkId::new("rrweb", 1), &payload, |b, &_| { + b.iter(|| _deserialize_event(payload)); + }); +} + +criterion_group!(benches, bench_recording); +criterion_main!(benches); diff --git a/relay-replays/src/recording.rs b/relay-replays/src/recording/mod.rs similarity index 76% rename from relay-replays/src/recording.rs rename to relay-replays/src/recording/mod.rs index d8d122dd41..00d48da348 100644 --- a/relay-replays/src/recording.rs +++ b/relay-replays/src/recording/mod.rs @@ -10,9 +10,11 @@ use relay_general::processor::{ use relay_general::types::{Meta, ProcessingAction}; use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression}; -use serde::{de::Error as DError, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use serde_json::Value; +mod serialization; + /// Parses compressed replay recording payloads and applies data scrubbers. /// /// `limit` controls the maximum size in bytes during decompression. This function returns an `Err` @@ -158,9 +160,10 @@ impl RecordingProcessor<'_> { } } NodeVariant::T2(element) => self.recurse_element(element)?, - NodeVariant::Rest(text) => { + NodeVariant::T3(text) | NodeVariant::T4(text) | NodeVariant::T5(text) => { self.strip_pii(&mut text.text_content)?; } + _ => {} } @@ -232,56 +235,19 @@ impl RecordingProcessor<'_> { /// -> CUSTOM = 5 /// -> PLUGIN = 6 -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] enum Event { - T2(FullSnapshotEvent), - T3(IncrementalSnapshotEvent), - T4(MetaEvent), - T5(CustomEvent), - Default(Value), - // 0: DOMContentLoadedEvent, - // 1: LoadEvent, - // 6: PluginEvent, -} - -impl<'de> serde::Deserialize<'de> for Event { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("type") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 2 => match FullSnapshotEvent::deserialize(value) { - Ok(event) => Ok(Event::T2(event)), - Err(_) => Err(DError::custom("could not parse snapshot event")), - }, - 3 => match IncrementalSnapshotEvent::deserialize(value) { - Ok(event) => Ok(Event::T3(event)), - Err(_) => Err(DError::custom("could not parse incremental snapshot event")), - }, - 4 => match MetaEvent::deserialize(value) { - Ok(event) => Ok(Event::T4(event)), - Err(_) => Err(DError::custom("could not parse meta event")), - }, - 5 => match CustomEvent::deserialize(value) { - Ok(event) => Ok(Event::T5(event)), - Err(e) => Err(DError::custom(e.to_string())), - }, - 0 | 1 | 6 => Ok(Event::Default(value)), - _ => Err(DError::custom("invalid type value")), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + T0(Value), // 0: DOMContentLoadedEvent, + T1(Value), // 1: LoadEvent, + T2(Box), + T3(Box), + T4(Box), + T5(Box), + T6(Value), // 6: PluginEvent, } #[derive(Debug, Serialize, Deserialize)] struct FullSnapshotEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: FullSnapshotEventData, } @@ -295,24 +261,18 @@ struct FullSnapshotEventData { #[derive(Debug, Serialize, Deserialize)] struct IncrementalSnapshotEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: IncrementalSourceDataVariant, } #[derive(Debug, Serialize, Deserialize)] struct MetaEvent { - #[serde(rename = "type")] - ty: u8, timestamp: u64, data: Value, } #[derive(Debug, Serialize, Deserialize)] struct CustomEvent { - #[serde(rename = "type")] - ty: u8, timestamp: f64, data: CustomEventDataVariant, } @@ -321,9 +281,9 @@ struct CustomEvent { #[serde(untagged)] enum CustomEventDataVariant { #[serde(rename = "breadcrumb")] - Breadcrumb(Breadcrumb), + Breadcrumb(Box), #[serde(rename = "performanceSpan")] - PerformanceSpan(PerformanceSpan), + PerformanceSpan(Box), } #[derive(Debug, Serialize, Deserialize)] @@ -391,52 +351,20 @@ struct Node { variant: NodeVariant, } -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] + enum NodeVariant { - T0(DocumentNode), - T1(DocumentTypeNode), - T2(ElementNode), - Rest(TextNode), // types 3 (text), 4 (cdata), 5 (comment) -} - -impl<'de> serde::Deserialize<'de> for NodeVariant { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("type") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 0 => match DocumentNode::deserialize(value) { - Ok(document) => Ok(NodeVariant::T0(document)), - Err(_) => Err(DError::custom("could not parse document object.")), - }, - 1 => match DocumentTypeNode::deserialize(value) { - Ok(document_type) => Ok(NodeVariant::T1(document_type)), - Err(_) => Err(DError::custom("could not parse document-type object")), - }, - 2 => match ElementNode::deserialize(value) { - Ok(element) => Ok(NodeVariant::T2(element)), - Err(_) => Err(DError::custom("could not parse element object")), - }, - 3 | 4 | 5 => match TextNode::deserialize(value) { - Ok(text) => Ok(NodeVariant::Rest(text)), - Err(_) => Err(DError::custom("could not parse text object")), - }, - _ => Err(DError::custom("invalid type value")), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + T0(Box), + T1(Box), + T2(Box), + T3(Box), // text + T4(Box), // cdata + T5(Box), // comment } #[derive(Debug, Serialize, Deserialize)] struct DocumentNode { id: i32, - #[serde(rename = "type")] - ty: u8, #[serde(rename = "childNodes")] child_nodes: Vec, } @@ -444,8 +372,6 @@ struct DocumentNode { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct DocumentTypeNode { - #[serde(rename = "type")] - ty: u8, id: Value, public_id: Value, system_id: Value, @@ -456,8 +382,6 @@ struct DocumentTypeNode { #[serde(rename_all = "camelCase")] struct ElementNode { id: Value, - #[serde(rename = "type")] - ty: u8, attributes: HashMap, tag_name: String, child_nodes: Vec, @@ -471,8 +395,6 @@ struct ElementNode { #[serde(rename_all = "camelCase")] struct TextNode { id: Value, - #[serde(rename = "type")] - ty: u8, text_content: String, #[serde(skip_serializing_if = "Option::is_none")] is_style: Option, @@ -499,42 +421,16 @@ struct TextNode { /// -> DRAG = 12 /// -> STYLEDECLARATION = 13 -#[derive(Debug, Serialize)] -#[serde(untagged)] +#[derive(Debug)] enum IncrementalSourceDataVariant { - Mutation(MutationIncrementalSourceData), - Input(InputIncrementalSourceData), - Default(Value), -} - -impl<'de> serde::Deserialize<'de> for IncrementalSourceDataVariant { - fn deserialize>(d: D) -> Result { - let value = Value::deserialize(d)?; - - match value.get("source") { - Some(val) => match Value::as_u64(val) { - Some(v) => match v { - 0 => match MutationIncrementalSourceData::deserialize(value) { - Ok(document) => Ok(IncrementalSourceDataVariant::Mutation(document)), - Err(_) => Err(DError::custom("could not parse mutation object.")), - }, - 5 => match InputIncrementalSourceData::deserialize(value) { - Ok(document_type) => Ok(IncrementalSourceDataVariant::Input(document_type)), - Err(_) => Err(DError::custom("could not parse input object")), - }, - _ => Ok(IncrementalSourceDataVariant::Default(value)), - }, - None => Err(DError::custom("type field must be an integer")), - }, - None => Err(DError::missing_field("type")), - } - } + Mutation(Box), + Input(Box), + Default(Box), } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct InputIncrementalSourceData { - source: u8, id: i32, text: String, is_checked: Value, @@ -546,7 +442,6 @@ struct InputIncrementalSourceData { #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MutationIncrementalSourceData { - source: u8, texts: Vec, attributes: Vec, removes: Vec, @@ -555,6 +450,12 @@ struct MutationIncrementalSourceData { is_attach_iframe: Option, } +#[derive(Debug)] +struct DefaultIncrementalSourceData { + pub source: u8, + pub value: Value, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MutationAdditionIncrementalSourceData { @@ -658,7 +559,7 @@ mod tests { #[test] fn test_pii_credit_card_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-pii.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -669,7 +570,7 @@ mod tests { let dd = cc.adds.pop().unwrap(); if let recording::NodeVariant::T2(mut ee) = dd.node.variant { let ff = ee.child_nodes.pop().unwrap(); - if let recording::NodeVariant::Rest(gg) = ff.variant { + if let recording::NodeVariant::T3(gg) = ff.variant { assert_eq!(gg.text_content, "[creditcard]"); return; } @@ -681,7 +582,7 @@ mod tests { #[test] fn test_scrub_pii_navigation() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-navigation.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-navigation.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -702,7 +603,7 @@ mod tests { #[test] fn test_scrub_pii_resource() { - let payload = include_bytes!("../tests/fixtures/rrweb-performance-resource.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-performance-resource.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -723,7 +624,7 @@ mod tests { #[test] fn test_pii_ip_address_removal() { - let payload = include_bytes!("../tests/fixtures/rrweb-pii-ip-address.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-pii-ip-address.json"); let mut events: Vec = serde_json::from_slice(payload).unwrap(); recording::strip_pii(&mut events).unwrap(); @@ -734,7 +635,7 @@ mod tests { let dd = cc.adds.pop().unwrap(); if let recording::NodeVariant::T2(mut ee) = dd.node.variant { let ff = ee.child_nodes.pop().unwrap(); - if let recording::NodeVariant::Rest(gg) = ff.variant { + if let recording::NodeVariant::T3(gg) = ff.variant { assert_eq!(gg.text_content, "[ip]"); return; } @@ -746,7 +647,7 @@ mod tests { #[test] fn test_rrweb_snapshot_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb.json"); let input_parsed = loads(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -755,7 +656,7 @@ mod tests { #[test] fn test_rrweb_incremental_source_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-diff.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-diff.json"); let input_parsed = loads(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -765,7 +666,7 @@ mod tests { // Node coverage #[test] fn test_rrweb_node_2_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-node-2.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2.json"); let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -774,18 +675,19 @@ mod tests { #[test] fn test_rrweb_node_2_style_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-node-2-style.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-node-2-style.json"); let input_parsed: recording::NodeVariant = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); - assert_json_eq!(input_parsed, input_raw) + serde_json::to_string_pretty(&input_parsed).unwrap(); + assert_json_eq!(input_parsed, input_raw); } // Event coverage #[test] fn test_rrweb_event_3_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-3.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-event-3.json"); let input_parsed: recording::Event = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); @@ -794,10 +696,16 @@ mod tests { #[test] fn test_rrweb_event_5_parsing() { - let payload = include_bytes!("../tests/fixtures/rrweb-event-5.json"); + let payload = include_bytes!("../../tests/fixtures/rrweb-event-5.json"); let input_parsed: Vec = serde_json::from_slice(payload).unwrap(); let input_raw: Value = serde_json::from_slice(payload).unwrap(); assert_json_eq!(input_parsed, input_raw); } } + +#[doc(hidden)] +/// Only used in benchmarks. +pub fn _deserialize_event(payload: &[u8]) { + let _: Vec = serde_json::from_slice(payload).unwrap(); +} diff --git a/relay-replays/src/recording/serialization.rs b/relay-replays/src/recording/serialization.rs new file mode 100644 index 0000000000..b42d955907 --- /dev/null +++ b/relay-replays/src/recording/serialization.rs @@ -0,0 +1,246 @@ +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; + +use crate::recording::*; + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for Event { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + // NOTE: Use of this private API is discouraged by serde, but we need it for + // efficient deserialization of these large, recursive structures into + // internally tagged enums with integer tags. + // Ideally, we would write our own `derive` for this, or contribute to serde + // to support integer tags out of the box. + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum Event", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Value::deserialize(content_deserializer).map(Event::T0), + 1 => Value::deserialize(content_deserializer).map(Event::T1), + 2 => Box::::deserialize(content_deserializer).map(Event::T2), + 3 => Box::::deserialize(content_deserializer).map(Event::T3), + 4 => Box::::deserialize(content_deserializer).map(Event::T4), + 5 => Box::::deserialize(content_deserializer).map(Event::T5), + 6 => Value::deserialize(content_deserializer).map(Event::T6), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 7", + )), + } + } +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerEvent<'a> { + T0(&'a Value), // 0: DOMContentLoadedEvent, + T1(&'a Value), // 1: LoadEvent, + T2(&'a FullSnapshotEvent), + T3(&'a IncrementalSnapshotEvent), + T4(&'a MetaEvent), + T5(&'a CustomEvent), + T6(&'a Value), // 6: PluginEvent, +} + +/// Helper for [`Event`] serialization. +#[derive(Serialize)] +struct OuterEvent<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerEvent<'a>, +} + +impl<'a> OuterEvent<'a> { + fn new(ty: u8, inner: InnerEvent<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for Event { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + Event::T0(c) => OuterEvent::new(0, InnerEvent::T0(c)), + Event::T1(c) => OuterEvent::new(1, InnerEvent::T1(c)), + Event::T2(c) => OuterEvent::new(2, InnerEvent::T2(c)), + Event::T3(c) => OuterEvent::new(3, InnerEvent::T3(c)), + Event::T4(c) => OuterEvent::new(4, InnerEvent::T4(c)), + Event::T5(c) => OuterEvent::new(5, InnerEvent::T5(c)), + Event::T6(c) => OuterEvent::new(6, InnerEvent::T6(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for NodeVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "type", + "internally tagged enum NodeVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer).map(NodeVariant::T0), + 1 => Box::::deserialize(content_deserializer).map(NodeVariant::T1), + 2 => Box::::deserialize(content_deserializer).map(NodeVariant::T2), + 3 => Box::::deserialize(content_deserializer).map(NodeVariant::T3), + 4 => Box::::deserialize(content_deserializer).map(NodeVariant::T4), + 5 => Box::::deserialize(content_deserializer).map(NodeVariant::T5), + value => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(value as u64), + &"type id 0 <= i < 6", + )), + } + } +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerNodeVariant<'a> { + T0(&'a DocumentNode), + T1(&'a DocumentTypeNode), + T2(&'a ElementNode), + T3(&'a TextNode), // text + T4(&'a TextNode), // cdata + T5(&'a TextNode), // comment +} + +/// Helper for [`NodeVariant`] serialization. +#[derive(Serialize)] +struct OuterNodeVariant<'a> { + #[serde(rename = "type")] + ty: u8, + #[serde(flatten)] + inner: InnerNodeVariant<'a>, +} + +impl<'a> OuterNodeVariant<'a> { + fn new(ty: u8, inner: InnerNodeVariant<'a>) -> Self { + Self { ty, inner } + } +} + +impl Serialize for NodeVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeVariant::T0(c) => OuterNodeVariant::new(0, InnerNodeVariant::T0(c)), + NodeVariant::T1(c) => OuterNodeVariant::new(1, InnerNodeVariant::T1(c)), + NodeVariant::T2(c) => OuterNodeVariant::new(2, InnerNodeVariant::T2(c)), + NodeVariant::T3(c) => OuterNodeVariant::new(3, InnerNodeVariant::T3(c)), + NodeVariant::T4(c) => OuterNodeVariant::new(4, InnerNodeVariant::T4(c)), + NodeVariant::T5(c) => OuterNodeVariant::new(5, InnerNodeVariant::T5(c)), + } + .serialize(s) + } +} + +/// Implementation tweaked from serde's `derive(Deserialize)` for internally tagged enums, +/// in order to work with integer tags. +impl<'de> Deserialize<'de> for IncrementalSourceDataVariant { + fn deserialize(d: D) -> Result + where + D: Deserializer<'de>, + { + let tagged = match Deserializer::deserialize_any( + d, + serde::__private::de::TaggedContentVisitor::::new( + "source", + "internally tagged enum IncrementalSourceDataVariant", + ), + ) { + Ok(val) => val, + Err(err) => return Err(err), + }; + let content_deserializer = + serde::__private::de::ContentDeserializer::::new(tagged.content); + match tagged.tag { + 0 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Mutation), + 5 => Box::::deserialize(content_deserializer) + .map(IncrementalSourceDataVariant::Input), + source => Value::deserialize(content_deserializer).map(|value| { + IncrementalSourceDataVariant::Default(Box::new(DefaultIncrementalSourceData { + source, + value, + })) + }), + } + } +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +#[serde(untagged)] +enum InnerISDV<'a> { + Mutation(&'a MutationIncrementalSourceData), + Input(&'a InputIncrementalSourceData), + Default(&'a Value), +} + +/// Helper for [`IncrementalSourceDataVariant`] serialization. +#[derive(Serialize)] +struct OuterISDV<'a> { + source: u8, + #[serde(flatten)] + inner: InnerISDV<'a>, +} + +impl<'a> OuterISDV<'a> { + fn new(source: u8, inner: InnerISDV<'a>) -> Self { + Self { source, inner } + } +} + +impl Serialize for IncrementalSourceDataVariant { + fn serialize(&self, s: S) -> Result + where + S: serde::Serializer, + { + match self { + IncrementalSourceDataVariant::Mutation(m) => { + OuterISDV::new(0, InnerISDV::Mutation(m.as_ref())) + } + IncrementalSourceDataVariant::Input(i) => { + OuterISDV::new(5, InnerISDV::Input(i.as_ref())) + } + IncrementalSourceDataVariant::Default(v) => { + OuterISDV::new(v.source, InnerISDV::Default(&v.value)) + } + } + .serialize(s) + } +} diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 020d117f1d..aac5fbbe54 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -1093,9 +1093,7 @@ impl EnvelopeProcessorService { } } ItemType::ReplayRecording => { - // XXX: Temporarily, only the Sentry org will be allowed to parse replays while - // we measure the impact of this change. - if replays_enabled && state.project_state.organization_id == Some(1) { + if replays_enabled { // Limit expansion of recordings to the max replay size. The payload is // decompressed temporarily and then immediately re-compressed. However, to // limit memory pressure, we use the replay limit as a good overall limit for diff --git a/tests/integration/test_replay_recordings.py b/tests/integration/test_replay_recordings.py index 3aca9ebc68..72c6f4775b 100644 --- a/tests/integration/test_replay_recordings.py +++ b/tests/integration/test_replay_recordings.py @@ -1,4 +1,4 @@ -import time +import zlib from sentry_sdk.envelope import Envelope, Item, PayloadRef @@ -92,8 +92,6 @@ def test_chunked_replay_recordings_processing( assert replay_recording["received"] assert type(replay_recording["received"]) == int - outcomes_consumer.assert_empty() - def test_nonchunked_replay_recordings_processing( mini_sentry, relay_with_processing, replay_recordings_consumer, outcomes_consumer @@ -117,7 +115,8 @@ def test_nonchunked_replay_recordings_processing( ["attachment_type", "replay_recording"], ] ) - envelope.add_item(Item(payload=PayloadRef(bytes=b"test"), type="replay_recording")) + payload = recording_payload(b"[]") + envelope.add_item(Item(payload=PayloadRef(bytes=payload), type="replay_recording")) relay.send_envelope(project_id, envelope) @@ -129,7 +128,12 @@ def test_nonchunked_replay_recordings_processing( assert replay_recording["org_id"] == org_id assert type(replay_recording["received"]) == int assert replay_recording["retention_days"] == 90 - assert replay_recording["payload"] == b"test" + assert replay_recording["payload"] == payload assert replay_recording["type"] == "replay_recording_not_chunked" outcomes_consumer.assert_empty() + + +def recording_payload(bits: bytes): + compressed_payload = zlib.compress(bits) + return b'{"segment_id": 0}\n' + compressed_payload