Skip to content

Commit

Permalink
chore: optimized tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
AH-dark committed Apr 3, 2024
1 parent 2677355 commit 971d3b0
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 42 deletions.
24 changes: 14 additions & 10 deletions makefile
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
GO=go
CARGO=cargo

GOFLAGS=-ldflags="-s -w"
ifeq ($(shell uname -s),Darwin)
GOFLAGS+=-buildmode=pie
endif

COMPONENTS=$(shell find components -type d -depth 1 -exec basename {} \;)
GO_COMPONENTS=$(shell find components -type d -depth 1 -exec basename {} \;)
RUST_COMPONENTS=$(shell find rust-components -type d -depth 1 -exec basename {} \;)

build-%:
build-go-%:
$(GO) build $(GOFLAGS) -o bin/$* components/$*/cmd/main.go

build-rust-%:
$(CARGO) build --release --manifest-path=rust-components/$*/Cargo.toml

build:
@for component in $(COMPONENTS); do \
$(MAKE) build-$$component; \
@for component in $(GO_COMPONENTS); do \
$(MAKE) build-go-$$component; \
done

run-%:
$(GO) run components/$*/cmd/main.go
@for component in $(RUST_COMPONENTS); do \
$(MAKE) build-rust-$$component; \
done

test:
$(GO) test -v ./...

deps:
$(GO) mod download
$(CARGO) test

work-init:
$(GO) work init
$(GO) work use . components/*

.PHONY: build-% build run-% test deps work-init
.PHONY: build-go-% build-rust-% build test deps work-init
36 changes: 20 additions & 16 deletions rust-common/src/bot/channel.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::pin::Pin;

use futures::{FutureExt, StreamExt};
use lapin::options::BasicCancelOptions;
use lapin::protocol::constants::REPLY_SUCCESS;
use opentelemetry::trace::Span;
use opentelemetry::trace::TraceContextExt;
use teloxide::prelude::Update;
use teloxide::stop::{mk_stop_token, StopFlag, StopToken};
use teloxide::update_listeners::{AsUpdateStream, UpdateListener};
Expand All @@ -20,11 +22,11 @@ pub struct MqUpdateListener {
impl<'a> AsUpdateStream<'a> for MqUpdateListener {
type StreamErr = lapin::Error;
type Stream =
Box<dyn futures::Stream<Item = Result<Update, Self::StreamErr>> + Unpin + Send + 'a>;
Pin<Box<dyn futures::Stream<Item = Result<Update, Self::StreamErr>> + Unpin + Send + 'a>>;

fn as_stream(&'a mut self) -> Self::Stream {
let flag = self.flag.clone();
let stream = self.consumer.clone().filter_map(move |delivery| {
Box::pin(self.consumer.clone().filter_map(move |delivery| {
assert!(!flag.is_stopped(), "Update listener stopped");
if self.consumer.state() != lapin::ConsumerState::Active
&& self.consumer.state() != lapin::ConsumerState::ActiveWithDelegate
Expand All @@ -34,27 +36,29 @@ impl<'a> AsUpdateStream<'a> for MqUpdateListener {

async move {
match delivery {
Ok(delivery) => match serde_json::from_slice::<Update>(&delivery.data) {
Ok(mut update) => {
let cx = extract_span_from_delivery(&delivery);
update.cx = Some(cx);
Some(Ok(update))
}
Err(e) => {
log::error!("Error deserializing message: {}", e);
None
Ok(delivery) => {
let cx = extract_span_from_delivery(&delivery);

match serde_json::from_slice::<Update>(&delivery.data) {
Ok(mut update) => {
update.cx = Some(cx);
Some(Ok(update))
}
Err(e) => {
log::error!("Error deserializing message: {}", e);
cx.span().record_error(&e);
None
}
}
},
}
Err(e) => {
log::error!("Error receiving message: {}", e);
None
}
}
}
.boxed()
});

Box::new(stream)
}))
}
}

Expand Down
30 changes: 18 additions & 12 deletions rust-components/network-functions-handler/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::time::Duration;

use fast_qr::convert::Builder;
use fastping_rs::PingResult::{Idle, Receive};
use fastping_rs::Pinger;
use fastping_rs::PingResult::{Idle, Receive};
use moka::future::Cache;
use opentelemetry::trace::{Span, SpanKind, Tracer};
use opentelemetry::{global, KeyValue};
use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer};
use teloxide::prelude::*;
use teloxide::types::InputFile;
use teloxide::utils::command::BotCommands;
Expand Down Expand Up @@ -50,15 +50,17 @@ pub(crate) async fn qrcode_handler(
cache: Cache<String, Vec<u8>>,
) -> anyhow::Result<()> {
let tracer = global::tracer("pegasus/rust-components/network-functions-handler/handlers");
let mut span = tracer
let parent_cx = update.cx.unwrap_or_default();
let span = tracer
.span_builder("qrcode_handler")
.with_kind(SpanKind::Internal)
.start_with_context(&tracer, &update.cx.unwrap_or_default());
.start_with_context(&tracer, &parent_cx);
let cx = parent_cx.with_span(span);

if text.is_empty() {
let err = anyhow::anyhow!("Text is empty");
send_error_message!(bot, message, "Text is empty");
span.record_error(err.as_ref());
cx.span().record_error(err.as_ref());
return Err(err);
}

Expand All @@ -74,7 +76,7 @@ pub(crate) async fn qrcode_handler(
.send()
.await?;

span.set_attribute(KeyValue::new("cache_hit", true));
cx.span().set_attribute(KeyValue::new("cache_hit", true));

return Ok(());
}
Expand Down Expand Up @@ -119,20 +121,22 @@ pub(crate) async fn ping_handler(
target: String,
) -> anyhow::Result<()> {
let tracer = global::tracer("pegasus/rust-components/network-functions-handler/handlers");
let mut span = tracer
let parent_tx = update.cx.unwrap_or_default();
let span = tracer
.span_builder("ping_handler")
.with_kind(SpanKind::Internal)
.start_with_context(&tracer, &update.cx.unwrap_or_default());
.start_with_context(&tracer, &parent_tx);
let cx = parent_tx.with_span(span);

if target.is_empty() {
let err = anyhow::anyhow!("Target is empty");
send_error_message!(bot, message, "Usage: /ping <target>");
span.record_error(err.as_ref());
cx.span().record_error(err.as_ref());
return Err(err);
}

let target_ip = match_error!(
parse_target(&target).await,
parse_target(cx.clone(), &target).await,
bot,
message,
"Failed to parse target: {}"
Expand All @@ -153,7 +157,8 @@ pub(crate) async fn ping_handler(
Idle { addr } => {
let err = format!("Failed to ping target: {}", addr);
send_error_message!(bot, message, &err);
span.record_error(anyhow::anyhow!("Failed to ping target: {}", addr).as_ref());
cx.span()
.record_error(anyhow::anyhow!("Failed to ping target: {}", addr).as_ref());
return Err(anyhow::anyhow!("Failed to ping target: {}", err));
}
Receive { addr, rtt } => {
Expand All @@ -169,7 +174,8 @@ pub(crate) async fn ping_handler(
Err(e) => {
let err = format!("Failed to receive result: {}", e);
send_error_message!(bot, message, err.clone());
span.record_error(anyhow::anyhow!(err.clone()).as_ref());
cx.span()
.record_error(anyhow::anyhow!(err.clone()).as_ref());
return Err(anyhow::anyhow!(err));
}
}
Expand Down
22 changes: 18 additions & 4 deletions rust-components/network-functions-handler/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

use lazy_static::lazy_static;
use opentelemetry::{Context, global, KeyValue};
use opentelemetry::trace::{TraceContextExt, Tracer};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::TokioAsyncResolver;

Expand All @@ -9,7 +11,17 @@ lazy_static! {
TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default());
}

pub(crate) async fn parse_target(target: &str) -> anyhow::Result<IpAddr> {
pub(crate) async fn parse_target(parent_cx: Context, target: &str) -> anyhow::Result<IpAddr> {
let tracer = global::tracer("pegasus/rust-components/network-functions-handler/utils");
let cx = parent_cx.with_span(
tracer
.span_builder("parse_target")
.start_with_context(&tracer, &parent_cx),
);

cx.span()
.set_attribute(KeyValue::new("target", target.to_string()));

if let Ok(ip_addr) = target.parse::<Ipv4Addr>() {
Ok(IpAddr::V4(ip_addr))
} else if let Ok(ip_addr) = target.parse::<Ipv6Addr>() {
Expand Down Expand Up @@ -39,15 +51,17 @@ mod tests {
#[tokio::test]
async fn test_parse_target() {
assert_eq!(
parse_target("127.0.0.1").await.unwrap(),
parse_target(Context::default(), "127.0.0.1").await.unwrap(),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
);

assert_eq!(
parse_target("::1").await.unwrap(),
parse_target(Context::default(), "::1").await.unwrap(),
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))
);

assert!(parse_target("example.com").await.is_ok());
assert!(parse_target(Context::default(), "example.com")
.await
.is_ok());
}
}

0 comments on commit 971d3b0

Please sign in to comment.