Skip to content

Commit

Permalink
reducer can now handle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
carlsverre committed Dec 1, 2023
1 parent aea5885 commit 874f55e
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 116 deletions.
8 changes: 4 additions & 4 deletions demo/demo-reducer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
created_at TEXT NOT NULL
)"
)
.await;
.await?;
}

Mutation::CreateTask { id, description } => {
Expand All @@ -40,19 +40,19 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
id,
description
)
.await;
.await?;
}

Mutation::DeleteTask { id } => {
execute!("delete from tasks where id = ?", id).await;
execute!("delete from tasks where id = ?", id).await?;
}

Mutation::ToggleCompleted { id } => {
execute!(
"update tasks set completed = not completed where id = ?",
id
)
.await;
.await?;
}
}

Expand Down
17 changes: 10 additions & 7 deletions lib/sqlsync-react/sqlsync-react-test-reducer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {

match mutation {
Mutation::InitSchema => {
futures::join!(
execute!(
"CREATE TABLE IF NOT EXISTS counter (
let create_table = execute!(
"CREATE TABLE IF NOT EXISTS counter (
id INTEGER PRIMARY KEY,
value INTEGER
)"
),
execute!("INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)")
);
let init_counter = execute!(
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
);

create_table.await?;
init_counter.await?;
}

Mutation::Incr { value } => {
Expand All @@ -33,7 +36,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
ON CONFLICT (id) DO UPDATE SET value = value + ?",
value
)
.await;
.await?;
}

Mutation::Decr { value } => {
Expand All @@ -42,7 +45,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
ON CONFLICT (id) DO UPDATE SET value = value - ?",
value
)
.await;
.await?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/sqlsync-reducer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ serde = { workspace = true, features = ["derive"] }
bincode.workspace = true
futures.workspace = true
log.workspace = true
thiserror.workspace = true

wasmi = { workspace = true, optional = true }
thiserror = { workspace = true, optional = true }

[features]
default = ["guest"]
host = ["wasmi", "thiserror"]
host = ["wasmi"]
guest = []

[dev-dependencies]
Expand Down
18 changes: 11 additions & 7 deletions lib/sqlsync-reducer/examples/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use sqlsync_reducer::{
host_ffi::{register_log_handler, WasmFFI},
types::{ExecResponse, QueryResponse, Request, SqliteError},
types::{ErrorResponse, ExecResponse, QueryResponse, Request},
};
use wasmi::{Engine, Linker, Module, Store};

Expand Down Expand Up @@ -58,7 +58,7 @@ fn main() -> anyhow::Result<()> {
log::info!("received query request: {} {:?}", sql, params);
let ptr = ffi.encode(
&mut store,
&Ok::<_, SqliteError>(QueryResponse {
&Ok::<_, ErrorResponse>(QueryResponse {
columns: vec!["foo".into(), "bar".into()],
rows: vec![vec!["baz".into(), "qux".into()].into()],
}),
Expand All @@ -70,16 +70,20 @@ fn main() -> anyhow::Result<()> {
if sql == "FAIL" {
let ptr = ffi.encode(
&mut store,
&Err::<ExecResponse, _>(SqliteError {
code: Some(1),
message: "error".to_string(),
}),
&Err::<ExecResponse, _>(
ErrorResponse::SqliteError {
code: 1,
message: "error".to_string(),
},
),
)?;
responses.insert(id, ptr);
} else {
let ptr = ffi.encode(
&mut store,
&Ok::<_, SqliteError>(ExecResponse { changes: 1 }),
&Ok::<_, ErrorResponse>(ExecResponse {
changes: 1,
}),
)?;
responses.insert(id, ptr);
}
Expand Down
9 changes: 5 additions & 4 deletions lib/sqlsync-reducer/src/guest_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use serde::de::DeserializeOwned;
use crate::{
guest_ffi::{fbm, FFIBufPtr},
types::{
ExecResponse, QueryResponse, ReducerError, Request, RequestId,
Requests, Responses, SqliteError, SqliteValue,
ErrorResponse, ExecResponse, QueryResponse, ReducerError, Request,
RequestId, Requests, Responses, SqliteValue,
},
};

Expand Down Expand Up @@ -107,6 +107,7 @@ impl Reactor {
}
}

#[must_use]
pub struct ResponseFuture<T: DeserializeOwned> {
id: RequestId,
_marker: std::marker::PhantomData<T>,
Expand All @@ -132,7 +133,7 @@ impl<T: DeserializeOwned> Future for ResponseFuture<T> {
pub fn raw_query(
sql: String,
params: Vec<SqliteValue>,
) -> ResponseFuture<Result<QueryResponse, SqliteError>> {
) -> ResponseFuture<Result<QueryResponse, ErrorResponse>> {
let request = Request::Query { sql, params };
let id = reactor().queue_request(request);
ResponseFuture::new(id)
Expand All @@ -141,7 +142,7 @@ pub fn raw_query(
pub fn raw_execute(
sql: String,
params: Vec<SqliteValue>,
) -> ResponseFuture<Result<ExecResponse, SqliteError>> {
) -> ResponseFuture<Result<ExecResponse, ErrorResponse>> {
let request = Request::Exec { sql, params };
let id = reactor().queue_request(request);
ResponseFuture::new(id)
Expand Down
11 changes: 7 additions & 4 deletions lib/sqlsync-reducer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use log::Level;
use serde::{Deserialize, Serialize};
use thiserror::Error;

pub type RequestId = u32;

Expand Down Expand Up @@ -62,10 +63,12 @@ pub struct ExecResponse {
pub changes: usize,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct SqliteError {
pub code: Option<i32>,
pub message: String,
#[derive(Serialize, Deserialize, Debug, Error)]
pub enum ErrorResponse {
#[error("SQLite Error({code}): {message}")]
SqliteError { code: i32, message: String },
#[error("Unknown: {0}")]
Unknown(String),
}

#[derive(Serialize, Deserialize)]
Expand Down
19 changes: 10 additions & 9 deletions lib/sqlsync/examples/counter-reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,32 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
let mutation: Mutation = bincode::deserialize(&mutation)?;
match mutation {
Mutation::InitSchema => {
futures::join!(
execute!(
"CREATE TABLE IF NOT EXISTS counter (
let create_table = execute!(
"CREATE TABLE IF NOT EXISTS counter (
id INTEGER PRIMARY KEY,
value INTEGER
)"
),
execute!(
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
)
);
let init_counter = execute!(
"INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"
);

create_table.await?;
init_counter.await?;
}
Mutation::Incr => {
execute!(
"INSERT INTO counter (id, value) VALUES (0, 0)
ON CONFLICT (id) DO UPDATE SET value = value + 1"
)
.await;
.await?;
}
Mutation::Decr => {
execute!(
"INSERT INTO counter (id, value) VALUES (0, 0)
ON CONFLICT (id) DO UPDATE SET value = value - 1"
)
.await;
.await?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/sqlsync/examples/hello-reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
key,
value
)
.await;
.await?;
}
Mutation::Delete(key) => {
execute!("DELETE FROM kv WHERE key = ?", key).await;
execute!("DELETE FROM kv WHERE key = ?", key).await?;
}
}

Expand Down
21 changes: 9 additions & 12 deletions lib/sqlsync/examples/task-reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ enum Mutation {
}

async fn query_max_sort() -> Result<f64, ReducerError> {
let response = query!("select max(sort) from tasks").await;
let response = query!("select max(sort) from tasks").await?;
assert!(response.rows.len() == 1, "expected 1 row");
Ok(response.rows[0].maybe_get(0)?.unwrap_or(0.0))
}
Expand All @@ -45,7 +45,7 @@ async fn query_sort_after(id: i64) -> Result<f64, ReducerError> {
",
id
)
.await;
.await?;

if response.rows.len() == 0 {
query_max_sort().await
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
created_at TEXT NOT NULL
)"
)
.await;
.await?;
}

Mutation::AppendTask { id, description } => {
Expand All @@ -86,18 +86,14 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
max_sort + 1.,
description
)
.await;
.await?;
}

Mutation::RemoveTask { id } => {
execute!("delete from tasks where id = ?", id).await;
execute!("delete from tasks where id = ?", id).await?;
}

Mutation::UpdateTask {
id,
description,
completed,
} => {
Mutation::UpdateTask { id, description, completed } => {
execute!(
"update tasks set
description = IFNULL(?, description),
Expand All @@ -107,12 +103,13 @@ async fn reducer(mutation: Vec<u8>) -> Result<(), ReducerError> {
description,
completed
)
.await;
.await?;
}

Mutation::MoveTask { id, after } => {
let new_sort = query_sort_after(after).await?;
execute!("update tasks set sort = ? where id = ?", new_sort, id);
execute!("update tasks set sort = ? where id = ?", new_sort, id)
.await?;
}
}

Expand Down
Loading

0 comments on commit 874f55e

Please sign in to comment.