Skip to content

Commit

Permalink
feat(profiles): Add dataset configurations and processor for profile …
Browse files Browse the repository at this point in the history
…chunks (#5897)
  • Loading branch information
phacops committed May 16, 2024
1 parent c885744 commit 219fd2c
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 0 deletions.
2 changes: 2 additions & 0 deletions rust_snuba/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod functions;
mod generic_metrics;
mod metrics_summaries;
mod outcomes;
mod profile_chunks;
mod profiles;
mod querylog;
mod release_health_metrics;
Expand Down Expand Up @@ -62,6 +63,7 @@ define_processing_functions! {
("GenericGaugesMetricsProcessor", "snuba-generic-metrics", ProcessingFunctionType::ProcessingFunction(generic_metrics::process_gauge_message)),
("PolymorphicMetricsProcessor", "snuba-metrics", ProcessingFunctionType::ProcessingFunction(release_health_metrics::process_metrics_message)),
("ErrorsProcessor", "events", ProcessingFunctionType::ProcessingFunctionWithReplacements(errors::process_message_with_replacement)),
("ProfileChunksProcessor", "snuba-profile-chunks", ProcessingFunctionType::ProcessingFunction(profile_chunks::process_message)),
}

// COGS is recorded for these processors
Expand Down
91 changes: 91 additions & 0 deletions rust_snuba/src/processors/profile_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::config::ProcessorConfig;
use anyhow::Context;
use chrono::DateTime;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: InputMessage = serde_json::from_slice(payload_bytes)?;

let mut row = Chunk {
chunk: msg,
offset: metadata.offset,
partition: metadata.partition,
};

row.chunk.retention_days = Some(enforce_retention(
row.chunk.retention_days,
&config.env_config,
));

let origin_timestamp = DateTime::from_timestamp(row.chunk.received, 0);

InsertBatch::from_rows([row], origin_timestamp)
}

/// The payload of a `snuba-profile-chunks` Kafka message, as produced by
/// the profiling pipeline and deserialized from JSON.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
struct InputMessage {
    project_id: u64,
    // ID of the profiler session this chunk belongs to.
    profiler_id: Uuid,
    // Unique ID of this individual chunk.
    chunk_id: Uuid,
    // Chunk window boundaries — presumably float unix timestamps in
    // seconds (the test fixture uses second-scale values); the storage
    // column is DateTime64(6).
    start_timestamp: f64,
    end_timestamp: f64,
    // Unix timestamp in seconds when the chunk was received; used as the
    // batch's origin timestamp in `process_message`.
    received: i64,
    // Optional on the wire; `process_message` always fills it in via
    // `enforce_retention`.
    retention_days: Option<u16>,
}

/// A full row for the `profile_chunks` table: the incoming message plus
/// the Kafka coordinates of the message it was read from.
#[derive(Debug, Deserialize, Serialize, JsonSchema)]
struct Chunk {
    // Flattened so the message's fields serialize at the top level of the
    // row, alongside `offset` and `partition`.
    #[serde(flatten)]
    chunk: InputMessage,

    // Filled in from `KafkaMessageMetadata` in `process_message`;
    // `default` lets deserialization accept payloads that omit them.
    #[serde(default)]
    offset: u64,
    #[serde(default)]
    partition: u16,
}

#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use crate::processors::tests::run_schema_type_test;

    use super::*;

    /// A representative chunk message should process without error.
    #[test]
    fn test_chunk() {
        let json = r#"{
            "chunk_id": "0432a0a4c25f4697bf9f0a2fcbe6a814",
            "end_timestamp": 1710805689.1234567,
            "profiler_id": "4d229f1d3807421ba62a5f8bc295d836",
            "project_id": 1,
            "received": 1694357860,
            "retention_days": 30,
            "start_timestamp": 1710805688.1234567
        }"#;
        let kafka_payload = KafkaPayload::new(None, None, Some(json.as_bytes().to_vec()));
        let metadata = KafkaMessageMetadata {
            partition: 0,
            offset: 1,
            timestamp: DateTime::from(SystemTime::now()),
        };
        process_message(kafka_payload, metadata, &ProcessorConfig::default())
            .expect("The message should be processed");
    }

    /// The Rust schema must stay in sync with the registered topic schema.
    #[test]
    fn schema() {
        run_schema_type_test::<InputMessage>("snuba-profile-chunks", None);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: src/processors/mod.rs
description: "{\n \"project_id\": 1,\n \"profiler_id\": \"4d229f1d3807421ba62a5f8bc295d836\",\n \"chunk_id\": \"0432a0a4c25f4697bf9f0a2fcbe6a814\",\n \"start_timestamp\": 1710805688.1234567,\n \"end_timestamp\": 1710805689.1234567,\n \"received\": 1694357860,\n \"retention_days\": 30\n}\n"
expression: snapshot_payload
---
[
{
"chunk_id": "0432a0a4-c25f-4697-bf9f-0a2fcbe6a814",
"end_timestamp": 1710805689.1234567,
"offset": 1,
"partition": 0,
"profiler_id": "4d229f1d-3807-421b-a62a-5f8bc295d836",
"project_id": 1,
"received": 1694357860,
"retention_days": 30,
"start_timestamp": 1710805688.1234567
}
]
55 changes: 55 additions & 0 deletions snuba/datasets/configuration/profiles/storages/chunks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
version: v1
kind: writable_storage
name: profile_chunks
storage:
key: profile_chunks
set_key: profile_chunks
readiness_state: partial
schema:
columns:
[
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: profiler_id, type: UUID },
{ name: chunk_id, type: UUID },
{ name: start_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: end_timestamp, type: DateTime64, args: { precision: 6 } },
{ name: retention_days, type: UInt, args: { size: 16 } },
{ name: partition, type: UInt, args: { size: 16 } },
{ name: offset, type: UInt, args: { size: 64 } },
]
local_table_name: profile_chunks_local
dist_table_name: profile_chunks_dist
query_processors:
- processor: UUIDColumnProcessor
args:
columns: !!set
profiler_id: null
chunk_id: null
- processor: TableRateLimit
allocation_policies:
- name: ConcurrentRateLimitAllocationPolicy
args:
required_tenant_types:
- referrer
- project_id
default_config_overrides:
is_enforced: 1
- name: BytesScannedWindowAllocationPolicy
args:
required_tenant_types:
- referrer
default_config_overrides:
is_enforced: 1
throttled_thread_number: 1
org_limit_bytes_scanned: 100000
- name: ReferrerGuardRailPolicy
args:
required_tenant_types:
- referrer
default_config_overrides:
is_enforced: 1
mandatory_condition_checkers:
- condition: ProjectIdEnforcer
stream_loader:
processor: ProfileChunksProcessor
default_topic: snuba-profile-chunks
6 changes: 6 additions & 0 deletions snuba/datasets/processors/profile_chunks_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor


class ProfileChunksProcessor(RustCompatProcessor):
    """Python-side shim for profile chunk messages.

    Delegates all processing to the Rust processor registered under the
    name ``ProfileChunksProcessor`` (see
    ``rust_snuba/src/processors/profile_chunks.rs``).
    """

    def __init__(self) -> None:
        super().__init__("ProfileChunksProcessor")
3 changes: 3 additions & 0 deletions snuba/utils/streams/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ class Topic(Enum):
SUBSCRIPTION_RESULTS_GENERIC_METRICS = "generic-metrics-subscription-results"

QUERYLOG = "snuba-queries"

PROFILES = "processed-profiles"
PROFILES_FUNCTIONS = "profiles-call-tree"
PROFILE_CHUNKS = "snuba-profile-chunks"

REPLAYEVENTS = "ingest-replay-events"
GENERIC_METRICS = "snuba-generic-metrics"
GENERIC_METRICS_SETS_COMMIT_LOG = "snuba-generic-metrics-sets-commit-log"
Expand Down

0 comments on commit 219fd2c

Please sign in to comment.