Skip to content

Commit

Permalink
don't build the schema from corro-consul, leave that to corrosion
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Aug 21, 2023
1 parent 22401ea commit c265cf9
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 58 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,33 @@ impl ColumnType {
_ => return None,
})
}

pub fn from_str(s: &str) -> Option<Self> {
Some(match s {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => return None,
})
}
}

impl FromSql for ColumnType {
fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => {
return Err(FromSqlError::InvalidType);
}
}),
_ => Err(FromSqlError::InvalidType),
}
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
Expand Down
5 changes: 0 additions & 5 deletions crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,5 @@ pub enum LogFormat {
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ConsulConfig {
#[serde(default)]
pub extra_services_columns: Vec<String>,
#[serde(default)]
pub extra_statements: Vec<String>,

pub client: consul_client::Config,
}
1 change: 1 addition & 0 deletions crates/corrosion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ corro-admin = { path = "../corro-admin" }
corro-agent = { path = "../corro-agent" }
corro-client = { path = "../corro-client" }
corro-types = { path = "../corro-types" }
corro-api-types = { path = "../corro-api-types" }
crc32fast = { workspace = true }
eyre = { workspace = true }
fallible-iterator = { workspace = true }
Expand Down
127 changes: 74 additions & 53 deletions crates/corrosion/src/command/consul/sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use consul_client::{AgentCheck, AgentService, Client};
use corro_api_types::ColumnType;
use corro_client::CorrosionClient;
use corro_types::{api::Statement, config::ConsulConfig};
use metrics::{histogram, increment_counter};
Expand Down Expand Up @@ -35,9 +36,7 @@ pub async fn run<P: AsRef<Path>>(

info!("Setting up corrosion for consul sync");
setup(
&corrosion,
config.extra_services_columns.as_slice(),
config.extra_statements.as_slice(),
&corrosion
)
.await?;

Expand Down Expand Up @@ -118,12 +117,9 @@ pub async fn run<P: AsRef<Path>>(

async fn setup(
corrosion: &CorrosionClient,
extra_services_columns: &[String],
extra_statements: &[String],
) -> eyre::Result<()> {
let mut conn = corrosion.pool().get().await?;
{
let mut conn = corrosion.pool().get().await?;

let tx = conn.transaction()?;

info!("Creating internal tables");
Expand All @@ -143,51 +139,51 @@ async fn setup(
tx.commit()?;
}
info!("Ensuring schema...");
corrosion
.schema(&build_schema(extra_services_columns, extra_statements))
.await?;
Ok(())
}

fn build_schema(extra_services_columns: &[String], extra_statements: &[String]) -> Vec<Statement> {
let extra = extra_services_columns.join(",");
let mut statements = vec![
Statement::Simple(format!("CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{{}}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
{}
PRIMARY KEY (node, id)
) WITHOUT ROWID;", if extra.is_empty() { String::new() } else {format!("{extra},")})),
Statement::Simple("CREATE INDEX consul_services_node_id_updated_at ON consul_services (node, id, updated_at);".to_string()),

Statement::Simple("CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
) WITHOUT ROWID;".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_id_updated_at ON consul_checks (node, id, updated_at);".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_service_id ON consul_checks (node, service_id);".to_string()),
struct ColumnInfo {
name: String,
kind: corro_api_types::ColumnType
}

let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_services)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("name", ColumnType::Text),
("tags", ColumnType::Text),
("meta", ColumnType::Text),
("port", ColumnType::Integer),
("address", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_services.{name} w/ type {kind:?}");
}
}

let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_checks)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("service_id", ColumnType::Text),
("service_name", ColumnType::Text),
("name", ColumnType::Text),
("status", ColumnType::Text),
("output", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for s in extra_statements {
statements.push(Statement::Simple(s.clone()));
for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_checks.{name} w/ type {kind:?}");
}
}

statements
Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -582,10 +578,39 @@ mod tests {
_ = tracing_subscriber::fmt::try_init();
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();

let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let tmpdir = tempfile::TempDir::new()?;
tokio::fs::write(tmpdir.path().join("consul.sql"), b"
CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)),
PRIMARY KEY (node, id)
);
CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
);
").await?;

let ta1 = launch_test_agent(|conf| conf.add_schema_path(tmpdir.path().display().to_string()).build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]).add_schema_path(tmpdir.path().display().to_string())
.build()
},
tripwire.clone(),
Expand All @@ -596,8 +621,6 @@ mod tests {

setup(
&ta1_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down Expand Up @@ -653,8 +676,6 @@ mod tests {

setup(
&ta2_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down

0 comments on commit c265cf9

Please sign in to comment.