diff --git a/Cargo.lock b/Cargo.lock index 10b8c429..84b94d90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -830,6 +830,7 @@ dependencies = [ "consul-client", "corro-admin", "corro-agent", + "corro-api-types", "corro-client", "corro-tests", "corro-tpl", diff --git a/crates/corro-api-types/src/lib.rs b/crates/corro-api-types/src/lib.rs index 145ad730..c40a1212 100644 --- a/crates/corro-api-types/src/lib.rs +++ b/crates/corro-api-types/src/lib.rs @@ -168,6 +168,33 @@ impl ColumnType { _ => return None, }) } + + pub fn from_str(s: &str) -> Option { + Some(match s { + "INTEGER" => Self::Integer, + "REAL" => Self::Float, + "TEXT" => Self::Text, + "BLOB" => Self::Blob, + _ => return None, + }) + } +} + +impl FromSql for ColumnType { + fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult { + match value { + ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() { + "INTEGER" => Self::Integer, + "REAL" => Self::Float, + "TEXT" => Self::Text, + "BLOB" => Self::Blob, + _ => { + return Err(FromSqlError::InvalidType); + } + }), + _ => Err(FromSqlError::InvalidType), + } + } } #[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)] diff --git a/crates/corro-types/src/config.rs b/crates/corro-types/src/config.rs index 08ceb01b..4c709b0b 100644 --- a/crates/corro-types/src/config.rs +++ b/crates/corro-types/src/config.rs @@ -273,10 +273,5 @@ pub enum LogFormat { #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct ConsulConfig { - #[serde(default)] - pub extra_services_columns: Vec, - #[serde(default)] - pub extra_statements: Vec, - pub client: consul_client::Config, } diff --git a/crates/corrosion/Cargo.toml b/crates/corrosion/Cargo.toml index 4f4e4da5..7035813c 100644 --- a/crates/corrosion/Cargo.toml +++ b/crates/corrosion/Cargo.toml @@ -20,6 +20,7 @@ corro-admin = { path = "../corro-admin" } corro-agent = { path = "../corro-agent" } corro-client = { path = "../corro-client" } corro-types = { path = "../corro-types" } +corro-api-types = { path = "../corro-api-types" } crc32fast = { workspace = true } eyre = { workspace = true } fallible-iterator = { workspace = true } diff --git a/crates/corrosion/src/command/consul/sync.rs b/crates/corrosion/src/command/consul/sync.rs index 6e44ed86..e370aa65 100644 --- a/crates/corrosion/src/command/consul/sync.rs +++ b/crates/corrosion/src/command/consul/sync.rs @@ -1,4 +1,5 @@ use consul_client::{AgentCheck, AgentService, Client}; +use corro_api_types::ColumnType; use corro_client::CorrosionClient; use corro_types::{api::Statement, config::ConsulConfig}; use metrics::{histogram, increment_counter}; @@ -35,9 +36,7 @@ pub async fn run>( info!("Setting up corrosion for consul sync"); setup( - &corrosion, - config.extra_services_columns.as_slice(), - config.extra_statements.as_slice(), + &corrosion ) .await?; @@ -118,12 +117,9 @@ pub async fn run>( async fn setup( corrosion: &CorrosionClient, - extra_services_columns: &[String], - extra_statements: &[String], ) -> eyre::Result<()> { + let mut conn = corrosion.pool().get().await?; { - let mut conn = corrosion.pool().get().await?; - let tx = conn.transaction()?; info!("Creating internal tables"); @@ -143,51 +139,51 @@ async fn setup( tx.commit()?; } info!("Ensuring schema..."); - corrosion - .schema(&build_schema(extra_services_columns, extra_statements)) - .await?; - Ok(()) -} -fn build_schema(extra_services_columns: &[String], extra_statements: &[String]) -> Vec { - let extra = extra_services_columns.join(","); - let mut statements = vec![ - Statement::Simple(format!("CREATE TABLE consul_services ( - node TEXT NOT NULL, - id TEXT NOT NULL, - name TEXT NOT NULL DEFAULT '', - tags TEXT NOT NULL DEFAULT '[]', - meta TEXT NOT NULL DEFAULT '{{}}', - port INTEGER NOT NULL DEFAULT 0, - address TEXT NOT NULL DEFAULT '', - updated_at INTEGER NOT NULL DEFAULT 0, - - {} - - PRIMARY KEY (node, id) - ) WITHOUT ROWID;", if extra.is_empty() { String::new() } else {format!("{extra},")})), - Statement::Simple("CREATE INDEX consul_services_node_id_updated_at ON consul_services (node, id, updated_at);".to_string()), - - Statement::Simple("CREATE TABLE consul_checks ( - node TEXT NOT NULL, - id TEXT NOT NULL, - service_id TEXT NOT NULL DEFAULT '', - service_name TEXT NOT NULL DEFAULT '', - name TEXT NOT NULL DEFAULT '', - status TEXT NOT NULL DEFAULT '', - output TEXT NOT NULL DEFAULT '', - updated_at INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (node, id) - ) WITHOUT ROWID;".to_string()), - Statement::Simple("CREATE INDEX consul_checks_node_id_updated_at ON consul_checks (node, id, updated_at);".to_string()), - Statement::Simple("CREATE INDEX consul_checks_node_service_id ON consul_checks (node, service_id);".to_string()), + struct ColumnInfo { + name: String, + kind: corro_api_types::ColumnType + } + + let col_infos: Vec = conn.prepare("PRAGMA table_info(consul_services)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))?.collect::, _>>()?; + + let expected_cols = [ + ("node", ColumnType::Text), + ("id", ColumnType::Text), + ("name", ColumnType::Text), + ("tags", ColumnType::Text), + ("meta", ColumnType::Text), + ("port", ColumnType::Integer), + ("address", ColumnType::Text), + ("updated_at", ColumnType::Integer), + ]; + + for (name, kind) in expected_cols { + if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() { + eyre::bail!("expected a column consul_services.{name} w/ type {kind:?}"); + } + } + + let col_infos: Vec = conn.prepare("PRAGMA table_info(consul_checks)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?.collect::, _>>()?; + + let expected_cols = [ + ("node", ColumnType::Text), + ("id", ColumnType::Text), + ("service_id", ColumnType::Text), + ("service_name", ColumnType::Text), + ("name", ColumnType::Text), + ("status", ColumnType::Text), + ("output", ColumnType::Text), + ("updated_at", ColumnType::Integer), ]; - for s in extra_statements { - statements.push(Statement::Simple(s.clone())); + for (name, kind) in expected_cols { + if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() { + eyre::bail!("expected a column consul_checks.{name} w/ type {kind:?}"); + } } - statements + Ok(()) } #[derive(Debug, Serialize, Deserialize)] @@ -582,10 +578,39 @@ mod tests { _ = tracing_subscriber::fmt::try_init(); let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple(); - let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?; + let tmpdir = tempfile::TempDir::new()?; + tokio::fs::write(tmpdir.path().join("consul.sql"), b" + CREATE TABLE consul_services ( + node TEXT NOT NULL, + id TEXT NOT NULL, + name TEXT NOT NULL DEFAULT '', + tags TEXT NOT NULL DEFAULT '[]', + meta TEXT NOT NULL DEFAULT '{}', + port INTEGER NOT NULL DEFAULT 0, + address TEXT NOT NULL DEFAULT '', + updated_at INTEGER NOT NULL DEFAULT 0, + app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)), + + PRIMARY KEY (node, id) + ); + + CREATE TABLE consul_checks ( + node TEXT NOT NULL, + id TEXT NOT NULL, + service_id TEXT NOT NULL DEFAULT '', + service_name TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT '', + output TEXT NOT NULL DEFAULT '', + updated_at INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (node, id) + ); + ").await?; + + let ta1 = launch_test_agent(|conf| conf.add_schema_path(tmpdir.path().display().to_string()).build(), tripwire.clone()).await?; let ta2 = launch_test_agent( |conf| { - conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]) + conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]).add_schema_path(tmpdir.path().display().to_string()) .build() }, tripwire.clone(), @@ -596,8 +621,6 @@ mod tests { setup( &ta1_client, - &["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()], - &[], ) .await?; @@ -653,8 +676,6 @@ mod tests { setup( &ta2_client, - &["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()], - &[], ) .await?;