
Commit

feat: aes 84 validate correctness in keramik ceramic anchoring benchmark (#184)

* feat: add simple validation logic in ceramic anchoring benchmark

* feat: sample validation of anchors and make the perf test more configurable through the YAML

* feat: tests for operator

* feat: pass cas_controller to cas-benchmark scenario

* chore: add TODOs for the next PR

* fix: allow dead code for struct fields that we do not currently read

* chore: inline comments

* chore: resolve merge conflict

* fix: log errors while writing to redis

* fix: use static HTTP client

* fix: use static client correctly

* fix: add Cargo.lock

---------

Co-authored-by: Samika Kashyap <[email protected]>
samika98 and Samika Kashyap authored Jun 18, 2024
1 parent dbf521e commit 2e6c608
Showing 11 changed files with 396 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

102 changes: 102 additions & 0 deletions operator/src/simulation/controller.rs
@@ -183,6 +183,9 @@ async fn reconcile_(
throttle_requests: spec.throttle_requests,
success_request_target: spec.success_request_target,
log_level: spec.log_level.clone(),
anchor_wait_time: spec.anchor_wait_time,
cas_network: spec.cas_network.clone(),
cas_controller: spec.cas_controller.clone(),
};

apply_manager(cx.clone(), &ns, simulation.clone(), manager_config).await?;
@@ -765,4 +768,103 @@ mod tests {
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}

#[tokio::test]
#[traced_test]
async fn reconcile_anchor_wait_time() {
let mock_rpc_client = MockIpfsRpcClientTest::new();
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let simulation = Simulation::test().with_spec(SimulationSpec {
            anchor_wait_time: Some(15), // Set anchor wait time to 15 seconds
..Default::default()
});
let mut stub = Stub::default();
stub.manager_job.patch(expect![[r#"
--- original
+++ modified
@@ -87,6 +87,10 @@
"name": "ceramic-admin"
}
}
+ },
+ {
+ "name": "SIMULATE_ANCHOR_WAIT_TIME",
+ "value": "15"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
"#]]);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(simulation), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}

#[tokio::test]
#[traced_test]
async fn reconcile_cas_network() {
let mock_rpc_client = MockIpfsRpcClientTest::new();
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let simulation = Simulation::test().with_spec(SimulationSpec {
cas_network: Some("test-cas-network".to_owned()),
..Default::default()
});
let mut stub = Stub::default();
stub.manager_job.patch(expect![[r#"
--- original
+++ modified
@@ -87,6 +87,10 @@
"name": "ceramic-admin"
}
}
+ },
+ {
+ "name": "SIMULATE_CAS_NETWORK",
+ "value": "test-cas-network"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
"#]]);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(simulation), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}

#[tokio::test]
#[traced_test]
async fn reconcile_cas_controller() {
let mock_rpc_client = MockIpfsRpcClientTest::new();
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let simulation = Simulation::test().with_spec(SimulationSpec {
cas_controller: Some("test-cas-controller".to_owned()),
..Default::default()
});
let mut stub = Stub::default();
stub.manager_job.patch(expect![[r#"
--- original
+++ modified
@@ -87,6 +87,10 @@
"name": "ceramic-admin"
}
}
+ },
+ {
+ "name": "SIMULATE_CAS_CONTROLLER",
+ "value": "test-cas-controller"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
"#]]);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(simulation), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}
}
24 changes: 24 additions & 0 deletions operator/src/simulation/manager.rs
@@ -38,6 +38,9 @@ pub struct ManagerConfig {
pub job_image_config: JobImageConfig,
pub success_request_target: Option<usize>,
pub log_level: Option<String>,
pub anchor_wait_time: Option<u32>,
pub cas_network: Option<String>,
pub cas_controller: Option<String>,
}

pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
@@ -131,6 +134,27 @@ pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
..Default::default()
})
}
if let Some(anchor_wait_time) = config.anchor_wait_time {
env_vars.push(EnvVar {
name: "SIMULATE_ANCHOR_WAIT_TIME".to_owned(),
value: Some(anchor_wait_time.to_string()),
..Default::default()
})
}
if let Some(cas_network) = config.cas_network {
env_vars.push(EnvVar {
name: "SIMULATE_CAS_NETWORK".to_owned(),
value: Some(cas_network.to_owned()),
..Default::default()
})
}
if let Some(cas_controller) = config.cas_controller {
env_vars.push(EnvVar {
name: "SIMULATE_CAS_CONTROLLER".to_owned(),
value: Some(cas_controller.to_owned()),
..Default::default()
})
}
if let Some(log_level) = config.log_level {
env_vars.push(EnvVar {
name: "SIMULATE_LOG_LEVEL".to_owned(),
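The operator side injects the three SIMULATE_* variables into the manager Job; the runner-side reads live in files not rendered on this page. As a rough sketch (variable names taken from the additions above; the local bindings and parsing behaviour are assumptions):

```rust
// Sketch only: reads back the env vars set by manager_job_spec above.
let anchor_wait_time: Option<u32> = std::env::var("SIMULATE_ANCHOR_WAIT_TIME")
    .ok()
    .and_then(|v| v.parse().ok());
let cas_network: Option<String> = std::env::var("SIMULATE_CAS_NETWORK").ok();
let cas_controller: Option<String> = std::env::var("SIMULATE_CAS_CONTROLLER").ok();
```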
6 changes: 6 additions & 0 deletions operator/src/simulation/spec.rs
@@ -39,6 +39,12 @@ pub struct SimulationSpec {
/// which consumes RAM, and will disappear if there is no persistent volume when the pod exits.
/// Valid values: 'warn', 'info', 'debug', 'trace'. Defaults to None meaning no logging beyond RUST_LOG.
pub(crate) log_level: Option<String>,
/// Anchor wait time in seconds, use with ceramic-anchoring-benchmark scenario
pub anchor_wait_time: Option<u32>,
/// Network type to use for the simulation, use with cas-benchmark scenario
pub cas_network: Option<String>,
/// Controller DID for the simulation, use with cas-benchmark scenario
pub cas_controller: Option<String>,
}

impl Simulation {
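For reference, a spec enabling all three new knobs can be built the same way the new tests above do. Values here are illustrative only (the URL and DID mirror the defaults in anchor.rs), and the remaining spec fields are left at their defaults:

```rust
// Illustrative only; not part of this commit.
let spec = SimulationSpec {
    anchor_wait_time: Some(2400), // seconds to wait for anchors
    cas_network: Some("https://cas-dev-direct.3boxlabs.com".to_owned()),
    cas_controller: Some("did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_owned()),
    ..Default::default()
};
```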
1 change: 1 addition & 0 deletions runner/Cargo.toml
@@ -30,6 +30,7 @@ serde_ipld_dagcbor = "0.6"
serde_ipld_dagjson = "0.2"
schemars.workspace = true
serde_json.workspace = true
once_cell = "1.19.0"
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
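runner/Cargo.toml picks up once_cell, which backs the "use static HTTP client" fixes listed in the commit message; the affected runner module is not rendered here. A minimal sketch of the pattern, assuming a reqwest-based client:

```rust
use once_cell::sync::Lazy;

// One lazily-initialized client shared by all requests, instead of
// constructing a new client (and connection pool) per call.
static HTTP_CLIENT: Lazy<reqwest::Client> = Lazy::new(|| {
    reqwest::Client::builder()
        .build()
        .expect("failed to build shared HTTP client")
});

async fn post_car(url: &str, car_bytes: Vec<u8>) -> Result<reqwest::Response, reqwest::Error> {
    // Every call site reuses the same static client.
    HTTP_CLIENT.post(url).body(car_bytes).send().await
}
```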
21 changes: 15 additions & 6 deletions runner/src/scenario/ceramic/anchor.rs
@@ -74,11 +74,13 @@ pub async fn stream_tip_car(
pub async fn create_anchor_request_on_cas(
user: &mut GooseUser,
conn: Arc<tokio::sync::Mutex<MultiplexedConnection>>,
cas_network: Option<String>,
cas_controller: Option<String>,
) -> TransactionResult {
let cas_service_url = std::env::var("CAS_SERVICE_URL")
.unwrap_or_else(|_| "https://cas-dev.3boxlabs.com".to_string());
let node_controller = std::env::var("node_controller")
.unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let cas_service_url =
cas_network.unwrap_or_else(|| "https://cas-dev-direct.3boxlabs.com".to_string());
let node_controller = cas_controller
.unwrap_or_else(|| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap();

let (root_cid, car_bytes) = stream_tip_car(
@@ -118,13 +120,20 @@ pub async fn create_anchor_request_on_cas(
Ok(())
}

pub async fn cas_benchmark() -> Result<Scenario, GooseError> {
pub async fn cas_benchmark(
cas_network: Option<String>,
cas_controller: Option<String>,
) -> Result<Scenario, GooseError> {
let redis_cli = get_redis_client().await.unwrap();
let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
let conn_mutex = Arc::new(Mutex::new(multiplexed_conn));
let create_anchor_request = Transaction::new(Arc::new(move |user| {
let conn_mutex_clone = conn_mutex.clone();
Box::pin(async move { create_anchor_request_on_cas(user, conn_mutex_clone).await })
let cas_network = cas_network.clone();
let cas_controller = cas_controller.clone();
Box::pin(async move {
create_anchor_request_on_cas(user, conn_mutex_clone, cas_network, cas_controller).await
})
}))
.set_name("create_anchor_request");

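Call sites of cas_benchmark now supply the overrides explicitly; passing None keeps the defaults shown above (the cas-dev-direct URL and the built-in controller DID). A usage sketch, not taken from this diff:

```rust
// Usage sketch only; the real call site lives in code not shown here.
let scenario = cas_benchmark(
    Some("https://cas-dev-direct.3boxlabs.com".to_owned()),
    Some("did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_owned()),
)
.await?;
```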
1 change: 1 addition & 0 deletions runner/src/scenario/ceramic/mod.rs
@@ -21,6 +21,7 @@ pub type CeramicClient = CeramicHttpClient<JwkSigner>;
#[derive(Clone)]
pub struct Credentials {
pub signer: JwkSigner,
#[allow(dead_code)]
pub did: Document,
}

1 change: 1 addition & 0 deletions runner/src/scenario/ceramic/model_instance.rs
@@ -73,6 +73,7 @@ pub struct EnvBasedConfig {

#[derive(Clone, Debug)]
pub struct GooseUserInfo {
#[allow(dead_code)]
pub global_leader: bool,
/// True if this user is the lead user on the worker
pub lead_user: bool,
74 changes: 58 additions & 16 deletions runner/src/scenario/ceramic/new_streams.rs
@@ -7,7 +7,7 @@ use tracing::info;

use crate::scenario::{
ceramic::{model_instance::CountResponse, models, RandomModelInstance},
get_redis_client,
get_redis_client, is_goose_lead_worker,
};

use super::{
@@ -144,7 +144,7 @@ pub async fn small_large_scenario(
let instantiate_small_model = Transaction::new(Arc::new(move |user| {
let small_model_conn_clone = small_model_conn.clone();
Box::pin(async move {
instantiate_small_model(user, params.store_mids, small_model_conn_clone).await
instantiate_small_model(user, params.store_mids, small_model_conn_clone, false).await
})
}))
.set_name("instantiate_small_model");
@@ -163,6 +163,36 @@
.register_transaction(instantiate_large_model))
}

pub async fn small_scenario(params: CeramicScenarioParameters) -> Result<Scenario, GooseError> {
let redis_cli = get_redis_client().await.unwrap();
let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
let shared_conn = Arc::new(Mutex::new(multiplexed_conn));
let config = CeramicModelInstanceTestUser::prep_scenario(params.clone())
.await
.unwrap();

let test_start = Transaction::new(Arc::new(move |user| {
Box::pin(CeramicModelInstanceTestUser::setup_mid_scenario(
user,
config.clone(),
))
}))
.set_name("setup")
.set_on_start();

let instantiate_small_model = Transaction::new(Arc::new(move |user| {
let small_model_conn_clone = shared_conn.clone();
Box::pin(async move {
instantiate_small_model(user, params.store_mids, small_model_conn_clone, true).await
})
}))
.set_name("instantiate_small_model");

Ok(scenario!("CeramicAnchoringBenchmark")
.register_transaction(test_start)
.register_transaction(instantiate_small_model))
}

// the nonce is used to ensure that the metrics stored in redis for the run are unique
pub async fn benchmark_scenario(
params: CeramicScenarioParameters,
@@ -207,20 +237,28 @@ async fn instantiate_small_model(
user: &mut GooseUser,
store_in_redis: bool,
conn: Arc<tokio::sync::Mutex<MultiplexedConnection>>,
one_writer_per_network: bool,
) -> TransactionResult {
let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
let response = ModelInstanceRequests::create_model_instance(
user,
user_data.user_cli(),
&user_data.small_model_id,
"instantiate_small_model_instance",
&models::SmallModel::random(),
)
.await?;
if store_in_redis {
let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
let stream_id_string = response.to_string();
let _: () = conn.sadd("anchor_mids", stream_id_string).await.unwrap();
let should_create = !one_writer_per_network || is_goose_lead_worker();
if should_create {
let user_data = CeramicModelInstanceTestUser::user_data(user).to_owned();
let response = ModelInstanceRequests::create_model_instance(
user,
user_data.user_cli(),
&user_data.small_model_id,
"instantiate_small_model_instance",
&models::SmallModel::random(),
)
.await?;
if store_in_redis {
let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
let stream_id_string = response.to_string();
let result: Result<usize, redis::RedisError> =
conn.sadd("anchor_mids", stream_id_string).await;
if let Err(e) = result {
tracing::error!("Failed to add to anchor_mids: {}", e);
}
}
}
Ok(())
}
Expand All @@ -242,7 +280,11 @@ async fn instantiate_large_model(
if store_in_redis {
let mut conn: tokio::sync::MutexGuard<'_, MultiplexedConnection> = conn.lock().await;
let stream_id_string = response.to_string();
let _: () = conn.sadd("anchor_mids", stream_id_string).await.unwrap();
let result: Result<usize, redis::RedisError> =
conn.sadd("anchor_mids", stream_id_string).await;
if let Err(e) = result {
tracing::error!("Failed to add to anchor_mids: {}", e);
}
}
Ok(())
}
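The gating added to instantiate_small_model reduces to: with one writer per network, only the lead Goose worker creates model instances; otherwise every simulated user does. A standalone illustration (not part of the commit), with is_goose_lead_worker assumed to supply the second argument:

```rust
// Illustration only: the gating rule factored out as a pure helper.
fn should_create_instance(one_writer_per_network: bool, is_lead_worker: bool) -> bool {
    !one_writer_per_network || is_lead_worker
}

#[test]
fn only_lead_worker_writes_when_gated() {
    assert!(should_create_instance(false, false)); // ungated: everyone writes
    assert!(should_create_instance(true, true));   // gated: lead worker writes
    assert!(!should_create_instance(true, false)); // gated: others skip
}
```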
