Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds an info and list command for getting more details on subscriptions #260

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/corro-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ tokio-serde = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
tripwire = { path = "../tripwire" }
rangemap = { workspace = true }
rangemap = { workspace = true }
uuid = { workspace = true }
67 changes: 67 additions & 0 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::LengthDelimitedCodec;
use tracing::{debug, error, info, warn};
use tripwire::Tripwire;
use uuid::Uuid;

#[derive(Debug, thiserror::Error)]
pub enum AdminError {
Expand Down Expand Up @@ -96,6 +97,7 @@ pub enum Command {
Locks { top: usize },
Cluster(ClusterCommand),
Actor(ActorCommand),
Subs(SubsCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -104,6 +106,15 @@ pub enum SyncCommand {
ReconcileGaps,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubsCommand {
Info {
hash: Option<String>,
id: Option<Uuid>,
},
List,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ClusterCommand {
Rejoin,
Expand Down Expand Up @@ -521,6 +532,62 @@ async fn handle_conn(

send_success(&mut stream).await;
}
Command::Subs(SubsCommand::List) => {
let handles = agent.subs_manager().get_handles();
let uuid_to_hash = handles
.iter()
.map(|(k, v)| {
json!({
"id": k,
"hash": v.hash(),
"sql": v.sql().lines().map(|c| c.trim()).collect::<Vec<_>>().join(" "),
})
})
.collect::<Vec<_>>();

send(&mut stream, Response::Json(serde_json::json!(uuid_to_hash))).await;
send_success(&mut stream).await;
}
Command::Subs(SubsCommand::Info { hash, id }) => {
let matcher_handle = match (hash, id) {
(Some(hash), _) => agent.subs_manager().get_by_hash(&hash),
(None, Some(id)) => agent.subs_manager().get(&id),
(None, None) => {
send_error(&mut stream, "specify hash or id for subscription").await;
continue;
}
};
match matcher_handle {
Some(matcher) => {
let statements = matcher
.cached_stmts()
.iter()
.map(|(table, stmts)| {
json!({
table: stmts.new_query(),
})
})
.collect::<Vec<_>>();
send(
&mut stream,
Response::Json(serde_json::json!({
"id": matcher.id(),
"hash": matcher.hash(),
"path": matcher.subs_path(),
"last_change_id": matcher.last_change_id_sent(),
"original_query": matcher.sql().lines().map(|c| c.trim()).collect::<Vec<_>>().join(" "),
"statements": statements,
})),
)
.await;
send_success(&mut stream).await;
}
None => {
send_error(&mut stream, "unknown subscription hash or id").await;
continue;
}
};
}
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
38 changes: 38 additions & 0 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ impl SubsManager {
self.0.read().get_by_query(sql)
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.0.read().get_by_hash(hash)
}

pub fn get_handles(&self) -> BTreeMap<Uuid, MatcherHandle> {
self.0.read().handles.clone()
}

pub fn get_or_insert(
&self,
sql: &str,
Expand Down Expand Up @@ -328,6 +336,13 @@ impl InnerSubsManager {
.and_then(|id| self.handles.get(id).cloned())
}

pub fn get_by_hash(&self, hash: &str) -> Option<MatcherHandle> {
self.handles
.values()
.find(|x| x.inner.hash == hash)
.cloned()
}

fn remove(&mut self, id: &Uuid) -> Option<MatcherHandle> {
let handle = self.handles.remove(id)?;
self.queries.remove(&handle.inner.sql);
Expand Down Expand Up @@ -364,6 +379,9 @@ struct InnerMatcherHandle {
cancel: CancellationToken,
changes_tx: mpsc::Sender<(MatchCandidates, CrsqlDbVersion)>,
last_change_rx: watch::Receiver<ChangeId>,
// some state from the matcher so we can take a look later
subs_path: String,
cached_statements: HashMap<String, MatcherStmt>,
}

type MatchCandidates = IndexMap<TableName, IndexSet<Vec<u8>>>;
Expand All @@ -373,6 +391,10 @@ impl MatcherHandle {
self.inner.id
}

pub fn sql(&self) -> &String {
&self.inner.sql
}

pub fn hash(&self) -> &str {
&self.inner.hash
}
Expand All @@ -385,6 +407,14 @@ impl MatcherHandle {
&self.inner.col_names
}

pub fn subs_path(&self) -> &String {
&self.inner.subs_path
}

pub fn cached_stmts(&self) -> &HashMap<String, MatcherStmt> {
&self.inner.cached_statements
}

pub async fn cleanup(self) {
self.inner.cancel.cancel();
info!(sub_id = %self.inner.id, "Canceled subscription");
Expand Down Expand Up @@ -593,6 +623,12 @@ pub struct MatcherStmt {
temp_query: String,
}

impl MatcherStmt {
pub fn new_query(self: &Self) -> &String {
return &self.new_query;
}
}

const CHANGE_ID_COL: &str = "id";
const CHANGE_TYPE_COL: &str = "type";

Expand Down Expand Up @@ -819,6 +855,8 @@ impl Matcher {
cancel: cancel.clone(),
last_change_rx,
changes_tx,
cached_statements: statements.clone(),
subs_path: sub_path.to_string(),
}),
state: state.clone(),
};
Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ impl From<SyncStateV1> for SyncMessage {
}
}


// generates a `SyncMessage` to tell another node what versions we're missing
#[tracing::instrument(skip_all, level = "debug")]
pub async fn generate_sync(bookie: &Bookie, self_actor_id: ActorId) -> SyncStateV1 {
Expand Down
30 changes: 30 additions & 0 deletions crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,19 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
info!("Exited with code: {:?}", exit.code());
std::process::exit(exit.code().unwrap_or(1));
}
Command::Subs(SubsCommand::Info { hash, id }) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::Info {
hash: hash.clone(),
id: *id,
}))
.await?;
}
Command::Subs(SubsCommand::List) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
conn.send_command(corro_admin::Command::Subs(corro_admin::SubsCommand::List))
.await?;
}
}

Ok(())
Expand Down Expand Up @@ -686,6 +699,10 @@ enum Command {
/// DB-related commands
#[command(subcommand)]
Db(DbCommand),

/// Subscription related commands
#[command(subcommand)]
Subs(SubsCommand),
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -769,3 +786,16 @@ enum DbCommand {
/// Acquires the lock on the DB
Lock { cmd: String },
}

#[derive(Subcommand)]
enum SubsCommand {
/// List all subscriptions on a node
List,
/// Get information on a subscription
Info {
#[arg(long)]
hash: Option<String>,
#[arg(long)]
id: Option<Uuid>,
},
}
Loading