Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the tracing layer to enhance rest api metrics #4392

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
5 changes: 3 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ tower = { version = "0.4.13", features = [
"retry",
"util",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors", "trace"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3.16", features = [
Expand Down
124 changes: 121 additions & 3 deletions quickwit/quickwit-serve/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,142 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use hyper::{Request, Response};
use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, IntCounter};
use quickwit_common::metrics::{
new_counter, new_counter_vec, new_histogram_vec, Histogram, HistogramVec, IntCounter,
IntCounterVec,
};
use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
use tower_http::trace::{
DefaultMakeSpan, DefaultOnBodyChunk, DefaultOnEos, OnFailure, OnRequest, OnResponse, TraceLayer,
};

pub struct RestMetrics {
pub http_requests_total: IntCounter,
pub http_requests_total: IntCounterVec<2>,
pub http_requests_error: IntCounter,
pub http_requests_duration_secs: HistogramVec<2>,
}

impl Default for RestMetrics {
fn default() -> Self {
RestMetrics {
http_requests_total: new_counter(
http_requests_total: new_counter_vec(
"http_requests_total",
"Total number of HTTP requests received",
"quickwit",
["method", "path"],
),
http_requests_error: new_counter(
"http_requests_error",
"Number of HTTP requests with errors",
"quickwit",
),
http_requests_duration_secs: new_histogram_vec(
"http_requests_duration_secs",
"Number of seconds required to run the HTTP request",
"quickwit",
["method", "path"],
),
}
}
}

pub type RestMetricsTraceLayer<B> = TraceLayer<
SharedClassifier<ServerErrorsAsFailures>,
DefaultMakeSpan,
RestMetricsRecorder<B>,
RestMetricsRecorder<B>,
DefaultOnBodyChunk,
DefaultOnEos,
RestMetricsRecorder<B>,
>;

/// Holds the state required for recording metrics on a given request.
pub struct RestMetricsRecorder<B> {
pub histogram: Arc<Mutex<Option<Histogram>>>,
_phantom: PhantomData<B>,
}

impl<B> Clone for RestMetricsRecorder<B> {
fn clone(&self) -> Self {
Self {
histogram: self.histogram.clone(),
_phantom: self._phantom,
}
}
}

impl<B> RestMetricsRecorder<B> {
pub fn new() -> Self {
Self {
histogram: Arc::new(Mutex::new(None)),
_phantom: PhantomData,
}
}
}

impl<B, FailureClass> OnFailure<FailureClass> for RestMetricsRecorder<B> {
fn on_failure(
&mut self,
_failure_classification: FailureClass,
latency: std::time::Duration,
_span: &tracing::Span,
) {
SERVE_METRICS.http_requests_error.inc();
self.clone().record_latency(latency);
}
}

impl<B> RestMetricsRecorder<B> {
fn record_latency(self, latency: Duration) {
if let Some(mutex) = Arc::into_inner(self.histogram) {
if let Some(histogram) = mutex.into_inner().expect("Failed to unlock histogram") {
histogram.observe(latency.as_secs_f64());
}
}
etolbakov marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl<B, RB> OnResponse<RB> for RestMetricsRecorder<B> {
fn on_response(
self,
_response: &Response<RB>,
latency: std::time::Duration,
_span: &tracing::Span,
) {
self.clone().record_latency(latency);
}
}

impl<B, RB> OnRequest<RB> for RestMetricsRecorder<B> {
fn on_request(&mut self, request: &Request<RB>, _span: &tracing::Span) {
let uri = request.uri().path();
if uri.starts_with("/api") {
SERVE_METRICS
.http_requests_total
.with_label_values([request.method().as_str(), uri])
.inc();
*self.histogram.lock().unwrap() = Some(
SERVE_METRICS
.http_requests_duration_secs
.with_label_values([request.method().as_str(), uri]),
);
}
}
}

pub fn make_rest_metrics_layer<B>() -> RestMetricsTraceLayer<B> {
let metrics_recorder = RestMetricsRecorder::new();
TraceLayer::new_for_http()
.on_request(metrics_recorder.clone())
.on_response(metrics_recorder.clone())
.on_failure(metrics_recorder.clone())
}

/// Serve counters exposes a bunch a set of metrics about the request received to quickwit.
pub static SERVE_METRICS: Lazy<RestMetrics> = Lazy::new(RestMetrics::default);
6 changes: 2 additions & 4 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::indexing_api::indexing_get_handler;
use crate::ingest_api::ingest_api_handlers;
use crate::jaeger_api::jaeger_api_handlers;
use crate::json_api_response::{ApiError, JsonApiResponse};
use crate::metrics::make_rest_metrics_layer;
use crate::metrics_api::metrics_handler;
use crate::node_info_handler::node_info_handler;
use crate::otlp_api::otlp_ingest_api_handlers;
Expand Down Expand Up @@ -70,9 +71,6 @@ pub(crate) async fn start_rest_server(
readiness_trigger: BoxFutureInfaillible<()>,
shutdown_signal: BoxFutureInfaillible<()>,
) -> anyhow::Result<()> {
let request_counter = warp::log::custom(|_| {
crate::SERVE_METRICS.http_requests_total.inc();
});
// Docs routes
let api_doc = warp::path("openapi.json")
.and(warp::get())
Expand Down Expand Up @@ -117,7 +115,6 @@ pub(crate) async fn start_rest_server(
.or(health_check_routes)
.or(metrics_routes)
.or(debugging_routes)
.with(request_counter)
.recover(recover_fn)
.with(extra_headers)
.boxed();
Expand All @@ -128,6 +125,7 @@ pub(crate) async fn start_rest_server(
let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins);

let service = ServiceBuilder::new()
.layer(make_rest_metrics_layer::<hyper::Body>())
.layer(
CompressionLayer::new()
.gzip(true)
Expand Down