Skip to content

Commit

Permalink
Server-initiated mutations
Browse files Browse the repository at this point in the history
Fixes #42
  • Loading branch information
carlsverre committed Jan 6, 2024
1 parent 649caaf commit 634cd75
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 141 deletions.
1 change: 0 additions & 1 deletion lib/sqlsync-worker/sqlsync-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl_from_error!(
io::Error,
sqlsync::error::Error,
sqlsync::sqlite::Error,
sqlsync::JournalError,
sqlsync::replication::ReplicationError,
sqlsync::JournalIdParseError,
sqlsync::ReducerError,
Expand Down
125 changes: 95 additions & 30 deletions lib/sqlsync/examples/end-to-end-local-net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use sqlsync::local::NoopSignal;
use sqlsync::replication::ReplicationMsg;
use sqlsync::replication::ReplicationProtocol;
use sqlsync::JournalId;
use sqlsync::Lsn;
use sqlsync::MemoryJournalFactory;
use sqlsync::Reducer;

Expand Down Expand Up @@ -141,6 +140,8 @@ fn handle_client(

let mut num_steps = 0;

let mut remaining_direct_mutations = 5;

loop {
let msg = receive_msg(&mut socket_reader)?;
log::info!("server: received {:?}", msg);
Expand All @@ -157,6 +158,31 @@ fn handle_client(
log::info!("server: stepping doc (steps: {})", num_steps);
unlock!(|doc| doc.step()?);

// trigger a direct increment on the server side after every message
if remaining_direct_mutations > 0 {
remaining_direct_mutations -= 1;
unlock!(|doc| {
log::info!("server: running a direct mutation on the doc");
doc.mutate_direct(|tx| {
match tx.execute(
"INSERT INTO counter (id, value) VALUES (1, 0)
ON CONFLICT (id) DO UPDATE SET value = value + 1",
[],
) {
Ok(_) => Ok::<_, anyhow::Error>(()),
// ignore missing table error
Err(rusqlite::Error::SqliteFailure(_, Some(msg)))
if msg == "no such table: counter" =>
{
log::info!("server: skipping direct mutation");
Ok(())
}
Err(err) => Err(err)?,
}
})?;
});
}

// sync back to the client if needed
unlock!(|doc| {
if let Some((msg, mut reader)) = protocol.sync(doc)? {
Expand Down Expand Up @@ -219,7 +245,16 @@ fn start_client(
let total_mutations = 10 as usize;
let mut remaining_mutations = total_mutations;

// the total number of sync attempts we will make
let total_syncs = 100 as usize;
let mut syncs = 0;

loop {
syncs += 1;
if syncs > total_syncs {
panic!("client({}): too many syncs", timeline_id);
}

let msg = receive_msg(&mut socket_reader)?;
log::info!("client({}): received {:?}", timeline_id, msg);

Expand Down Expand Up @@ -248,25 +283,31 @@ fn start_client(
}

log::info!("client({}): QUERYING STATE", timeline_id);
doc.query(|conn| {
conn.query_row("select value from counter", [], |row| {
let value: Option<i32> = row.get(0)?;
log::info!(
"client({}): counter value: {:?}",
timeline_id,
value
);
Ok(())
})?;

Ok::<_, anyhow::Error>(())
let current_value = doc.query(|conn| {
let value = conn.query_row(
"select value from counter where id = 0",
[],
|row| {
let value: Option<usize> = row.get(0)?;
log::info!(
"client({}): counter value: {:?}",
timeline_id,
value
);
Ok(value)
},
)?;

Ok::<_, anyhow::Error>(value)
})?;

if let Some(lsn) = doc.storage_lsn() {
// once the storage has reached (total_mutations+1) * num_clients
// then we have reached the end
log::info!("client({}): storage lsn: {}", timeline_id, lsn);
if lsn >= ((total_mutations * num_clients) + 1) as Lsn {
if let Some(value) = current_value {
log::info!(
"client({}): storage lsn: {:?}",
timeline_id,
doc.storage_lsn()
);
if value == (total_mutations * num_clients) {
break;
}
}
Expand All @@ -279,23 +320,47 @@ fn start_client(

// final query, value should be total_mutations * num_clients
doc.query(|conn| {
conn.query_row_and_then("select value from counter", [], |row| {
let value: Option<usize> = row.get(0)?;
log::info!(
"client({}): FINAL counter value: {:?}",
timeline_id,
value
);
if value != Some(total_mutations * num_clients) {
return Err(anyhow::anyhow!(
conn.query_row_and_then(
"select value from counter where id = 0",
[],
|row| {
let value: Option<usize> = row.get(0)?;
log::info!(
"client({}): FINAL counter value: {:?}",
timeline_id,
value
);
if value != Some(total_mutations * num_clients) {
return Err(anyhow::anyhow!(
"client({}): counter value is incorrect: {:?}, expected {}",
timeline_id,
value,
total_mutations * num_clients
));
}
Ok(())
})?;
}
Ok(())
},
)?;
conn.query_row_and_then(
"select value from counter where id = 1",
[],
|row| {
let value: Option<usize> = row.get(0)?;
log::info!(
"client({}): FINAL server counter value: {:?}",
timeline_id,
value
);
if value.is_none() || value == Some(0) {
return Err(anyhow::anyhow!(
"client({}): server counter value is incorrect: {:?}, expected non-zero value",
timeline_id,
value,
));
}
Ok(())
},
)?;
Ok::<_, anyhow::Error>(())
})?;

Expand Down
54 changes: 43 additions & 11 deletions lib/sqlsync/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::convert::From;
use std::fmt::Debug;
use std::io;

use crate::db::{open_with_vfs, ConnectionPair};
use rusqlite::Transaction;

use crate::db::{open_with_vfs, run_in_tx, ConnectionPair};
use crate::error::Result;
use crate::reducer::Reducer;
use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource};
use crate::replication::{
ReplicationDestination, ReplicationError, ReplicationSource,
};
use crate::timeline::{apply_timeline_range, run_timeline_migration};
use crate::Lsn;
use crate::{
journal::{Journal, JournalFactory, JournalId},
lsn::LsnRange,
storage::Storage,
};
use crate::{JournalError, Lsn};

struct ReceiveQueueEntry {
id: JournalId,
Expand Down Expand Up @@ -44,10 +49,11 @@ impl<J: Journal> CoordinatorDocument<J> {
timeline_factory: J::Factory,
reducer_wasm_bytes: &[u8],
) -> Result<Self> {
let (mut sqlite, storage) = open_with_vfs(storage)?;
let (mut sqlite, mut storage) = open_with_vfs(storage)?;

// TODO: this feels awkward here
run_timeline_migration(&mut sqlite.readwrite)?;
storage.commit()?;

Ok(Self {
reducer: Reducer::new(reducer_wasm_bytes)?,
Expand All @@ -62,10 +68,12 @@ impl<J: Journal> CoordinatorDocument<J> {
fn get_or_create_timeline_mut(
&mut self,
id: JournalId,
) -> std::result::Result<&mut J, JournalError> {
) -> io::Result<&mut J> {
match self.timelines.entry(id) {
Entry::Occupied(entry) => Ok(entry.into_mut()),
Entry::Vacant(entry) => Ok(entry.insert(self.timeline_factory.open(id)?)),
Entry::Vacant(entry) => {
Ok(entry.insert(self.timeline_factory.open(id)?))
}
}
}

Expand All @@ -89,12 +97,26 @@ impl<J: Journal> CoordinatorDocument<J> {
}
}

pub fn mutate_direct<F, E>(&mut self, f: F) -> Result<(), E>
where
F: FnOnce(&mut Transaction) -> Result<(), E>,
E: From<rusqlite::Error> + From<io::Error>,
{
run_in_tx(&mut self.sqlite.readwrite, f)?;
self.storage.commit()?;
Ok(())
}

pub fn step(&mut self) -> Result<()> {
// check to see if we have anything in the receive queue
let entry = self.timeline_receive_queue.pop_front();

if let Some(entry) = entry {
log::debug!("applying range {} to timeline {}", entry.range, entry.id);
log::debug!(
"applying range {} to timeline {}",
entry.range,
entry.id
);

// get the timeline
let timeline = self
Expand All @@ -119,7 +141,9 @@ impl<J: Journal> CoordinatorDocument<J> {
}

/// CoordinatorDocument knows how to replicate it's storage journal
impl<J: Journal + ReplicationSource> ReplicationSource for CoordinatorDocument<J> {
impl<J: Journal + ReplicationSource> ReplicationSource
for CoordinatorDocument<J>
{
type Reader<'a> = <J as ReplicationSource>::Reader<'a>
where
Self: 'a;
Expand All @@ -132,14 +156,22 @@ impl<J: Journal + ReplicationSource> ReplicationSource for CoordinatorDocument<J
self.storage.source_range()
}

fn read_lsn<'a>(&'a self, lsn: crate::Lsn) -> io::Result<Option<Self::Reader<'a>>> {
fn read_lsn<'a>(
&'a self,
lsn: crate::Lsn,
) -> io::Result<Option<Self::Reader<'a>>> {
self.storage.read_lsn(lsn)
}
}

/// CoordinatorDocument knows how to receive timeline journals from elsewhere
impl<J: Journal + ReplicationDestination> ReplicationDestination for CoordinatorDocument<J> {
fn range(&mut self, id: JournalId) -> std::result::Result<LsnRange, ReplicationError> {
impl<J: Journal + ReplicationDestination> ReplicationDestination
for CoordinatorDocument<J>
{
fn range(
&mut self,
id: JournalId,
) -> std::result::Result<LsnRange, ReplicationError> {
let timeline = self.get_or_create_timeline_mut(id)?;
ReplicationDestination::range(timeline, id)
}
Expand Down
19 changes: 15 additions & 4 deletions lib/sqlsync/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::convert::From;

use rusqlite::{
hooks::{AuthAction, AuthContext, Authorization},
Connection, OpenFlags,
Connection, OpenFlags, Transaction,
};
use sqlite_vfs::FilePtr;

Expand All @@ -13,11 +15,9 @@ pub struct ConnectionPair {
pub readonly: Connection,
}

type Result<T> = std::result::Result<T, rusqlite::Error>;

pub fn open_with_vfs<J: Journal>(
journal: J,
) -> Result<(ConnectionPair, Box<Storage<J>>)> {
) -> rusqlite::Result<(ConnectionPair, Box<Storage<J>>)> {
let mut storage = Box::new(Storage::new(journal));
let storage_ptr = FilePtr::new(&mut storage);

Expand Down Expand Up @@ -68,3 +68,14 @@ pub fn open_with_vfs<J: Journal>(
storage,
))
}

pub fn run_in_tx<F, E>(sqlite: &mut Connection, f: F) -> Result<(), E>
where
F: FnOnce(&mut Transaction) -> Result<(), E>,
E: From<rusqlite::Error>,
{
let mut txn = sqlite.transaction()?;
f(&mut txn)?; // will cause a rollback on failure
txn.commit()?;
Ok(())
}
14 changes: 8 additions & 6 deletions lib/sqlsync/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::io;

use thiserror::Error;

use crate::{
reducer::ReducerError, replication::ReplicationError, timeline::TimelineError, JournalError,
JournalIdParseError,
reducer::ReducerError, replication::ReplicationError,
timeline::TimelineError, JournalIdParseError,
};

#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
ReplicationError(#[from] ReplicationError),

#[error(transparent)]
JournalError(#[from] JournalError),

#[error(transparent)]
JournalIdParseError(#[from] JournalIdParseError),

Expand All @@ -24,6 +23,9 @@ pub enum Error {

#[error(transparent)]
SqliteError(#[from] rusqlite::Error),

#[error("io error: {0}")]
IoError(#[from] io::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
Loading

0 comments on commit 634cd75

Please sign in to comment.