Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add some debug log #3432

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
125af68
feat: add some debug log
domyway May 8, 2024
b07e3fa
Merge branch 'main' into feat/malformed_header
hengfeiyang May 8, 2024
ed0db9d
feat: add some debug log
domyway May 8, 2024
1c64af9
feat: add some debug log
domyway May 9, 2024
4f99852
feat: add some debug log
domyway May 9, 2024
d7bea41
feat: add debug log and traces/metrics message size config
domyway May 9, 2024
bad900d
feat: remove some log
domyway May 9, 2024
8e443a1
feat: update
domyway May 9, 2024
bbcecbf
force run codebuild
domyway May 9, 2024
920c11c
Merge remote-tracking branch 'origin/feat/malformed_header' into feat…
domyway May 9, 2024
30bcc14
feat: add some log
domyway May 10, 2024
7d731a5
feat: add some log
domyway May 10, 2024
de5734e
feat: add log
domyway May 10, 2024
771beb5
feat: add some debug log
domyway May 11, 2024
adb0649
Merge branch 'main' into feat/malformed_header
hengfeiyang May 11, 2024
87c1573
Merge branch 'main' into feat/malformed_header
hengfeiyang May 12, 2024
5e42c3e
feat: add some log
domyway May 12, 2024
e7a847c
Merge remote-tracking branch 'origin/feat/malformed_header' into feat…
domyway May 12, 2024
66525d1
feat: remove wal write
domyway May 13, 2024
25d7536
Merge branch 'main' into feat/malformed_header
hengfeiyang May 13, 2024
2a96cdb
feat: remove wal sync
domyway May 13, 2024
4aa152a
Merge remote-tracking branch 'origin/feat/malformed_header' into feat…
domyway May 13, 2024
babdd8f
feat: add some debug log
domyway May 13, 2024
f4023d3
feat: change mutext lib
domyway May 13, 2024
1f4f6bc
feat: recover wal write
domyway May 13, 2024
da82866
Merge branch 'main' into feat/malformed_header
hengfeiyang May 13, 2024
40588b0
feat: add wal write metric
domyway May 13, 2024
ee60dfe
Merge remote-tracking branch 'origin/feat/malformed_header' into feat…
domyway May 13, 2024
25eba81
feat: update wal metrics bucket
domyway May 13, 2024
4fba6b6
feat: as_millis
domyway May 13, 2024
f029fe1
feat: remove debug log
domyway May 15, 2024
be54e20
feat: update metric bucket
domyway May 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 78 additions & 1 deletion src/config/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,32 @@ pub static INGEST_MEMTABLE_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("Metric created")
});

/// Histogram `ingest_memtable_lock_time`, labeled by `organization`.
///
/// The ingester writer records the wall-clock duration of its memtable write
/// step here as `elapsed().as_millis() as f64`, so samples are whole
/// milliseconds.
///
/// NOTE(review): because the recorded value is truncated to whole
/// milliseconds, the `0.2` and `0.5` buckets can only ever receive `0` ms
/// samples and will always report the same cumulative count.
pub static INGEST_MEMTABLE_LOCK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    // Upper bounds of the histogram buckets, in the unit the caller observes.
    let bucket_bounds = vec![
        0.2, 0.5, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0,
    ];
    let opts = HistogramOpts::new("ingest_memtable_lock_time", "ingest memtable lock time")
        .namespace(NAMESPACE)
        .buckets(bucket_bounds)
        .const_labels(create_const_labels());
    HistogramVec::new(opts, &["organization"]).expect("Metric created")
});

/// Histogram `ingest_wal_lock_time`, labeled by `organization`.
///
/// The ingester writer records the wall-clock duration of its WAL write step
/// here as `elapsed().as_millis() as f64`, so samples are whole milliseconds.
///
/// NOTE(review): because the recorded value is truncated to whole
/// milliseconds, the `0.2` and `0.5` buckets can only ever receive `0` ms
/// samples and will always report the same cumulative count.
pub static INGEST_WAL_LOCK_TIME: Lazy<HistogramVec> = Lazy::new(|| {
    // Upper bounds of the histogram buckets, in the unit the caller observes.
    let bucket_bounds = vec![
        0.2, 0.5, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0,
    ];
    let opts = HistogramOpts::new("ingest_wal_lock_time", "ingest wal lock time")
        .namespace(NAMESPACE)
        .buckets(bucket_bounds)
        .const_labels(create_const_labels());
    HistogramVec::new(opts, &["organization"]).expect("Metric created")
});

// querier memory cache stats
pub static QUERY_MEMORY_CACHE_LIMIT_BYTES: Lazy<IntGaugeVec> = Lazy::new(|| {
IntGaugeVec::new(
Expand Down Expand Up @@ -540,6 +566,44 @@ pub static MEMORY_USAGE: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("Metric created")
});

/// Counter `span_calls` ("span calls total").
///
/// Each series is keyed by organization, stream, service name, operation
/// name, status code and span kind, in that label order.
pub static SPAN_CALLS: Lazy<IntCounterVec> = Lazy::new(|| {
    // Variable label names; callers must supply values in this exact order.
    let label_names = [
        "organization",
        "stream",
        "service_name",
        "operation_name",
        "status_code",
        "span_kind",
    ];
    let opts = Opts::new("span_calls", "span calls total")
        .namespace(NAMESPACE)
        .const_labels(create_const_labels());
    IntCounterVec::new(opts, &label_names).expect("Metric created")
});

/// Histogram `span_duration_milliseconds` ("span duration milliseconds").
///
/// Bucket upper bounds run from 0.5 up to 60000. Each series is keyed by
/// organization, stream, service name, operation name, status code and span
/// kind, in that label order.
pub static SPAN_DURATION_MILLISECONDS: Lazy<HistogramVec> = Lazy::new(|| {
    // Upper bounds of the histogram buckets.
    let bucket_bounds = vec![
        0.5, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
        10000.0, 60000.0,
    ];
    // Variable label names; callers must supply values in this exact order.
    let label_names = [
        "organization",
        "stream",
        "service_name",
        "operation_name",
        "status_code",
        "span_kind",
    ];
    let opts = HistogramOpts::new("span_duration_milliseconds", "span duration milliseconds")
        .namespace(NAMESPACE)
        .buckets(bucket_bounds)
        .const_labels(create_const_labels());
    HistogramVec::new(opts, &label_names).expect("Metric created")
});

fn register_metrics(registry: &Registry) {
// http latency
registry
Expand Down Expand Up @@ -582,7 +646,12 @@ fn register_metrics(registry: &Registry) {
registry
.register(Box::new(INGEST_MEMTABLE_FILES.clone()))
.expect("Metric registered");

registry
.register(Box::new(INGEST_MEMTABLE_LOCK_TIME.clone()))
.expect("Metric registered");
registry
.register(Box::new(INGEST_WAL_LOCK_TIME.clone()))
.expect("Metric registered");
// querier stats
registry
.register(Box::new(QUERY_MEMORY_CACHE_LIMIT_BYTES.clone()))
Expand Down Expand Up @@ -679,6 +748,14 @@ fn register_metrics(registry: &Registry) {
registry
.register(Box::new(MEMORY_USAGE.clone()))
.expect("Metric registered");

// span metrics
registry
.register(Box::new(SPAN_CALLS.clone()))
.expect("Metric registered");
registry
.register(Box::new(SPAN_DURATION_MILLISECONDS.clone()))
.expect("Metric registered");
}

fn create_const_labels() -> HashMap<String, String> {
Expand Down
9 changes: 8 additions & 1 deletion src/handler/grpc/request/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use config::CONFIG;
use config::{ider, CONFIG};
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_server::TraceService, ExportTraceServiceRequest, ExportTraceServiceResponse,
};
Expand Down Expand Up @@ -51,12 +51,19 @@ impl TraceService for TraceServer {
in_stream_name = Some(stream_name.to_str().unwrap());
};

let default_session_id = tonic::metadata::MetadataValue::try_from(ider::uuid()).unwrap();
let session_id = metadata
.get("session_id")
.unwrap_or(&default_session_id)
.to_str()
.unwrap();
let resp = handle_trace_request(
org_id.unwrap().to_str().unwrap(),
0,
in_req,
true,
in_stream_name,
session_id,
)
.await;
if resp.is_ok() {
Expand Down
2 changes: 1 addition & 1 deletion src/ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde_json.workspace = true
snafu.workspace = true
tokio.workspace = true
async-walkdir.workspace = true

futures-locks = "0.7.1"
[[bin]]
name = "wal-reader"
path = "bin/wal-reader.rs"
3 changes: 3 additions & 0 deletions src/ingester/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct Entry {
pub partition_key: Arc<str>, // 2023/12/18/00/country=US/state=CA
pub data: Vec<Arc<serde_json::Value>>,
pub data_size: usize,
pub session_id: Arc<str>,
}

impl Entry {
Expand All @@ -45,6 +46,7 @@ impl Entry {
partition_key: "".into(),
data: Vec::new(),
data_size: 0,
session_id: "".into(),
}
}
pub fn into_bytes(&mut self) -> Result<Vec<u8>> {
Expand Down Expand Up @@ -96,6 +98,7 @@ impl Entry {
partition_key: partition_key.into(),
data,
data_size: data_len as usize,
session_id: "".into(),
})
}

Expand Down
23 changes: 20 additions & 3 deletions src/ingester/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use std::{
use arrow_schema::Schema;
use chrono::{Duration, Utc};
use config::{metrics, CONFIG};
use futures::lock::Mutex;
use once_cell::sync::Lazy;
use snafu::ResultExt;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::RwLock;
use wal::Writer as WalWriter;

use crate::{
Expand Down Expand Up @@ -178,6 +179,7 @@ impl Writer {
if entry.data.is_empty() && !check_ttl {
return Ok(());
}
let session_id = entry.session_id.to_string();
let entry_bytes = if !check_ttl {
entry.into_bytes()?
} else {
Expand Down Expand Up @@ -223,21 +225,36 @@ impl Writer {
let key = self.key.clone();
let path = old_wal.path().clone();
let path_str = path.display().to_string();
let sid = session_id.clone();
tokio::task::spawn(async move {
log::info!("[INGESTER:WAL] start add to IMMUTABLES, file: {}", path_str,);
log::info!(
"[INGESTER:WAL] [{sid}] start add to IMMUTABLES, file: {}",
path_str,
);
IMMUTABLES.write().await.insert(
path,
Arc::new(immutable::Immutable::new(thread_id, key.clone(), old_mem)),
);
log::info!("[INGESTER:WAL] dones add to IMMUTABLES, file: {}", path_str);
log::info!(
"[INGESTER:WAL] [{sid}] dones add to IMMUTABLES, file: {}",
path_str
);
});
}

if !check_ttl {
// write into wal
let start = std::time::Instant::now();
wal.write(&entry_bytes, false).context(WalSnafu)?;
metrics::INGEST_WAL_LOCK_TIME
.with_label_values(&[&self.key.org_id])
.observe(start.elapsed().as_millis() as f64);
// write into memtable
let start = std::time::Instant::now();
mem.write(schema, entry).await?;
metrics::INGEST_MEMTABLE_LOCK_TIME
.with_label_values(&[&self.key.org_id])
.observe(start.elapsed().as_millis() as f64);
}

Ok(())
Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,14 @@ fn init_router_grpc_server(
.max_encoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024);
let metrics_svc = MetricsServiceServer::new(router::grpc::ingest::metrics::MetricsServer)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip);
.accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024)
.max_encoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024);
let traces_svc = TraceServiceServer::new(router::grpc::ingest::traces::TraceServer)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip);
.accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024)
.max_encoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024);

tokio::task::spawn(async move {
log::info!("starting gRPC server at {}", gaddr);
Expand Down
25 changes: 22 additions & 3 deletions src/router/grpc/ingest/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use config::CONFIG;
use config::{ider, CONFIG};
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, trace_service_server::TraceService,
ExportTraceServiceRequest, ExportTraceServiceResponse,
Expand All @@ -33,7 +33,7 @@ impl TraceService for TraceServer {
&self,
request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
let (metadata, extensions, message) = request.into_parts();
let (mut metadata, extensions, message) = request.into_parts();

// basic validation
if !metadata.contains_key(&CONFIG.grpc.org_header_key) {
Expand All @@ -42,6 +42,8 @@ impl TraceService for TraceServer {
&CONFIG.grpc.org_header_key
)));
}
let session_id = ider::uuid();
metadata.insert("session_id", session_id.parse().unwrap());

// call ingester
let mut request = Request::from_parts(metadata, extensions, message);
Expand All @@ -60,10 +62,27 @@ impl TraceService for TraceServer {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
});
client

let start = std::time::Instant::now();
match client
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024)
.max_encoding_message_size(CONFIG.grpc.max_message_size * 1024 * 1024)
.export(request)
.await
{
Ok(res) => {
if res.get_ref().partial_success.is_some() {
log::error!("export trace partial_success response:{:?}", res.get_ref());
}
Ok(res)
}
Err(e) => {
let time = start.elapsed().as_secs_f64();
log::error!("[{session_id}]export trace status: {e}, elapsed: {time}");
Err(e)
}
}
}
}
33 changes: 33 additions & 0 deletions src/service/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,13 @@ pub async fn write_file(
buf: HashMap<String, SchemaRecords>,
) -> RequestStats {
let mut req_stats = RequestStats::default();
let (stream_name, seesion_id) =
if let [stream_name, seesion_id] = stream_name.split('^').collect::<Vec<_>>().as_slice() {
(*stream_name, *seesion_id)
} else {
(stream_name, "")
};

for (hour_key, entry) in buf {
if entry.records.is_empty() {
continue;
Expand All @@ -378,6 +385,7 @@ pub async fn write_file(
partition_key: Arc::from(hour_key.as_str()),
data: entry.records,
data_size: entry.records_size,
session_id: Arc::from(seesion_id),
},
false,
)
Expand Down Expand Up @@ -625,4 +633,29 @@ mod tests {
);
assert!(result.is_err())
}

/// Pins the `stream^session_id` naming convention: a name containing exactly
/// one `^` splits into `(stream, session_id)`; any other shape is kept whole
/// and paired with an empty session id.
///
/// The test awaits nothing, so it runs as a plain synchronous `#[test]`
/// rather than spinning up an async runtime.
#[test]
fn test_stream_name_with_sid() {
    /// Same split rule as the one applied to `stream_name` at the top of
    /// `write_file`: only an exact two-part split is accepted.
    fn split_stream_and_session(raw: &str) -> (&str, &str) {
        let parts: Vec<&str> = raw.split('^').collect();
        match parts.as_slice() {
            [stream_name, session_id] => (*stream_name, *session_id),
            _ => (raw, ""),
        }
    }

    // Name carrying a session id.
    assert_eq!(
        split_stream_and_session("default^123456"),
        ("default", "123456")
    );

    // Plain name without a session id.
    assert_eq!(split_stream_and_session("default"), ("default", ""));

    // More than one separator is not a two-part split: the raw name is kept
    // untouched and no session id is extracted.
    assert_eq!(
        split_stream_and_session("default^123^456"),
        ("default^123^456", "")
    );

    // A trailing separator yields an empty session id.
    assert_eq!(split_stream_and_session("default^"), ("default", ""));

    // Empty input falls through to the no-session-id branch.
    assert_eq!(split_stream_and_session(""), ("", ""));
}
}