Skip to content

Commit

Permalink
Implemet cheaper update notifications in corrosion
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Oct 29, 2024
1 parent 6fb772b commit 95188b5
Show file tree
Hide file tree
Showing 12 changed files with 1,151 additions and 240 deletions.
1 change: 1 addition & 0 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use corro_types::{
broadcast::{FocaCmd, FocaInput, Timestamp},
sqlite::SqlitePoolError,
sync::generate_sync,
updates::Handle,
};
use futures::{SinkExt, TryStreamExt};
use rusqlite::{named_params, params, OptionalExtension};
Expand Down
3 changes: 3 additions & 0 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use corro_types::{
schema::{init_schema, Schema},
sqlite::CrConn,
};
use corro_types::updates::UpdatesManager;

/// Runtime state for the Corrosion agent
pub struct AgentOptions {
Expand Down Expand Up @@ -106,6 +107,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let subs_manager = SubsManager::default();

let updates_manager = UpdatesManager::default();
// Setup subscription handlers
let subs_bcast_cache = setup_spawn_subscriptions(
&subs_manager,
Expand Down Expand Up @@ -210,6 +212,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
schema: RwLock::new(schema),
cluster_id,
subs_manager,
updates_manager,
tripwire,
});

Expand Down
40 changes: 29 additions & 11 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use corro_types::{
channel::CorroReceiver,
config::AuthzConfig,
pubsub::SubsManager,
updates::{match_changes, match_changes_from_db_version},
};
use std::{
cmp,
Expand All @@ -36,6 +37,7 @@ use std::{
time::{Duration, Instant},
};

use crate::api::public::update::api_v1_updates;
use axum::{
error_handling::HandleErrorLayer,
extract::DefaultBodyLimit,
Expand Down Expand Up @@ -216,6 +218,20 @@ pub async fn setup_http_api_handler(
.layer(ConcurrencyLimitLayer::new(128)),
),
)
.route(
"/v1/updates/:table",
post(api_v1_updates).route_layer(
tower::ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_error: BoxError| async {
Ok::<_, Infallible>((
StatusCode::SERVICE_UNAVAILABLE,
"max concurrency limit reached".to_string(),
))
}))
.layer(LoadShedLayer::new())
.layer(ConcurrencyLimitLayer::new(128)),
),
)
.route(
"/v1/subscriptions/:id",
get(api_v1_sub_by_id).route_layer(
Expand Down Expand Up @@ -705,11 +721,16 @@ pub async fn process_fully_buffered_changes(
if let Some(db_version) = db_version {
let conn = agent.pool().read().await?;
block_in_place(|| {
if let Err(e) = agent
.subs_manager()
.match_changes_from_db_version(&conn, db_version)
if let Err(e) = match_changes_from_db_version(agent.subs_manager(), &conn, db_version) {
error!(%db_version, "could not match changes for subs from db version: {e}");
}
});

block_in_place(|| {
if let Err(e) =
match_changes_from_db_version(agent.updates_manager(), &conn, db_version)
{
error!(%db_version, "could not match changes from db version: {e}");
error!(%db_version, "could not match changes for updates from db version: {e}");
}
});
}
Expand Down Expand Up @@ -969,7 +990,6 @@ pub async fn process_multiple_changes(
.snapshot()
};


snap.update_cleared_ts(&tx, ts)
.map_err(|source| ChangeError::Rusqlite {
source,
Expand All @@ -988,12 +1008,11 @@ pub async fn process_multiple_changes(

if let Some(ts) = last_cleared {
let mut booked_writer = agent
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
.booked()
.blocking_write("process_multiple_changes(update_cleared_ts)");
booked_writer.update_cleared_ts(ts);
}


for (_, changeset, _, _) in changesets.iter() {
if let Some(ts) = changeset.ts() {
let dur = (agent.clock().new_timestamp().get_time() - ts.0).to_duration();
Expand Down Expand Up @@ -1050,9 +1069,8 @@ pub async fn process_multiple_changes(

for (_actor_id, changeset, db_version, _src) in changesets {
change_chunk_size += changeset.changes().len();
agent
.subs_manager()
.match_changes(changeset.changes(), db_version);
match_changes(agent.subs_manager(), changeset.changes(), db_version);
match_changes(agent.updates_manager(), changeset.changes(), db_version);
}

histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed());
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use corro_types::broadcast::broadcast_changes;

pub mod pubsub;

pub mod update;

pub async fn make_broadcastable_changes<F, T>(
agent: &Agent,
f: F,
Expand Down
Loading

0 comments on commit 95188b5

Please sign in to comment.