Skip to content

Commit

Permalink
chore: use new ceramic one metric for validation (#182)
Browse files Browse the repository at this point in the history
made count optional so we can tell if we should fallback to the old one
  • Loading branch information
dav1do authored Jun 18, 2024
1 parent 2e6c608 commit 03bec17
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use reqwest::Client;

// FIXME: is it worth attaching metrics to the peer info?
const IPFS_SERVICE_METRICS_PORT: &str = "9465";
const EVENT_SYNC_METRIC_NAME: &str = "ceramic_store_key_value_insert_count_total";
const EVENT_SYNC_METRIC_NAME_OLD: &str = "ceramic_store_key_value_insert_count_total";
const EVENT_SYNC_METRIC_NAME: &str = "ceramic_one_store_key_value_insert_count_total";
const ANCHOR_REQUEST_MIDS_KEY: &str = "anchor_mids";
const CAS_ANCHOR_REQUEST_KEY: &str = "anchor_requests";

Expand Down Expand Up @@ -279,12 +280,12 @@ impl MetricsCollection for PromMetricCollector {
#[derive(Debug, Clone)]
struct PeerRequestMetricInfo {
pub name: String,
pub count: u64,
pub count: Option<u64>,
pub runtime: Option<u64>,
}

impl PeerRequestMetricInfo {
pub fn new(name: String, count: u64, runtime: Option<u64>) -> Self {
pub fn new(name: String, count: Option<u64>, runtime: Option<u64>) -> Self {
Self {
name,
count,
Expand All @@ -298,7 +299,7 @@ impl PeerRequestMetricInfo {
warn!("Runtime of 0 seconds is invalid for RPS calculation");
0.0
}
Some(runtime) => self.count as f64 / runtime as f64,
Some(runtime) => self.count.unwrap_or(0) as f64 / runtime as f64,
None => {
warn!("Runtime is missing for RPS calculation");
0.0
Expand Down Expand Up @@ -346,7 +347,12 @@ fn final_peer_req_metric_info(
if before.name == current.name {
Ok(PeerRequestMetricInfo::new(
before.name.to_owned(),
current.count - before.count,
Some(
current
.count
.unwrap_or(0)
.saturating_sub(before.count.unwrap_or(0)),
),
Some(run_time_seconds),
))
} else {
Expand Down Expand Up @@ -509,13 +515,13 @@ impl ScenarioState {
if let Some(m) = metric {
results.push(PeerRequestMetricInfo::new(
parse_base_url_to_pod(&addr)?,
m,
Some(m),
None,
));
} else {
results.push(PeerRequestMetricInfo::new(
parse_base_url_to_pod(&addr)?,
0, // Use 0 as the count if the metric is not found
None,
None,
));
}
Expand Down Expand Up @@ -545,10 +551,20 @@ impl ScenarioState {
Ok(())
}
Scenario::ReconEventSync => {
let peers = self
let mut peers = self
.get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT)
.await?;

// TEMP: fall back to old metric if new one is 0 jic we're on an old version of c1
if peers.iter().all(|p| p.count.is_none()) {
peers = self
.get_peers_counter_metric(
EVENT_SYNC_METRIC_NAME_OLD,
IPFS_SERVICE_METRICS_PORT,
)
.await?;
}

self.before_metrics = Some(peers);
Ok(())
}
Expand Down Expand Up @@ -591,7 +607,7 @@ impl ScenarioState {
for (name, count) in res {
let metric = PeerRequestMetricInfo::new(
name.clone(),
count as u64,
Some(count as u64),
Some(metrics.duration as u64),
);
let rps = metric.rps();
Expand Down Expand Up @@ -878,14 +894,28 @@ impl ScenarioState {
| Scenario::CeramicNewStreamsBenchmark => (CommandResult::Success, None),
Scenario::ReconEventSync => {
let default_rate = 300;
let metric_name = EVENT_SYNC_METRIC_NAME;
let mut metric_name = EVENT_SYNC_METRIC_NAME;

let peer_req_cnts = match self
.get_peers_counter_metric(metric_name, IPFS_SERVICE_METRICS_PORT)
.await
.map_err(CommandResult::Failure)
{
Ok(v) => v,
Ok(v) => {
if v.iter().all(|p| p.count.is_none()) {
metric_name = EVENT_SYNC_METRIC_NAME_OLD;
match self
.get_peers_counter_metric(metric_name, IPFS_SERVICE_METRICS_PORT)
.await
.map_err(CommandResult::Failure)
{
Ok(v) => v,
Err(e) => return (e, None),
}
} else {
v
}
}
Err(e) => return (e, None),
};

Expand Down Expand Up @@ -920,15 +950,15 @@ impl ScenarioState {
let errors = peer_metrics.iter().flat_map(|p| {
let rps = p.rps();
if rps < threshold {
warn!(current_req_cnt=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold");
warn!(current_req_cnt=?p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold");
Some(
format!(
"Peer {} RPS less than threshold: {} < {}",
p.name, rps, threshold
),
)
} else {
info!(current_req_cnt=%p.count, before=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "success! peer {} over the threshold", p.name);
info!(current_req_cnt=?p.count, before=?p.count, run_time_seconds=?p.runtime, %threshold, %rps, "success! peer {} over the threshold", p.name);
None
}
}
Expand Down

0 comments on commit 03bec17

Please sign in to comment.