Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow destructive schema changes, when flag is provided #222

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, Instant},
};

use axum::Extension;
use axum::{extract::Query, Extension};
use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt};
use hyper::StatusCode;
use rand::{
Expand Down Expand Up @@ -783,6 +783,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
Extension(ta1.agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand All @@ -791,6 +792,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {

let (status_code, _body) = api_v1_db_schema(
Extension(ta2.agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand Down Expand Up @@ -1912,6 +1914,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
Extension(ta1.agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand All @@ -1920,6 +1923,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let (status_code, _body) = api_v1_db_schema(
Extension(ta2.agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand Down
10 changes: 7 additions & 3 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ pub async fn serve_sync(

#[cfg(test)]
mod tests {
use axum::{Extension, Json};
use axum::{extract::Query, Extension, Json};
use camino::Utf8PathBuf;
use corro_tests::TEST_SCHEMA;
use corro_types::{
Expand Down Expand Up @@ -1645,8 +1645,12 @@ mod tests {
)
.await?;

let (status_code, _res) =
api_v1_db_schema(Extension(agent.clone()), Json(vec![TEST_SCHEMA.to_owned()])).await;
let (status_code, _res) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
Json(vec![TEST_SCHEMA.to_owned()]),
)
.await;

assert_eq!(status_code, StatusCode::OK);

Expand Down
25 changes: 21 additions & 4 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::{Duration, Instant},
};

use axum::{response::IntoResponse, Extension};
use axum::{extract::Query, response::IntoResponse, Extension};
use bytes::{BufMut, BytesMut};
use compact_str::ToCompactString;
use corro_types::{
Expand All @@ -19,6 +19,7 @@ use corro_types::{
};
use hyper::StatusCode;
use rusqlite::{params_from_iter, ToSql, Transaction};
use serde::Deserialize;
use spawn::spawn_counted;
use tokio::{
sync::{
Expand Down Expand Up @@ -440,7 +441,11 @@ pub async fn api_v1_queries(
}
}

async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result<()> {
async fn execute_schema(
agent: &Agent,
statements: Vec<String>,
allow_destructive: bool,
) -> eyre::Result<()> {
let new_sql: String = statements.join(";");

let partial_schema = parse_sql(&new_sql)?;
Expand All @@ -465,7 +470,7 @@ async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result<
block_in_place(|| {
let tx = conn.immediate_transaction()?;

apply_schema(&tx, &schema_write, &mut new_schema)?;
apply_schema(&tx, &schema_write, &mut new_schema, allow_destructive)?;

for tbl_name in partial_schema.tables.keys() {
tx.execute("DELETE FROM __corro_schema WHERE tbl_name = ?", [tbl_name])?;
Expand All @@ -484,8 +489,15 @@ async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result<
Ok(())
}

#[derive(Deserialize, Default)]
pub struct SchemaQuery {
#[serde(default)]
allow_destructive: bool,
}

pub async fn api_v1_db_schema(
Extension(agent): Extension<Agent>,
query: Query<SchemaQuery>,
axum::extract::Json(statements): axum::extract::Json<Vec<String>>,
) -> (StatusCode, axum::Json<ExecResponse>) {
if statements.is_empty() {
Expand All @@ -503,7 +515,7 @@ pub async fn api_v1_db_schema(

let start = Instant::now();

if let Err(e) = execute_schema(&agent, statements).await {
if let Err(e) = execute_schema(&agent, statements, query.0.allow_destructive).await {
error!("could not merge schemas: {e}");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -644,6 +656,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand Down Expand Up @@ -726,6 +739,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand Down Expand Up @@ -835,6 +849,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![
"CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(),
]),
Expand Down Expand Up @@ -866,6 +881,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![
"CREATE TABLE tests2 (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(),
"CREATE TABLE tests (id BIGINT NOT NULL PRIMARY KEY, foo TEXT);".into(),
Expand Down Expand Up @@ -939,6 +955,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![create_stmt.into()]),
)
.await;
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ async fn forward_bytes_to_body_sender(

#[cfg(test)]
mod tests {
use axum::extract::Query;
use corro_types::{
api::{ChangeId, RowId},
config::Config,
Expand Down Expand Up @@ -901,6 +902,7 @@ mod tests {

let (status_code, _body) = api_v1_db_schema(
Extension(agent.clone()),
Query(Default::default()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
Expand Down
15 changes: 12 additions & 3 deletions crates/corro-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,18 @@ impl CorrosionApiClient {
Ok(serde_json::from_slice(&bytes)?)
}

pub async fn schema(&self, statements: &[Statement]) -> Result<ExecResponse, Error> {
pub async fn schema(
&self,
statements: &[Statement],
allow_destructive: bool,
) -> Result<ExecResponse, Error> {
let mut url = format!("http://{}/v1/migrations", self.api_addr);
if allow_destructive {
url += "?allow_destructive=true";
}
let req = hyper::Request::builder()
.method(hyper::Method::POST)
.uri(format!("http://{}/v1/migrations", self.api_addr))
.uri(url)
.header(hyper::header::CONTENT_TYPE, "application/json")
.header(hyper::header::ACCEPT, "application/json")
.body(Body::from(serde_json::to_vec(statements)?))?;
Expand All @@ -232,6 +240,7 @@ impl CorrosionApiClient {
pub async fn schema_from_paths<P: AsRef<Path>>(
&self,
schema_paths: &[P],
allow_destructive: bool,
) -> Result<Option<ExecResponse>, Error> {
let mut statements = vec![];

Expand Down Expand Up @@ -312,7 +321,7 @@ impl CorrosionApiClient {
return Ok(None);
}

Ok(Some(self.schema(&statements).await?))
Ok(Some(self.schema(&statements, allow_destructive).await?))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/corro-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn launch_test_agent<F: FnOnce(ConfigBuilder) -> Result<Config, Config

{
let client = corro_client::CorrosionApiClient::new(agent.api_addr());
client.schema_from_paths(&schema_paths).await?;
client.schema_from_paths(&schema_paths, false).await?;
}

Ok(TestAgent {
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-tpl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ mod tests {
let client = corro_client::CorrosionApiClient::new(ta.agent.api_addr());

client
.schema(&[Statement::Simple(corro_tests::TEST_SCHEMA.into())])
.schema(&[Statement::Simple(corro_tests::TEST_SCHEMA.into())], false)
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ mod tests {
setup_conn(&conn)?;
migrate(&mut conn)?;
let tx = conn.transaction()?;
apply_schema(&tx, &Schema::default(), &mut schema)?;
apply_schema(&tx, &Schema::default(), &mut schema, false)?;
tx.commit()?;
}

Expand Down Expand Up @@ -2510,7 +2510,7 @@ mod tests {
setup_conn(&conn).unwrap();
migrate(&mut conn).unwrap();
let tx = conn.transaction().unwrap();
apply_schema(&tx, &Schema::default(), &mut schema).unwrap();
apply_schema(&tx, &Schema::default(), &mut schema, false).unwrap();
tx.commit().unwrap();
}

Expand Down Expand Up @@ -2549,7 +2549,7 @@ mod tests {

{
let tx = conn2.transaction().unwrap();
apply_schema(&tx, &Schema::default(), &mut schema).unwrap();
apply_schema(&tx, &Schema::default(), &mut schema, false).unwrap();
tx.commit().unwrap();
}

Expand Down
20 changes: 12 additions & 8 deletions crates/corro-types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ pub fn apply_schema(
tx: &Transaction,
schema: &Schema,
new_schema: &mut Schema,
allow_destructive: bool,
) -> Result<(), ApplySchemaError> {
if let Some(name) = schema
.tables
Expand All @@ -277,10 +278,11 @@ pub fn apply_schema(
.difference(&new_schema.tables.keys().collect::<HashSet<_>>())
.next()
{
// TODO: add options and check flag
return Err(ApplySchemaError::DropTableWithoutDestructiveFlag(
(*name).clone(),
));
if !allow_destructive {
return Err(ApplySchemaError::DropTableWithoutDestructiveFlag(
(*name).clone(),
));
}
}

let mut schema_to_merge = Schema::default();
Expand Down Expand Up @@ -417,10 +419,12 @@ pub fn apply_schema(
debug!("dropped cols: {dropped_cols:?}");

if let Some(col_name) = dropped_cols.into_iter().next() {
return Err(ApplySchemaError::RemoveColumnWithoutDestructiveFlag(
name.clone(),
col_name.clone(),
));
if !allow_destructive {
return Err(ApplySchemaError::RemoveColumnWithoutDestructiveFlag(
name.clone(),
col_name.clone(),
));
}
}

// 2. check for changed columns
Expand Down
2 changes: 1 addition & 1 deletion crates/corrosion/src/command/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()>
if !config.db.schema_paths.is_empty() {
let client = corro_client::CorrosionApiClient::new(*config.api.bind_addr.first().unwrap());
match client
.schema_from_paths(config.db.schema_paths.as_slice())
.schema_from_paths(config.db.schema_paths.as_slice(), false)
.await
{
Ok(Some(res)) => {
Expand Down
14 changes: 10 additions & 4 deletions crates/corrosion/src/command/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ use std::{net::SocketAddr, path::Path};
use corro_client::CorrosionApiClient;
use tracing::info;

pub async fn run<P: AsRef<Path>>(api_addr: SocketAddr, schema_paths: &[P]) -> eyre::Result<()> {
pub async fn run<P: AsRef<Path>>(
api_addr: SocketAddr,
schema_paths: &[P],
allow_destructive: bool,
) -> eyre::Result<()> {
let client = CorrosionApiClient::new(api_addr);

client.schema_from_paths(schema_paths).await?;
client
.schema_from_paths(schema_paths, allow_destructive)
.await?;
info!("Successfully reloaded Corrosion's schema from paths!");
Ok(())
}
Expand All @@ -27,7 +33,7 @@ mod tests {

let client = corro_client::CorrosionApiClient::new(ta.agent.api_addr());
client
.schema_from_paths(&ta.agent.config().db.schema_paths)
.schema_from_paths(&ta.agent.config().db.schema_paths, false)
.await?;

let mut conf = ta.agent.config().as_ref().clone();
Expand All @@ -46,7 +52,7 @@ mod tests {

println!("conf: {conf:?}");

run(ta.agent.api_addr(), &conf.db.schema_paths).await?;
run(ta.agent.api_addr(), &conf.db.schema_paths, false).await?;

assert!(ta.agent.schema().read().tables.contains_key("blah"));

Expand Down
14 changes: 11 additions & 3 deletions crates/corrosion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,13 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
}
}
}
Command::Reload => {
command::reload::run(cli.api_addr()?, &cli.config()?.db.schema_paths).await?
Command::Reload { allow_destructive } => {
command::reload::run(
cli.api_addr()?,
&cli.config()?.db.schema_paths,
*allow_destructive,
)
.await?
}
Command::Sync(SyncCommand::Generate) => {
let mut conn = AdminConn::connect(cli.admin_path()).await?;
Expand Down Expand Up @@ -659,7 +664,10 @@ enum Command {
},

/// Reload the config
Reload,
Reload {
#[arg(long, default_value = "false")]
allow_destructive: bool,
},

/// Sync-related commands
#[command(subcommand)]
Expand Down
Loading