feat: CloudWatch metrics ingestion (#3322)
taimingl committed Apr 25, 2024
1 parent 15b964e commit 4296f61
Showing 2 changed files with 184 additions and 86 deletions.
38 changes: 32 additions & 6 deletions src/common/meta/ingestion.rs
@@ -217,9 +217,9 @@ pub struct KFHRecordRequest {
pub data: String,
}

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct KinesisFHData {
pub struct KinesisFHLogData {
pub log_events: Vec<KinesisFHLogEvent>,
pub log_group: String,
pub log_stream: String,
@@ -228,13 +228,38 @@ pub struct KinesisFHData {
pub subscription_filters: Vec<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct KinesisFHLogEvent {
pub message: json::Value,
pub id: String,
pub timestamp: Option<i64>,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct KinesisFHMetricData {
#[serde(rename = "metric_stream_name")]
pub metric_stream_name: String,
#[serde(rename = "account_id")]
pub account_id: String,
pub region: String,
pub namespace: String,
#[serde(rename = "metric_name")]
pub metric_name: String,
pub dimensions: json::Value,
pub timestamp: i64,
pub value: KinesisFHMetricValue,
pub unit: String,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct KinesisFHMetricValue {
pub count: f32,
pub sum: f32,
pub max: f32,
pub min: f32,
}
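
For illustration (not part of the commit): a CloudWatch Metric Streams record in the JSON output format is a newline-delimited object whose keys line up with KinesisFHMetricData above. A minimal, self-contained serde sketch with stand-in type names and made-up values, assuming only the serde and serde_json crates:

use serde::Deserialize;

// Stand-in mirroring KinesisFHMetricValue above; names and numbers are illustrative only.
#[derive(Debug, Deserialize)]
struct MetricValue {
    count: f32,
    sum: f32,
    max: f32,
    min: f32,
}

fn main() {
    // Rough shape of one metric-stream record as delivered through Kinesis Firehose.
    let record = r#"{
        "metric_stream_name": "example-stream",
        "account_id": "123456789012",
        "region": "us-east-2",
        "namespace": "AWS/Firehose",
        "metric_name": "IncomingRecords",
        "dimensions": { "DeliveryStreamName": "example-delivery-stream" },
        "timestamp": 1713902700000,
        "value": { "count": 3.0, "sum": 3.0, "max": 1.0, "min": 1.0 },
        "unit": "Count"
    }"#;
    let parsed: serde_json::Value = serde_json::from_str(record).expect("valid JSON");
    let value: MetricValue = serde_json::from_value(parsed["value"].clone()).expect("value object");
    assert_eq!(value.count, 3.0);
}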

#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct KinesisFHIngestionResponse {
@@ -244,11 +269,12 @@ pub struct KinesisFHIngestionResponse {
pub timestamp: i64,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
#[serde(untagged)]
pub enum AWSRecordType {
JSON,
Cloudwatch,
KinesisFHLogs(KinesisFHLogData),
KinesisFHMetrics(KinesisFHMetricData),
}
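
A note on the new enum (editorial, not from the commit itself): with #[serde(untagged)] there is no type tag in the JSON; serde tries the variants in declaration order and keeps the first whose shape matches, so a decoded Firehose record is classified as logs or metrics purely by its fields, and anything else falls through to the caller's raw-JSON/error handling. A tiny self-contained sketch of the same dispatch pattern, with simplified stand-in payload types, assuming serde and serde_json:

use serde::Deserialize;

// Simplified stand-ins for the two payload shapes; fields trimmed for illustration.
#[derive(Debug, Deserialize)]
struct LogPayload {
    log_events: Vec<serde_json::Value>,
}

#[derive(Debug, Deserialize)]
struct MetricPayload {
    metric_name: String,
}

// Untagged: serde picks the first variant that deserializes from the JSON shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Record {
    Logs(LogPayload),
    Metrics(MetricPayload),
}

fn main() {
    let m: Record = serde_json::from_str(r#"{"metric_name":"CallCount"}"#).unwrap();
    let l: Record = serde_json::from_str(r#"{"log_events":[]}"#).unwrap();
    println!("{m:?} {l:?}");
}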

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
232 changes: 152 additions & 80 deletions src/service/logs/ingest.rs
@@ -36,8 +36,7 @@ use crate::{
functions::{StreamTransform, VRLResultResolver},
ingestion::{
AWSRecordType, GCPIngestionResponse, IngestionData, IngestionDataIter, IngestionError,
IngestionRequest, IngestionResponse, KinesisFHData, KinesisFHIngestionResponse,
StreamStatus,
IngestionRequest, IngestionResponse, KinesisFHIngestionResponse, StreamStatus,
},
stream::{SchemaRecords, StreamParams},
},
@@ -376,7 +375,7 @@ impl<'a> IngestionData<'a> {
let request_id = &request.message.message_id;
let req_timestamp = &request.message.publish_time;
match decode_and_decompress(data) {
Ok((decompressed_data, _)) => {
Ok(decompressed_data) => {
let value: json::Value = json::from_str(&decompressed_data).unwrap();
IngestionDataIter::GCP(vec![value].into_iter(), None)
}
Expand Down Expand Up @@ -407,77 +406,20 @@ impl<'a> IngestionData<'a> {
}),
);
}
Ok((decompressed_data, record_type)) => {
let mut value;
// let mut timestamp;
if record_type.eq(&AWSRecordType::Cloudwatch) {
let kfh_data: KinesisFHData =
json::from_str(&decompressed_data).unwrap();

for event in kfh_data.log_events.iter() {
value = json::to_value(event).unwrap();
let local_val = value.as_object_mut().unwrap();

local_val.insert(
"requestId".to_owned(),
request.request_id.clone().into(),
Ok(decompressed_data) => {
match deserialize_from_str(&decompressed_data, request_id) {
Ok(parsed_events) => events.extend(parsed_events),
Err(err) => {
return IngestionDataIter::KinesisFH(
events.into_iter(),
Some(KinesisFHIngestionResponse {
request_id: request_id.to_string(),
error_message: Some(err.to_string()),
timestamp: req_timestamp,
}),
);
local_val.insert(
"messageType".to_owned(),
kfh_data.message_type.clone().into(),
);
local_val
.insert("owner".to_owned(), kfh_data.owner.clone().into());
local_val.insert(
"logGroup".to_owned(),
kfh_data.log_group.clone().into(),
);
local_val.insert(
"logStream".to_owned(),
kfh_data.log_stream.clone().into(),
);
local_val.insert(
"subscriptionFilters".to_owned(),
kfh_data.subscription_filters.clone().into(),
);

let local_msg = event.message.as_str().unwrap();

if local_msg.starts_with('{') && local_msg.ends_with('}') {
let result: Result<json::Value, json::Error> =
json::from_str(local_msg);

match result {
Err(_e) => {
local_val.insert(
"message".to_owned(),
event.message.clone(),
);
}
Ok(message_val) => {
local_val.insert(
"message".to_owned(),
message_val.clone(),
);
}
}
} else {
local_val.insert("message".to_owned(), local_msg.into());
}

local_val.insert(
CONFIG.common.column_timestamp.clone(),
event.timestamp.into(),
);

value = local_val.clone().into();

events.push(value);
}
} else {
value = json::from_str(&decompressed_data).unwrap();
events.push(value);
};
}
}
}
}
@@ -487,37 +429,137 @@ impl<'a> IngestionData<'a> {
}
}

pub fn decode_and_decompress(
encoded_data: &str,
) -> Result<(String, AWSRecordType), Box<dyn std::error::Error>> {
pub fn decode_and_decompress(encoded_data: &str) -> Result<String, Box<dyn std::error::Error>> {
let decoded_data = config::utils::base64::decode_raw(encoded_data)?;
let mut gz = GzDecoder::new(&decoded_data[..]);
let mut decompressed_data = String::new();
match gz.read_to_string(&mut decompressed_data) {
Ok(_) => Ok((decompressed_data, AWSRecordType::Cloudwatch)),
Err(_) => Ok((String::from_utf8(decoded_data)?, AWSRecordType::JSON)),
Ok(_) => Ok(decompressed_data),
Err(_) => Ok(String::from_utf8(decoded_data)?),
}
}
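
For readers outside the codebase: after this change decode_and_decompress simply base64-decodes the Firehose record and gunzips it when possible (CloudWatch Logs subscriptions arrive gzipped), otherwise it returns the decoded bytes as UTF-8 (metric streams and direct PUTs arrive as plain JSON). A rough standalone equivalent, assuming the base64 (0.21+ Engine API) and flate2 crates rather than the crate's own config::utils::base64 wrapper:

use std::io::Read;

use base64::{engine::general_purpose::STANDARD, Engine as _};
use flate2::read::GzDecoder;

// base64-decode, then gunzip if the payload is gzip; otherwise treat the bytes as UTF-8 text.
fn decode_firehose_payload(encoded: &str) -> Result<String, Box<dyn std::error::Error>> {
    let decoded = STANDARD.decode(encoded)?;
    let mut gz = GzDecoder::new(&decoded[..]);
    let mut out = String::new();
    match gz.read_to_string(&mut out) {
        Ok(_) => Ok(out),
        Err(_) => Ok(String::from_utf8(decoded)?),
    }
}

fn main() {
    let plain = STANDARD.encode(r#"{"message":"hello"}"#);
    println!("{}", decode_firehose_payload(&plain).unwrap());
}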

fn deserialize_from_str(data: &str, request_id: &str) -> Result<Vec<json::Value>> {
let mut events = vec![];
let mut value;
for line in data.lines() {
match json::from_str(line) {
Ok(AWSRecordType::KinesisFHLogs(kfh_log_data)) => {
for event in kfh_log_data.log_events.iter() {
value = json::to_value(event)?;
let local_val = value
.as_object_mut()
.ok_or(anyhow::anyhow!("Failed to convert Value to object"))?;

local_val.insert("requestId".to_owned(), request_id.into());
local_val.insert(
"messageType".to_owned(),
kfh_log_data.message_type.clone().into(),
);
local_val.insert("owner".to_owned(), kfh_log_data.owner.clone().into());
local_val.insert("logGroup".to_owned(), kfh_log_data.log_group.clone().into());
local_val.insert(
"logStream".to_owned(),
kfh_log_data.log_stream.clone().into(),
);
local_val.insert(
"subscriptionFilters".to_owned(),
kfh_log_data.subscription_filters.clone().into(),
);

let local_msg = event.message.as_str().unwrap();

if local_msg.starts_with('{') && local_msg.ends_with('}') {
let result: Result<json::Value, json::Error> = json::from_str(local_msg);

match result {
Err(_e) => {
local_val.insert("message".to_owned(), event.message.clone());
}
Ok(message_val) => {
local_val.insert("message".to_owned(), message_val.clone());
}
}
} else {
local_val.insert("message".to_owned(), local_msg.into());
}

local_val.insert(
CONFIG.common.column_timestamp.clone(),
event.timestamp.into(),
);

value = local_val.clone().into();
events.push(value);
}
}
Ok(AWSRecordType::KinesisFHMetrics(kfh_metric_data)) => {
// Parse "dimensions" and "values" fields from KinesisFHMetricData
let values = json::to_value(kfh_metric_data.value.clone())?;
let dimensions = kfh_metric_data.dimensions.clone();
let timestamp = kfh_metric_data.timestamp;

let mut parsed_metric_value = json::to_value(kfh_metric_data)?;
let local_parsed_metric_value = parsed_metric_value.as_object_mut().ok_or(
anyhow::anyhow!("CloudWatch metrics failed to parse Metric Object"),
)?;

for (value_name, value_val) in values.as_object().ok_or(anyhow::anyhow!(
"CloudWatch metrics failed to Metric Value Object"
))? {
local_parsed_metric_value.insert(value_name.to_owned(), value_val.to_owned());
}
local_parsed_metric_value.remove("value");

let metric_dimensions = dimensions
.as_object()
.ok_or(anyhow::anyhow!(
"CloudWatch metrics failed to Metric dimensions Object"
))?
.iter()
.map(|(k, v)| format!("{}=[{}]", k, v))
.collect::<Vec<_>>()
.join(", ");

local_parsed_metric_value
.insert("metric_dimensions".to_owned(), metric_dimensions.into());
local_parsed_metric_value.remove("dimensions");

local_parsed_metric_value
.insert(CONFIG.common.column_timestamp.clone(), timestamp.into());
local_parsed_metric_value.remove("timestamp");

value = local_parsed_metric_value.clone().into();
events.push(value);
}
_ => {
value = json::from_str(line)?;
events.push(value);
}
}
}
Ok(events)
}
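
Worth spelling out (editorial note, not in the commit): the metric branch above flattens each record before ingestion — count/sum/max/min are hoisted out of value to the top level, timestamp moves into the configured timestamp column, and the dimensions map is collapsed into a single metric_dimensions string of the form Key=[value]. A small sketch of just that dimension formatting, assuming only serde_json and with made-up dimension names:

use serde_json::json;

fn main() {
    // Illustrative only: what the metric branch does with one record's "dimensions" map.
    let dimensions = json!({ "DeliveryStreamName": "PUT-HTP-example", "Region": "us-east-2" });
    let metric_dimensions = dimensions
        .as_object()
        .expect("dimensions is a JSON object")
        .iter()
        .map(|(k, v)| format!("{}=[{}]", k, v))
        .collect::<Vec<_>>()
        .join(", ");
    // String values keep their JSON quoting, e.g. DeliveryStreamName=["PUT-HTP-example"], Region=["us-east-2"]
    println!("{metric_dimensions}");
}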

#[cfg(test)]
mod tests {
use super::decode_and_decompress;
use super::{decode_and_decompress, deserialize_from_str};

#[test]
fn test_decode_and_decompress_success() {
let encoded_data = "H4sIAAAAAAAAADWO0QqCMBiFX2XsOkKJZHkXot5YQgpdhMTSPzfSTbaZhPjuzbTLj3M45xtxC1rTGvJPB9jHQXrOL2lyP4VZdoxDvMFyEKDmpJF9NVBTskTW2gaNrGMl+85mC2VGAW0X1P1Dl4p3hksR8caA0ti/Fb9e+AZhZhwxr5a64VbD0NaOuR5xPLJzycEh+81fbxa4JmjVQ6uejwIG5YuLGjGgjWFIPlFll7ig8zOKuAImNWzxVExfL8ipzewAAAA=";
let expected = "{\"messageType\":\"CONTROL_MESSAGE\",\"owner\":\"CloudwatchLogs\",\"logGroup\":\"\",\"logStream\":\"\",\"subscriptionFilters\":[],\"logEvents\":[{\"id\":\"\",\"timestamp\":1680683189085,\"message\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\"}]}";
let result =
decode_and_decompress(encoded_data).expect("Failed to decode and decompress data");
assert_eq!(result.0, expected);
assert_eq!(result, expected);
}

#[test]
fn test_decode_success() {
let encoded_data = "eyJtZXNzYWdlIjoiMiAwNTg2OTQ4NTY0NzYgZW5pLTAzYzBmNWJhNzlhNjZlZjE3IDEwLjMuMTY2LjcxIDEwLjMuMTQxLjIwOSA0NDMgMzg2MzQgNiAxMDMgNDI5MjYgMTY4MDgzODU1NiAxNjgwODM4NTc4IEFDQ0VQVCBPSyJ9Cg==";
let expected = "{\"message\":\"2 058694856476 eni-03c0f5ba79a66ef17 10.3.166.71 10.3.141.209 443 38634 6 103 42926 1680838556 1680838578 ACCEPT OK\"}\n";
let result = decode_and_decompress(encoded_data).expect("Failed to decode data");
assert_eq!(result.0, expected);
assert_eq!(result, expected);
}

#[test]
@@ -529,4 +571,34 @@ mod tests {
"Expected an error due to invalid base64 input"
);
}

#[test]
fn test_deserialize_from_str_metrics() {
let encoded_data = "eyJtZXRyaWNfc3RyZWFtX25hbWUiOiJDdXN0b21QYXJ0aWFsLUJDbjVjQSIsImFjY291bnRfaWQiOiI3MzkxNDcyMjI5ODkiLCJyZWdpb24iOiJ1cy1lYXN0LTIiLCJuYW1lc3BhY2UiOiJBV1MvVXNhZ2UiLCJtZXRyaWNfbmFtZSI6IkNhbGxDb3VudCIsImRpbWVuc2lvbnMiOnsiQ2xhc3MiOiJOb25lIiwiUmVzb3VyY2UiOiJHZXRNZXRyaWNEYXRhIiwiU2VydmljZSI6IkNsb3VkV2F0Y2giLCJUeXBlIjoiQVBJIn0sInRpbWVzdGFtcCI6MTcxMzkwMjcwMDAwMCwidmFsdWUiOnsibWF4IjoxLjAsIm1pbiI6MS4wLCJzdW0iOjMuMCwiY291bnQiOjMuMH0sInVuaXQiOiJOb25lIn0KeyJtZXRyaWNfc3RyZWFtX25hbWUiOiJDdXN0b21QYXJ0aWFsLUJDbjVjQSIsImFjY291bnRfaWQiOiI3MzkxNDcyMjI5ODkiLCJyZWdpb24iOiJ1cy1lYXN0LTIiLCJuYW1lc3BhY2UiOiJBV1MvRmlyZWhvc2UiLCJtZXRyaWNfbmFtZSI6IktNU0tleUludmFsaWRTdGF0ZSIsImRpbWVuc2lvbnMiOnsiRGVsaXZlcnlTdHJlYW1OYW1lIjoiUFVULUhUUC1SZFFXOCJ9LCJ0aW1lc3RhbXAiOjE3MTM5MDI2NDAwMDAsInZhbHVlIjp7Im1heCI6MC4wLCJtaW4iOjAuMCwic3VtIjowLjAsImNvdW50Ijo2MC4wfSwidW5pdCI6IkNvdW50In0KeyJtZXRyaWNfc3RyZWFtX25hbWUiOiJDdXN0b21QYXJ0aWFsLUJDbjVjQSIsImFjY291bnRfaWQiOiI3MzkxNDcyMjI5ODkiLCJyZWdpb24iOiJ1cy1lYXN0LTIiLCJuYW1lc3BhY2UiOiJBV1MvRmlyZWhvc2UiLCJtZXRyaWNfbmFtZSI6IktNU0tleU5vdEZvdW5kIiwiZGltZW5zaW9ucyI6eyJEZWxpdmVyeVN0cmVhbU5hbWUiOiJQVVQtSFRQLVJkUVc4In0sInRpbWVzdGFtcCI6MTcxMzkwMjY0MDAwMCwidmFsdWUiOnsibWF4IjowLjAsIm1pbiI6MC4wLCJzdW0iOjAuMCwiY291bnQiOjYwLjB9LCJ1bml0IjoiQ291bnQifQo=";
let decoded = decode_and_decompress(encoded_data);
assert!(decoded.is_ok());
let decoded = decoded.unwrap();
let request_id = "test_id".to_string();
let result = deserialize_from_str(&decoded, &request_id);
assert!(result.is_ok());
let value = result.unwrap();
for val in value {
assert_eq!(val.get("account_id").unwrap(), "739147222989");
}
}

#[test]
fn test_deserialize_from_str_logs() {
let encoded_data = "eyJtZXNzYWdlVHlwZSI6IkRBVEFfTUVTU0FHRSIsIm93bmVyIjoiMTIzNDU2Nzg5MDEyIiwibG9nR3JvdXAiOiJsb2dfZ3JvdXBfbmFtZSIsImxvZ1N0cmVhbSI6ImxvZ19zdHJlYW1fbmFtZSIsInN1YnNjcmlwdGlvbkZpbHRlcnMiOlsic3Vic2NyaXB0aW9uX2ZpbHRlcl9uYW1lIl0sImxvZ0V2ZW50cyI6W3siaWQiOiIwMTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDEyMzQ1IiwidGltZXN0YW1wIjoxNzEzOTgzNDQ2LCJtZXNzYWdlIjoibG9nbWVzc2FnZTEifSx7ImlkIjoiMDEyMzQ1Njc4OTAxMjM0NTY3ODkwMTIzNDU2Nzg5MDEyMzQ1Njc4OTAxMjM0NSIsInRpbWVzdGFtcCI6IDE3MTM5ODM0NDYsIm1lc3NhZ2UiOiJsb2dtZXNzYWdlMiJ9XX0=";
let decoded = decode_and_decompress(encoded_data);
assert!(decoded.is_ok());
let decoded = decoded.unwrap();
let request_id = "test_id".to_string();
let result = deserialize_from_str(&decoded, &request_id);
assert!(result.is_ok());
let result = result.unwrap();
for val in result {
assert_eq!(val.get("owner").unwrap(), "123456789012");
}
}
}
