Skip to content

Commit

Permalink
chore: use "real" car files for events in scenarios (#171)
Browse files Browse the repository at this point in the history
* chore: update some dependencies

Lots of breaking CID, multihash, and IPLD changes between ceramic-one versions in the HTTP/base crates; working around it for now.

* chore: use real event carfiles for tests

* chore: use a random model in recon tests

Don't hard-code the model, to avoid syncing previous data when starting a new test. This is a valid test, but different from what we've done so far.
  • Loading branch information
dav1do authored May 13, 2024
1 parent 8729912 commit 1b8968c
Show file tree
Hide file tree
Showing 12 changed files with 404 additions and 319 deletions.
335 changes: 193 additions & 142 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ env_logger = "0.10.0"
expect-patch = { path = "./expect-patch/" }
hex = "0.4.3"
keramik-common = { path = "./common/", default-features = false }
multiaddr = "0.17"
multiaddr = "0.18"
multibase = "0.9.1"
multihash = "0.18"
multihash = { version = "0.19" }
multihash-codetable = { version = "0.1", features = ["sha2", "sha3"] }
# multihash-derive = { version = "0.9" }
opentelemetry = { version = "0.21", features = ["metrics", "trace"] }
opentelemetry-otlp = { version = "0.14", features = [
"metrics",
Expand Down
1 change: 1 addition & 0 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ kube = { version = "0.88", features = [
multiaddr = { workspace = true, optional = true }
multibase = { workspace = true, optional = true }
multihash = { workspace = true, optional = true }
libp2p-identity = "0.2"
opentelemetry = { workspace = true, optional = true }
rand = { version = "0.8.5" }
reqwest = { workspace = true, optional = true }
Expand Down
8 changes: 4 additions & 4 deletions operator/src/network/ipfs_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ impl IpfsRpcClient for HttpRpcClient {
addresses: Vec<String>,
}
let data: Response = resp.json().await?;

let p2p_proto = Protocol::P2p(Multihash::from_bytes(
&multibase::Base::Base58Btc.decode(data.id.clone())?,
)?);
let hash = Multihash::from_bytes(&multibase::Base::Base58Btc.decode(data.id.clone())?)?;
let peer_id = libp2p_identity::PeerId::from_multihash(hash)
.map_err(|e| anyhow!("failed to build multiash: {:?}", e))?;
let p2p_proto = Protocol::P2p(peer_id);
// We expect to find at least one non loop back address
let p2p_addrs = data
.addresses
Expand Down
8 changes: 7 additions & 1 deletion runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ did-method-key = "0.2"
goose = { version = "0.16", features = ["gaggle"] }
hex.workspace = true
keramik-common = { workspace = true, features = ["telemetry", "tokio-console"] }
libipld = { version = "0.16.0", features = ["serde-codec"] }
ipld-core = "0.4"
ipld-dagpb = "0.2"
multihash.workspace = true
multihash-codetable.workspace = true
opentelemetry.workspace = true
rand = "0.8.5"
redis = { version = "0.24", features = ["tokio-comp"] }
reqwest.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_ipld_dagcbor = "0.6"
serde_ipld_dagjson = "0.2"
schemars.workspace = true
serde_json.workspace = true
tokio.workspace = true
Expand All @@ -35,5 +39,7 @@ uuid = { version = "1.6.1", features = ["v4"] }
chrono = "0.4.31"
ed25519-dalek = "2.1"

unsigned-varint = "0.8" # temporary until we can use our http client updated for new c1

[dev-dependencies]
test-log = "0.2"
60 changes: 12 additions & 48 deletions runner/src/scenario/ceramic/anchor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use anyhow::Result;
use base64::{engine::general_purpose, Engine};
use ceramic_http_client::ceramic_event::{
Cid, DidDocument, JwkSigner, Jws, StreamId, StreamIdType,
};
use ceramic_core::{Cid, DagCborEncoded};
use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner, Jws, StreamId};
use chrono::Utc;
use goose::prelude::*;
use ipld_core::ipld;
use iroh_car::{CarHeader, CarWriter};
use libipld::{cbor::DagCborCodec, ipld, prelude::Codec, Ipld, IpldCodec};
use multihash::{Code::Sha2_256, MultihashDigest};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use multihash_codetable::{Code, MultihashDigest};

use redis::{aio::MultiplexedConnection, AsyncCommands};
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, sync::Arc};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::scenario::get_redis_client;
use crate::scenario::{get_redis_client, util::DAG_CBOR_CODEC};

#[derive(Serialize, Deserialize)]
struct CasAuthPayload {
Expand Down Expand Up @@ -59,8 +57,8 @@ pub async fn stream_tip_car(
"tip": genesis_cid,
});

let ipld_bytes = DagCborCodec.encode(&root_block)?;
let root_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes));
let ipld_bytes = DagCborEncoded::new(&root_block)?;
let root_cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(ipld_bytes.as_ref()));
let car_header = CarHeader::new_v1(vec![root_cid]);
let mut car_writer = CarWriter::new(car_header, Vec::new());
car_writer.write(root_cid, ipld_bytes).await.unwrap();
Expand All @@ -80,14 +78,14 @@ pub async fn create_anchor_request_on_cas(
.unwrap_or_else(|_| "https://cas-dev.3boxlabs.com".to_string());
let node_controller = std::env::var("node_controller")
.unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let (stream_id, genesis_cid, genesis_block) = create_stream(StreamIdType::Tile).unwrap();
let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap();

let (root_cid, car_bytes) = stream_tip_car(
stream_id.clone(),
genesis_cid,
genesis_block.clone(),
genesis_block.as_ref().to_vec(),
genesis_cid,
genesis_block,
genesis_block.as_ref().to_vec(),
)
.await
.unwrap();
Expand Down Expand Up @@ -131,37 +129,3 @@ pub async fn cas_benchmark() -> Result<Scenario, GooseError> {

Ok(scenario!("CeramicCasBenchmark").register_transaction(create_anchor_request))
}

/// Create a new Ceramic stream
pub fn create_stream(stream_type: StreamIdType) -> Result<(StreamId, Cid, Vec<u8>)> {
let controller: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(32)
.map(char::from)
.collect();

let genesis_commit = ipld!({
"header": {
"unique": stream_unique_header(),
"controllers": [controller]
}
});
// Deserialize the genesis commit, encode it as CBOR, and compute the CID.
let ipld_map: BTreeMap<String, Ipld> = libipld::serde::from_ipld(genesis_commit)?;
let ipld_bytes = DagCborCodec.encode(&ipld_map)?;
let genesis_cid = Cid::new_v1(IpldCodec::DagCbor.into(), Sha2_256.digest(&ipld_bytes));
Ok((
StreamId {
r#type: stream_type,
cid: genesis_cid,
},
genesis_cid,
ipld_bytes,
))
}

fn stream_unique_header() -> String {
let mut data = [0u8; 8];
thread_rng().fill(&mut data);
general_purpose::STANDARD.encode(data)
}
15 changes: 7 additions & 8 deletions runner/src/scenario/ceramic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ mod models;
pub mod new_streams;
pub mod query;
pub mod simple;
pub mod util;
pub mod write_only;

use ceramic_core::ssi::did::{DIDMethod, DocumentBuilder, Source};
use ceramic_core::ssi::did::{DIDMethod, Document, DocumentBuilder, Source};
use ceramic_core::ssi::jwk::{self, Base64urlUInt, Params, JWK};
use ceramic_http_client::ceramic_event::{DidDocument, JwkSigner};
use ceramic_http_client::ceramic_event::JwkSigner;
use ceramic_http_client::CeramicHttpClient;

use models::RandomModelInstance;
Expand All @@ -22,12 +21,12 @@ pub type CeramicClient = CeramicHttpClient<JwkSigner>;
#[derive(Clone)]
pub struct Credentials {
pub signer: JwkSigner,
pub did: DidDocument,
pub did: Document,
}

impl Credentials {
pub async fn from_env() -> Result<Self, anyhow::Error> {
let did = DidDocument::new(&std::env::var("DID_KEY").expect("DID_KEY is required"));
let did = Document::new(&std::env::var("DID_KEY").expect("DID_KEY is required"));
let private_key = std::env::var("DID_PRIVATE_KEY").expect("DID_PRIVATE_KEY is required");
let signer = JwkSigner::new(did.clone(), &private_key).await?;
Ok(Self { signer, did })
Expand Down Expand Up @@ -60,12 +59,12 @@ impl Credentials {
Ok(Self { signer, did })
}

fn generate_did_for_jwk(key: &JWK) -> anyhow::Result<DidDocument> {
fn generate_did_for_jwk(key: &JWK) -> anyhow::Result<Document> {
let did = did_method_key::DIDKey
.generate(&Source::Key(key))
.ok_or_else(|| anyhow::anyhow!("Failed to generate DID"))?;

let doc: DidDocument = DocumentBuilder::default()
let doc = DocumentBuilder::default()
.id(did)
.build()
.map_err(|e| anyhow::anyhow!("failed to build DID document: {}", e))?;
Expand All @@ -74,7 +73,7 @@ impl Credentials {
}

/// Returns (Private Key, DID Document)
fn generate_did_and_pk() -> anyhow::Result<(String, DidDocument)> {
fn generate_did_and_pk() -> anyhow::Result<(String, Document)> {
let key = jwk::JWK::generate_ed25519()?;
let private_key = if let Params::OKP(params) = &key.params {
let pk = params
Expand Down
23 changes: 0 additions & 23 deletions runner/src/scenario/ceramic/util.rs

This file was deleted.

23 changes: 13 additions & 10 deletions runner/src/scenario/ipfs_block_fetch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use ceramic_core::Cid;
use ceramic_core::{Cid, DagCborEncoded};
use goose::prelude::*;
use libipld::prelude::Codec;
use libipld::{ipld, json::DagJsonCodec};
use multihash::{Code, MultihashDigest};
use std::{sync::Arc, time::Duration};
use ipld_core::ipld;
use multihash_codetable::{Code, MultihashDigest};

use crate::simulate::Topology;

use super::util::DAG_CBOR_CODEC;

pub fn scenario(topo: Topology) -> Result<Scenario> {
let put: Transaction = Transaction::new(Arc::new(move |user| {
Box::pin(async move { put(topo, user).await })
Expand Down Expand Up @@ -40,22 +42,22 @@ fn global_user_id(user: usize, topo: Topology) -> u64 {
}

/// Produce a DAG-CBOR IPLD node that contains deterministically unique data for the user.
fn user_data(local_user: usize, topo: Topology) -> (Cid, Vec<u8>) {
fn user_data(local_user: usize, topo: Topology) -> (Cid, DagCborEncoded) {
let id = global_user_id(local_user, topo);
let data = ipld!({
"user": id,
"nonce": topo.nonce,
});

let bytes = DagJsonCodec.encode(&data).unwrap();

let hash = Code::Sha2_256.digest(bytes.as_slice());
(Cid::new_v1(DagJsonCodec.into(), hash), bytes)
let bytes = DagCborEncoded::new(&data).unwrap();
let cid = Cid::new_v1(DAG_CBOR_CODEC, Code::Sha2_256.digest(bytes.as_ref()));
(cid, bytes)
}

// Generate deterministic random data and put it into IPFS
async fn put(topo: Topology, user: &mut GooseUser) -> TransactionResult {
let (cid, data) = user_data(user.weighted_users_index, topo);
let data = data.as_ref().to_vec();
println!(
"put id: {} user: {} nonce: {} cid: {}",
topo.target_worker, user.weighted_users_index, topo.nonce, cid,
Expand Down Expand Up @@ -118,6 +120,7 @@ async fn get(mut topo: Topology, user: &mut GooseUser) -> TransactionResult {
// Check that all written data is accounted for.
async fn check(topo: Topology, user: &mut GooseUser) -> TransactionResult {
let (cid, data) = user_data(user.weighted_users_index, topo);
let data = data.as_ref().to_vec();
println!(
"stop id: {} user: {} cid: {}",
topo.target_worker, user.weighted_users_index, cid,
Expand Down
3 changes: 2 additions & 1 deletion runner/src/scenario/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::scenario::ceramic::util::goose_error;
use crate::scenario::util::goose_error;
use goose::GooseError;

pub mod ceramic;
pub mod ipfs_block_fetch;
pub mod recon_sync;
pub mod util;

static FIRST_USER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(true);

Expand Down
Loading

0 comments on commit 1b8968c

Please sign in to comment.