Skip to content

Commit

Permalink
backend/tracing: add trace id (#565)
Browse files Browse the repository at this point in the history
* add tracing for request/response

* refactor tracing
  • Loading branch information
vnghia authored Dec 10, 2024
1 parent 84de79b commit 04c753c
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions nghe-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ tempfile = { version = "3.14.0" }
[target.'cfg(not(any(target_env = "musl", all(target_arch = "aarch64", target_os = "linux"))))'.dev-dependencies]
diesel = { version = "2.2.5", features = ["postgres"] }

[build-dependencies]
built = { version = "0.7.5" }

[package.metadata.vcpkg]
dependencies = [
"ffmpeg[avcodec,avfilter,avformat,avresample,mp3lame,opus,soxr,swresample,vorbis,openssl]",
Expand Down
2 changes: 2 additions & 0 deletions nghe-backend/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
fn main() {
built::write_built_file().expect("Could not acquire build-time information");

println!("cargo::rustc-check-cfg=cfg(hearing_test)");
if std::env::var("NGHE_HEARING_TEST_INPUT").is_ok_and(|s| !s.is_empty())
&& std::env::var("NGHE_HEARING_TEST_OUTPUT").is_ok_and(|s| !s.is_empty())
Expand Down
5 changes: 5 additions & 0 deletions nghe-backend/src/constant.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod built_info {
include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

pub use built_info::PKG_NAME;
21 changes: 8 additions & 13 deletions nghe-backend/src/integration/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl Informant {
)
}

#[tracing::instrument(skip(self, database, config, id), ret(level = "trace"))]
async fn upsert_artist(
&self,
database: &Database,
Expand Down Expand Up @@ -84,18 +83,14 @@ impl Informant {
artist: &artists::Artist<'_>,
) -> Result<(), Error> {
let id = artist.id;
self.upsert_artist(
database,
config,
id,
if let Some(ref client) = self.spotify {
client.search_artist(&artist.data.name).await?
} else {
None
}
.as_ref(),
)
.await
let spotify = if let Some(ref client) = self.spotify {
client.search_artist(&artist.data.name).await?
} else {
None
};

tracing::debug!(?spotify);
self.upsert_artist(database, config, id, spotify.as_ref()).await
}

pub async fn search_and_upsert_artists(
Expand Down
45 changes: 27 additions & 18 deletions nghe-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#[coverage(off)]
pub mod config;
mod constant;
mod database;
#[coverage(off)]
mod error;
Expand Down Expand Up @@ -52,22 +53,17 @@ static GLOBAL: MiMalloc = MiMalloc;
pub fn init_tracing() -> Result<(), Error> {
color_eyre::install()?;

let tracing = tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
if cfg!(test) { "debug" } else { const_format::concatc!(constant::PKG_NAME, "=info") }
.into()
}))
.with(tracing_error::ErrorLayer::default());

if cfg!(test) {
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info".into()),
)
.with_test_writer()
.try_init();
tracing.with(tracing_subscriber::fmt::layer().with_test_writer()).try_init()?;
} else {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
["nghe_backend=info".to_owned(), "tower_http=info".to_owned()].join(",").into()
}))
.with(tracing_subscriber::fmt::layer().with_target(false))
.with(tracing_error::ErrorLayer::default())
.try_init()?;
tracing.with(tracing_subscriber::fmt::layer().with_target(false).compact()).try_init()?;
}

Ok(())
Expand Down Expand Up @@ -107,10 +103,23 @@ pub async fn build(config: config::Config) -> Router {
.merge(route::search::router())
.merge(route::system::router())
.with_state(database::Database::new(&config.database))
.layer(TraceLayer::new_for_http().make_span_with(|_: &axum::extract::Request| {
let id = Uuid::new_v4();
tracing::info_span!("request", ?id)
}))
.layer(
TraceLayer::new_for_http()
.make_span_with(|_: &axum::extract::Request| {
let id = Uuid::new_v4();
tracing::info_span!(nghe_api::constant::SERVER_NAME, trace = %id)
})
.on_request(|request: &axum::extract::Request, _: &tracing::Span| {
tracing::info!(method = request.method().as_str(), path = request.uri().path());
})
.on_response(
|response: &axum::response::Response,
latency: std::time::Duration,
_: &tracing::Span| {
tracing::info!(status = response.status().as_u16(), took = ?latency);
},
),
)
.layer(CorsLayer::permissive())
.layer(CompressionLayer::new().br(true).gzip(true).zstd(true))
}
20 changes: 14 additions & 6 deletions nghe-backend/src/scan/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ impl<'db, 'fs, 'mf> Scanner<'db, 'fs, 'mf> {
.map_err(Error::from)
}

#[instrument(skip(self, started_at), ret(level = "debug"), err(Debug))]
#[instrument(
skip_all,
fields(path = %entry.path, last_modified = ?entry.last_modified),
ret(level = "debug"),
err(Debug)
)]
async fn one(&self, entry: &Entry, started_at: time::OffsetDateTime) -> Result<(), Error> {
let database = &self.database;

Expand Down Expand Up @@ -232,17 +237,16 @@ impl<'db, 'fs, 'mf> Scanner<'db, 'fs, 'mf> {
Ok(())
}

#[instrument(
skip(self), fields(music_folder_data = ?self.music_folder.data, started_at), ret, err(Debug)
)]
#[instrument(skip_all, err(Debug))]
pub async fn run(&self) -> Result<(), Error> {
let span = tracing::Span::current();
tracing::info!(music_folder = ?self.music_folder);
let started_at = crate::time::now().await;
span.record("started_at", tracing::field::display(&started_at));
tracing::info!(?started_at);

let (scan_handle, permit, rx) = self.init();
let mut join_set = tokio::task::JoinSet::new();

let span = tracing::Span::current();
while let Ok(entry) = rx.recv_async().await {
let permit = permit.clone().acquire_owned().await?;
let scan = self.clone().into_owned();
Expand All @@ -264,6 +268,10 @@ impl<'db, 'fs, 'mf> Scanner<'db, 'fs, 'mf> {

self.database.upsert_config(&self.config.index).await?;
self.informant.search_and_upsert_artists(&self.database, &self.config.cover_art).await?;

let latency: std::time::Duration =
(time::OffsetDateTime::now_utc() - started_at).try_into()?;
tracing::info!(took = ?latency);
Ok(())
}
}
Expand Down
10 changes: 5 additions & 5 deletions nghe-backend/src/transcode/lock.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::io::Seek;

use tracing::instrument;
Expand All @@ -19,8 +19,8 @@ impl Lock {
.map_err(Error::from)
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_read(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
#[instrument(skip_all, fields(%path), err(Debug, level = "trace"))]
pub fn lock_read(path: impl AsRef<Utf8NativePath> + Display) -> Result<std::fs::File, Error> {
let mut file = Self::open_read(path)?;
// The read lock might be acquired with an empty file since creating and locking exclusively
// a file are two separate operations. We need to check if the file is empty before trying
Expand All @@ -37,8 +37,8 @@ impl Lock {
}
}

#[instrument(err(Debug, level = "debug"))]
pub fn lock_write(path: impl AsRef<Utf8NativePath> + Debug) -> Result<std::fs::File, Error> {
#[instrument(skip_all, fields(%path), err(Debug, level = "trace"))]
pub fn lock_write(path: impl AsRef<Utf8NativePath> + Display) -> Result<std::fs::File, Error> {
let file = std::fs::OpenOptions::new().write(true).create_new(true).open(path.as_ref())?;
if file.try_lock()? { Ok(file) } else { error::Kind::FileAlreadyLocked.into() }
}
Expand Down
4 changes: 2 additions & 2 deletions nghe-backend/src/transcode/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::ffi::CStr;
use std::fmt::Debug;
use std::fmt::Display;
use std::io::Write;

use educe::Educe;
Expand Down Expand Up @@ -27,7 +27,7 @@ impl Sink {
pub async fn new(
config: &config::Transcode,
format: format::Transcode,
output: Option<impl AsRef<Utf8NativePath> + Debug + Send + 'static>,
output: Option<impl AsRef<Utf8NativePath> + Display + Send + 'static>,
) -> Result<(Self, Receiver<Vec<u8>>), Error> {
let (tx, rx) = crate::sync::channel(config.channel_size);
// It will fail in two cases:
Expand Down
9 changes: 5 additions & 4 deletions nghe-backend/src/transcode/transcoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::ffi::{CStr, CString};
use std::fmt::Debug;
use std::fmt::Display;

use concat_string::concat_string;
use rsmpeg::avcodec::{AVCodec, AVCodecContext};
Expand Down Expand Up @@ -219,13 +219,14 @@ impl<'graph> Filter<'graph> {
}

impl Transcoder {
#[instrument(err(Debug))]
#[instrument(skip_all, err(Debug))]
pub fn spawn(
input: impl Into<String> + Debug,
input: impl Into<String> + Display,
sink: Sink,
bitrate: u32,
offset: u32,
) -> Result<tokio::task::JoinHandle<Result<(), Error>>, Error> {
tracing::debug!(%input, ?sink, %bitrate, %offset);
let mut transcoder = Self::new(&CString::new(input.into())?, sink, bitrate, offset)?;

let span = tracing::Span::current();
Expand Down Expand Up @@ -295,7 +296,7 @@ mod test {

impl Transcoder {
pub async fn spawn_collect(
input: impl Into<String> + Debug,
input: impl Into<String> + Display,
config: &config::Transcode,
format: format::Transcode,
bitrate: u32,
Expand Down
42 changes: 23 additions & 19 deletions nghe-proc-macro/src/backend/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum Arg {
Database { ident: syn::Ident, use_database: bool },
User(syn::Ident),
Request,
Extension { ident: syn::Ident, ty: syn::TypePath, reference: bool, skip_debug: bool },
Extension { ident: syn::Ident, ty: syn::TypePath, reference: bool },
Header { ident: syn::Ident, ty: syn::TypePath },
}

Expand Down Expand Up @@ -94,10 +94,7 @@ impl Arg {
if config.header {
Ok(Self::Header { ident: pat.ident.clone(), ty })
} else {
let skip_debug = pat.ident == "config"
|| pat.ident == "informant"
|| pat.ident == "filesystem";
Ok(Self::Extension { ident: pat.ident.clone(), ty, reference, skip_debug })
Ok(Self::Extension { ident: pat.ident.clone(), ty, reference })
}
}
}
Expand All @@ -109,16 +106,14 @@ impl Arg {
}
}

fn to_skip_debug(&self) -> Option<&syn::Ident> {
fn to_tracing(&self) -> Option<TokenStream> {
match self {
Arg::Database { ident, use_database } => {
if *use_database {
Some(ident)
} else {
None
}
Arg::User(ident) => {
let field = format_ident!("user_{ident}");
Some(parse_quote!(user.#ident = ?#field))
}
Arg::Extension { ident, skip_debug, .. } if *skip_debug => Some(ident),
Arg::Header { ident, .. } => Some(parse_quote!(#ident = ?#ident)),
Arg::Request => Some(parse_quote!(request = ?request)),
_ => None,
}
}
Expand Down Expand Up @@ -331,7 +326,7 @@ impl Handler {
} else {
let source_dir = source_path.parent().unwrap().file_name().unwrap().to_str().unwrap();
let source_stem = source_path.file_stem().unwrap().to_str().unwrap();
concat_string!(source_dir, "::", source_stem)
concat_string!(source_dir, ":", source_stem)
};

let mut sig = self.item.sig.clone();
Expand All @@ -350,18 +345,24 @@ impl Handler {
})
.try_collect()?;

let skip_debugs: Punctuated<&syn::Ident, syn::Token![,]> =
self.args.value.iter().filter_map(Arg::to_skip_debug).collect();
let mut tracing_args = Punctuated::<syn::Meta, syn::Token![,]>::default();
tracing_args.push(parse_quote!(name = #tracing_name));
tracing_args.push(parse_quote!(skip(#skip_debugs)));
tracing_args.push(parse_quote!(skip_all));
if self.is_result_binary.is_some() {
tracing_args.push(parse_quote!(ret(level = "trace")));
tracing_args.push(parse_quote!(err(Debug)));
}

let traced_args: Punctuated<TokenStream, syn::Token![,]> =
self.args.value.iter().filter_map(Arg::to_tracing).collect();
let traced_expr_macro: Option<TokenStream> = if traced_args.is_empty() {
None
} else {
Some(quote!(tracing::debug!(#traced_args);))
};

let handler_ident = &self.item.sig.ident;
let source: syn::Expr = if sig.asyncness.is_some() {
let handler_expr_call: syn::Expr = if sig.asyncness.is_some() {
parse_quote!(#handler_ident(#args).await)
} else {
parse_quote!(#handler_ident(#args))
Expand All @@ -371,7 +372,10 @@ impl Handler {
#[coverage(off)]
#[inline(always)]
#[tracing::instrument(#tracing_args)]
#sig { #source }
#sig {
#traced_expr_macro
#handler_expr_call
}
})
}

Expand Down

0 comments on commit 04c753c

Please sign in to comment.