From 971d3b0e4a80cacfeafcb0a1a7c9f4e1796c64d9 Mon Sep 17 00:00:00 2001 From: AH-dark Date: Thu, 4 Apr 2024 00:43:07 +0800 Subject: [PATCH] chore: optimized tracing --- makefile | 24 +++++++------ rust-common/src/bot/channel.rs | 36 ++++++++++--------- .../network-functions-handler/src/handlers.rs | 30 +++++++++------- .../network-functions-handler/src/utils.rs | 22 +++++++++--- 4 files changed, 70 insertions(+), 42 deletions(-) diff --git a/makefile b/makefile index a5e2947..c558e7e 100644 --- a/makefile +++ b/makefile @@ -1,31 +1,35 @@ GO=go +CARGO=cargo GOFLAGS=-ldflags="-s -w" ifeq ($(shell uname -s),Darwin) GOFLAGS+=-buildmode=pie endif -COMPONENTS=$(shell find components -type d -depth 1 -exec basename {} \;) +GO_COMPONENTS=$(shell find components -type d -depth 1 -exec basename {} \;) +RUST_COMPONENTS=$(shell find rust-components -type d -depth 1 -exec basename {} \;) -build-%: +build-go-%: $(GO) build $(GOFLAGS) -o bin/$* components/$*/cmd/main.go +build-rust-%: + $(CARGO) build --release --manifest-path=rust-components/$*/Cargo.toml + build: - @for component in $(COMPONENTS); do \ - $(MAKE) build-$$component; \ + @for component in $(GO_COMPONENTS); do \ + $(MAKE) build-go-$$component; \ done -run-%: - $(GO) run components/$*/cmd/main.go + @for component in $(RUST_COMPONENTS); do \ + $(MAKE) build-rust-$$component; \ + done test: $(GO) test -v ./... - -deps: - $(GO) mod download + $(CARGO) test work-init: $(GO) work init $(GO) work use . components/* -.PHONY: build-% build run-% test deps work-init +.PHONY: build-go-% build-rust-% build test deps work-init diff --git a/rust-common/src/bot/channel.rs b/rust-common/src/bot/channel.rs index 4aa6a5c..e0684d0 100644 --- a/rust-common/src/bot/channel.rs +++ b/rust-common/src/bot/channel.rs @@ -1,7 +1,9 @@ +use std::pin::Pin; + use futures::{FutureExt, StreamExt}; use lapin::options::BasicCancelOptions; use lapin::protocol::constants::REPLY_SUCCESS; -use opentelemetry::trace::Span; +use opentelemetry::trace::TraceContextExt; use teloxide::prelude::Update; use teloxide::stop::{mk_stop_token, StopFlag, StopToken}; use teloxide::update_listeners::{AsUpdateStream, UpdateListener}; @@ -20,11 +22,11 @@ pub struct MqUpdateListener { impl<'a> AsUpdateStream<'a> for MqUpdateListener { type StreamErr = lapin::Error; type Stream = - Box> + Unpin + Send + 'a>; + Pin> + Unpin + Send + 'a>>; fn as_stream(&'a mut self) -> Self::Stream { let flag = self.flag.clone(); - let stream = self.consumer.clone().filter_map(move |delivery| { + Box::pin(self.consumer.clone().filter_map(move |delivery| { assert!(!flag.is_stopped(), "Update listener stopped"); if self.consumer.state() != lapin::ConsumerState::Active && self.consumer.state() != lapin::ConsumerState::ActiveWithDelegate @@ -34,17 +36,21 @@ impl<'a> AsUpdateStream<'a> for MqUpdateListener { async move { match delivery { - Ok(delivery) => match serde_json::from_slice::(&delivery.data) { - Ok(mut update) => { - let cx = extract_span_from_delivery(&delivery); - update.cx = Some(cx); - Some(Ok(update)) - } - Err(e) => { - log::error!("Error deserializing message: {}", e); - None + Ok(delivery) => { + let cx = extract_span_from_delivery(&delivery); + + match serde_json::from_slice::(&delivery.data) { + Ok(mut update) => { + update.cx = Some(cx); + Some(Ok(update)) + } + Err(e) => { + log::error!("Error deserializing message: {}", e); + cx.span().record_error(&e); + None + } } - }, + } Err(e) => { log::error!("Error receiving message: {}", e); None @@ -52,9 +58,7 @@ impl<'a> AsUpdateStream<'a> for MqUpdateListener { } } .boxed() - }); - - Box::new(stream) + })) } } diff --git a/rust-components/network-functions-handler/src/handlers.rs b/rust-components/network-functions-handler/src/handlers.rs index b3b6f73..f692a2c 100644 --- a/rust-components/network-functions-handler/src/handlers.rs +++ b/rust-components/network-functions-handler/src/handlers.rs @@ -1,11 +1,11 @@ use std::time::Duration; use fast_qr::convert::Builder; -use fastping_rs::PingResult::{Idle, Receive}; use fastping_rs::Pinger; +use fastping_rs::PingResult::{Idle, Receive}; use moka::future::Cache; -use opentelemetry::trace::{Span, SpanKind, Tracer}; use opentelemetry::{global, KeyValue}; +use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; use teloxide::prelude::*; use teloxide::types::InputFile; use teloxide::utils::command::BotCommands; @@ -50,15 +50,17 @@ pub(crate) async fn qrcode_handler( cache: Cache>, ) -> anyhow::Result<()> { let tracer = global::tracer("pegasus/rust-components/network-functions-handler/handlers"); - let mut span = tracer + let parent_cx = update.cx.unwrap_or_default(); + let span = tracer .span_builder("qrcode_handler") .with_kind(SpanKind::Internal) - .start_with_context(&tracer, &update.cx.unwrap_or_default()); + .start_with_context(&tracer, &parent_cx); + let cx = parent_cx.with_span(span); if text.is_empty() { let err = anyhow::anyhow!("Text is empty"); send_error_message!(bot, message, "Text is empty"); - span.record_error(err.as_ref()); + cx.span().record_error(err.as_ref()); return Err(err); } @@ -74,7 +76,7 @@ pub(crate) async fn qrcode_handler( .send() .await?; - span.set_attribute(KeyValue::new("cache_hit", true)); + cx.span().set_attribute(KeyValue::new("cache_hit", true)); return Ok(()); } @@ -119,20 +121,22 @@ pub(crate) async fn ping_handler( target: String, ) -> anyhow::Result<()> { let tracer = global::tracer("pegasus/rust-components/network-functions-handler/handlers"); - let mut span = tracer + let parent_tx = update.cx.unwrap_or_default(); + let span = tracer .span_builder("ping_handler") .with_kind(SpanKind::Internal) - .start_with_context(&tracer, &update.cx.unwrap_or_default()); + .start_with_context(&tracer, &parent_tx); + let cx = parent_tx.with_span(span); if target.is_empty() { let err = anyhow::anyhow!("Target is empty"); send_error_message!(bot, message, "Usage: /ping "); - span.record_error(err.as_ref()); + cx.span().record_error(err.as_ref()); return Err(err); } let target_ip = match_error!( - parse_target(&target).await, + parse_target(cx.clone(), &target).await, bot, message, "Failed to parse target: {}" @@ -153,7 +157,8 @@ pub(crate) async fn ping_handler( Idle { addr } => { let err = format!("Failed to ping target: {}", addr); send_error_message!(bot, message, &err); - span.record_error(anyhow::anyhow!("Failed to ping target: {}", addr).as_ref()); + cx.span() + .record_error(anyhow::anyhow!("Failed to ping target: {}", addr).as_ref()); return Err(anyhow::anyhow!("Failed to ping target: {}", err)); } Receive { addr, rtt } => { @@ -169,7 +174,8 @@ pub(crate) async fn ping_handler( Err(e) => { let err = format!("Failed to receive result: {}", e); send_error_message!(bot, message, err.clone()); - span.record_error(anyhow::anyhow!(err.clone()).as_ref()); + cx.span() + .record_error(anyhow::anyhow!(err.clone()).as_ref()); return Err(anyhow::anyhow!(err)); } } diff --git a/rust-components/network-functions-handler/src/utils.rs b/rust-components/network-functions-handler/src/utils.rs index af41eca..01e6ec2 100644 --- a/rust-components/network-functions-handler/src/utils.rs +++ b/rust-components/network-functions-handler/src/utils.rs @@ -1,6 +1,8 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use lazy_static::lazy_static; +use opentelemetry::{Context, global, KeyValue}; +use opentelemetry::trace::{TraceContextExt, Tracer}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::TokioAsyncResolver; @@ -9,7 +11,17 @@ lazy_static! { TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()); } -pub(crate) async fn parse_target(target: &str) -> anyhow::Result { +pub(crate) async fn parse_target(parent_cx: Context, target: &str) -> anyhow::Result { + let tracer = global::tracer("pegasus/rust-components/network-functions-handler/utils"); + let cx = parent_cx.with_span( + tracer + .span_builder("parse_target") + .start_with_context(&tracer, &parent_cx), + ); + + cx.span() + .set_attribute(KeyValue::new("target", target.to_string())); + if let Ok(ip_addr) = target.parse::() { Ok(IpAddr::V4(ip_addr)) } else if let Ok(ip_addr) = target.parse::() { @@ -39,15 +51,17 @@ mod tests { #[tokio::test] async fn test_parse_target() { assert_eq!( - parse_target("127.0.0.1").await.unwrap(), + parse_target(Context::default(), "127.0.0.1").await.unwrap(), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) ); assert_eq!( - parse_target("::1").await.unwrap(), + parse_target(Context::default(), "::1").await.unwrap(), IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)) ); - assert!(parse_target("example.com").await.is_ok()); + assert!(parse_target(Context::default(), "example.com") + .await + .is_ok()); } }