Skip to content

Commit

Permalink
agent: pass SharedUpdateBroadcastCache to /updates handler
Browse files Browse the repository at this point in the history
It's required by the handler but not getting passed to Axum's middleware
stack.
  • Loading branch information
pborzenkov committed Nov 20, 2024
1 parent 95188b5 commit d2abe51
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
2 changes: 2 additions & 0 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
rx_foca,
subs_manager,
subs_bcast_cache,
updates_bcast_cache,
rtt_rx,
} = opts;

Expand Down Expand Up @@ -96,6 +97,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
&agent,
&tripwire,
subs_bcast_cache,
updates_bcast_cache,
&subs_manager,
api_listeners,
)
Expand Down
11 changes: 9 additions & 2 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ use tripwire::Tripwire;
use crate::{
api::{
peer::gossip_server_endpoint,
public::pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
public::{
pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
update::SharedUpdateBroadcastCache,
},
},
transport::Transport,
};
use corro_types::updates::UpdatesManager;
use corro_types::{
actor::ActorId,
agent::{migrate, Agent, AgentConfig, Booked, BookedVersions, LockRegistry, SplitPool},
Expand All @@ -41,7 +45,6 @@ use corro_types::{
schema::{init_schema, Schema},
sqlite::CrConn,
};
use corro_types::updates::UpdatesManager;

/// Runtime state for the Corrosion agent
pub struct AgentOptions {
Expand All @@ -58,6 +61,7 @@ pub struct AgentOptions {
pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>,
pub subs_manager: SubsManager,
pub subs_bcast_cache: SharedMatcherBroadcastCache,
pub updates_bcast_cache: SharedUpdateBroadcastCache,
pub tripwire: Tripwire,
}

Expand Down Expand Up @@ -118,6 +122,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
)
.await?;

let updates_bcast_cache = SharedUpdateBroadcastCache::default();

let cluster_id = {
let conn = pool.read().await?;
conn.query_row(
Expand Down Expand Up @@ -189,6 +195,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
rtt_rx,
subs_manager: subs_manager.clone(),
subs_bcast_cache,
updates_bcast_cache,
tripwire: tripwire.clone(),
};

Expand Down
3 changes: 3 additions & 0 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
api::public::{
api_v1_db_schema, api_v1_queries, api_v1_table_stats, api_v1_transactions,
pubsub::{api_v1_sub_by_id, api_v1_subs},
update::SharedUpdateBroadcastCache,
},
transport::Transport,
};
Expand Down Expand Up @@ -170,6 +171,7 @@ pub async fn setup_http_api_handler(
agent: &Agent,
tripwire: &Tripwire,
subs_bcast_cache: BcastCache,
updates_bcast_cache: SharedUpdateBroadcastCache,
subs_manager: &SubsManager,
api_listeners: Vec<TcpListener>,
) -> eyre::Result<()> {
Expand Down Expand Up @@ -280,6 +282,7 @@ pub async fn setup_http_api_handler(
.layer(Extension(Arc::new(AtomicI64::new(0))))
.layer(Extension(agent.clone()))
.layer(Extension(subs_bcast_cache))
.layer(Extension(updates_bcast_cache))
.layer(Extension(subs_manager.clone()))
.layer(Extension(tripwire.clone())),
)
Expand Down

0 comments on commit d2abe51

Please sign in to comment.