Skip to content

Commit

Permalink
cdc: add more metrics about output events queue time (#16281) (#16288)
Browse files Browse the repository at this point in the history
close #16282

Signed-off-by: qupeng <[email protected]>

Co-authored-by: qupeng <[email protected]>
  • Loading branch information
ti-chi-bot and hicqu committed Apr 19, 2024
1 parent 8c3269c commit 829028b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
20 changes: 12 additions & 8 deletions components/cdc/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ macro_rules! impl_from_future_send_error {

// Invokes `impl_from_future_send_error!` for the channel error types listed
// below. The `TrySendError` payload must be the same tuple type that `Sink`'s
// senders carry; this hunk shows it both without and with the leading
// `Instant` element.
// NOTE(review): the macro body is not visible here — presumably it generates
// `From<...> for SendError` impls; confirm against the macro definition.
impl_from_future_send_error! {
FuturesSendError,
TrySendError<(CdcEvent, usize)>,
TrySendError<(Instant, CdcEvent, usize)>,
}

impl From<MemoryQuotaExceeded> for SendError {
Expand All @@ -245,8 +245,8 @@ impl From<MemoryQuotaExceeded> for SendError {

/// Sending half of the CDC event channel.
///
/// Holds one unbounded and one bounded sender plus a shared `MemoryQuota`.
/// Both payload shapes are listed below: `(CdcEvent, usize)` and
/// `(Instant, CdcEvent, usize)`. In the latter, the `Instant` is the
/// timestamp taken with `Instant::now_coarse()` right before the event is
/// handed to the sender, and the `usize` is the byte size charged against
/// `memory_quota` for that event.
#[derive(Clone)]
pub struct Sink {
unbounded_sender: UnboundedSender<(CdcEvent, usize)>,
bounded_sender: Sender<(CdcEvent, usize)>,
unbounded_sender: UnboundedSender<(Instant, CdcEvent, usize)>,
bounded_sender: Sender<(Instant, CdcEvent, usize)>,
memory_quota: Arc<MemoryQuota>,
}

Expand All @@ -257,7 +257,8 @@ impl Sink {
if bytes != 0 {
self.memory_quota.alloc(bytes)?;
}
match self.unbounded_sender.unbounded_send((event, bytes)) {
let now = Instant::now_coarse();
match self.unbounded_sender.unbounded_send((now, event, bytes)) {
Ok(_) => Ok(()),
Err(e) => {
// Free quota if send fails.
Expand All @@ -275,9 +276,11 @@ impl Sink {
total_bytes += bytes;
}
self.memory_quota.alloc(total_bytes as _)?;

let now = Instant::now_coarse();
for event in events {
let bytes = event.size() as usize;
if let Err(e) = self.bounded_sender.feed((event, bytes)).await {
if let Err(e) = self.bounded_sender.feed((now, event, bytes)).await {
// Free quota if send fails.
self.memory_quota.free(total_bytes as _);
return Err(SendError::from(e));
Expand All @@ -293,15 +296,16 @@ impl Sink {
}

/// Receiving half of the CDC event channel.
///
/// Holds the receivers that pair with `Sink`'s unbounded and bounded senders.
/// Both payload shapes are listed below: `(CdcEvent, usize)` and
/// `(Instant, CdcEvent, usize)`. In the latter, `drain()` uses the leading
/// `Instant` to observe how long each event was pending
/// (`CDC_EVENTS_PENDING_DURATION`) and strips it, so the stream item stays
/// `(CdcEvent, usize)`.
// NOTE(review): how `memory_quota` is used on the drain side is not visible in
// this hunk — presumably quota is freed as events are consumed; confirm.
pub struct Drain {
unbounded_receiver: UnboundedReceiver<(CdcEvent, usize)>,
bounded_receiver: Receiver<(CdcEvent, usize)>,
unbounded_receiver: UnboundedReceiver<(Instant, CdcEvent, usize)>,
bounded_receiver: Receiver<(Instant, CdcEvent, usize)>,
memory_quota: Arc<MemoryQuota>,
}

impl<'a> Drain {
pub fn drain(&'a mut self) -> impl Stream<Item = (CdcEvent, usize)> + 'a {
stream::select(&mut self.bounded_receiver, &mut self.unbounded_receiver).map(
|(mut event, size)| {
|(start, mut event, size)| {
CDC_EVENTS_PENDING_DURATION.observe(start.saturating_elapsed_secs() * 1000.0);
if let CdcEvent::Barrier(ref mut barrier) = event {
if let Some(barrier) = barrier.take() {
// Unset barrier when it is received.
Expand Down
8 changes: 6 additions & 2 deletions components/cdc/src/initializer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use api_version::ApiV2;
use crossbeam::atomic::AtomicCell;
Expand Down Expand Up @@ -38,7 +38,7 @@ use tikv_util::{
debug, defer, error, info,
memory::MemoryQuota,
sys::inspector::{self_thread_inspector, ThreadInspector},
time::{Instant, Limiter},
time::{duration_to_sec, Instant, Limiter},
warn,
worker::Scheduler,
Either,
Expand Down Expand Up @@ -260,6 +260,7 @@ impl<E: KvEngine> Initializer<E> {
fail_point!("cdc_incremental_scan_start");
let mut done = false;
let start = Instant::now_coarse();
let mut sink_time = Duration::default();

let curr_state = self.downstream_state.load();
assert!(matches!(
Expand All @@ -282,7 +283,9 @@ impl<E: KvEngine> Initializer<E> {
}
debug!("cdc scan entries"; "len" => entries.len(), "region_id" => region_id);
fail_point!("before_schedule_incremental_scan");
let start_sink = Instant::now_coarse();
self.sink_scan_events(entries, done).await?;
sink_time += start_sink.saturating_elapsed();
}

if !post_init_downstream(&self.downstream_state) {
Expand All @@ -301,6 +304,7 @@ impl<E: KvEngine> Initializer<E> {
}

CDC_SCAN_DURATION_HISTOGRAM.observe(takes.as_secs_f64());
CDC_SCAN_SINK_DURATION_HISTOGRAM.observe(duration_to_sec(sink_time));
Ok(())
}

Expand Down
12 changes: 12 additions & 0 deletions components/cdc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ lazy_static! {
exponential_buckets(0.005, 2.0, 20).unwrap()
)
.unwrap();
/// Time, in seconds, that an incremental scan spent awaiting
/// `sink_scan_events`. The initializer accumulates the elapsed time of every
/// `sink_scan_events(...).await` call into one `Duration` and observes it
/// via `duration_to_sec` alongside `CDC_SCAN_DURATION_HISTOGRAM`.
// NOTE(review): no bucket list is passed here, unlike the histogram defined
// just before this one (`exponential_buckets(0.005, 2.0, 20)`), so the metrics
// library's default buckets presumably apply — confirm they cover the
// expected range of sink times.
pub static ref CDC_SCAN_SINK_DURATION_HISTOGRAM: Histogram = register_histogram!(
"tikv_cdc_scan_sink_duration_seconds",
"Bucketed histogram of cdc async scan sink time duration",
)
.unwrap();
pub static ref CDC_SCAN_BYTES: IntCounter = register_int_counter!(
"tikv_cdc_scan_bytes_total",
"Total fetched bytes of CDC incremental scan"
Expand Down Expand Up @@ -214,6 +219,13 @@ lazy_static! {

pub static ref CDC_ROCKSDB_PERF_COUNTER_STATIC: PerfCounter =
auto_flush_from!(CDC_ROCKSDB_PERF_COUNTER, PerfCounter);

/// How long an event stayed queued between `Sink` and `Drain`.
///
/// Observed in `Drain::drain` as `start.saturating_elapsed_secs() * 1000.0`,
/// so values are in milliseconds (the metric name carries no unit suffix).
/// Buckets are 17 exponential steps starting at 0.01 ms and doubling, so the
/// largest finite bucket bound is 0.01 * 2^16 = 655.36 ms.
// NOTE(review): the start timestamp comes from `Instant::now_coarse()`, whose
// resolution is presumably coarser than the smallest buckets — confirm that
// the sub-millisecond buckets are meaningful.
pub static ref CDC_EVENTS_PENDING_DURATION: Histogram = register_histogram!(
"tikv_cdc_events_pending_duration",
"Pending duration for all events, in milliseconds",
exponential_buckets(0.01, 2.0, 17).unwrap(),
)
.unwrap();
}

thread_local! {
Expand Down

0 comments on commit 829028b

Please sign in to comment.