Skip to content

Commit

Permalink
fix: format stream name for _bulk api (#3371)
Browse files Browse the repository at this point in the history
  • Loading branch information
oasisk committed Apr 28, 2024
1 parent d5bfef2 commit 15f9788
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
16 changes: 13 additions & 3 deletions src/handler/http/request/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
};

use actix_web::{delete, get, http, put, web, HttpRequest, HttpResponse, Responder};
use config::meta::stream::{StreamSettings, StreamType};
use config::meta::stream::{RoutingCondition, StreamSettings, StreamType};

use crate::{
common::{
Expand All @@ -30,7 +30,7 @@ use crate::{
},
utils::http::get_stream_type_from_request,
},
service::stream,
service::{format_stream_name, stream},
};

/// GetSchema
Expand Down Expand Up @@ -93,10 +93,11 @@ async fn schema(
#[put("/{org_id}/streams/{stream_name}/settings")]
async fn settings(
path: web::Path<(String, String)>,
settings: web::Json<StreamSettings>,
mut settings: web::Json<StreamSettings>,
req: HttpRequest,
) -> Result<HttpResponse, Error> {
let (org_id, stream_name) = path.into_inner();
let stream_name = format_stream_name(&stream_name);
let query = web::Query::<HashMap<String, String>>::from_query(req.query_string()).unwrap();
let stream_type = match get_stream_type_from_request(&query) {
Ok(v) => {
Expand All @@ -123,6 +124,15 @@ async fn settings(
);
}
};
// if routing is provided format the key using format_stream_name
if let Some(ref mut routing_map) = settings.routing {
let new_routing: hashbrown::HashMap<String, Vec<RoutingCondition>> = routing_map
.drain()
.map(|(key, value)| (format_stream_name(&key), value))
.collect();
settings.routing = Some(new_routing);
}

let stream_type = stream_type.unwrap_or(StreamType::Logs);
stream::save_stream_settings(&org_id, &stream_name, stream_type, settings.into_inner()).await
}
Expand Down
2 changes: 1 addition & 1 deletion src/handler/http/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn audit_middleware(
if res.response().error().is_none() {
let body = if path.ends_with("/settings/logo") {
// Binary data, encode it with base64
general_purpose::STANDARD.encode(request_body.to_vec())
general_purpose::STANDARD.encode(&request_body)
} else {
String::from_utf8(request_body.to_vec()).unwrap()
};
Expand Down
4 changes: 2 additions & 2 deletions src/service/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
},
utils::functions::get_vrl_compiler_config,
},
service::{db, format_partition_key},
service::{db, format_partition_key, format_stream_name},
};

pub mod grpc;
Expand Down Expand Up @@ -433,7 +433,7 @@ pub async fn get_stream_routing(
.unwrap_or_default()
.iter()
.map(|(k, v)| Routing {
destination: k.to_string(),
destination: format_stream_name(k),
routing: v.clone(),
})
.collect();
Expand Down
4 changes: 3 additions & 1 deletion src/service/logs/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
stream::StreamParams,
},
service::{
db,
db, format_stream_name,
ingestion::{evaluate_trigger, write_file, TriggerAlertData},
metadata::{distinct_values::DvItem, write, MetadataItem, MetadataType},
schema::{
Expand Down Expand Up @@ -132,6 +132,8 @@ pub async fn ingest(
}
(action, stream_name, doc_id) = ret.unwrap();

stream_name = format_stream_name(&stream_name);

// skip blocked streams
let key = format!("{org_id}/{}/{stream_name}", StreamType::Logs);
if BLOCKED_STREAMS.contains(&key.as_str()) {
Expand Down

0 comments on commit 15f9788

Please sign in to comment.