Skip to content

Commit

Permalink
more tests around dropping items from queue
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Nov 8, 2024
1 parent 99fb53f commit 69759e6
Showing 1 changed file with 94 additions and 6 deletions.
100 changes: 94 additions & 6 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,11 +918,11 @@ pub async fn handle_changes(
}

// drop old items when the queue is full.
if queue.len() > max_queue_len {
let change = queue.pop_back();
if let Some(change) = change {
for v in change.0.versions() {
let _ = seen.remove(&(change.0.actor_id, v));
if queue.len() >= max_queue_len {
let dropped = queue.pop_front();
if let Some(dropped) = dropped {
for v in dropped.0.versions() {
let _ = seen.remove(&(dropped.0.actor_id, v));
}
}

Expand Down Expand Up @@ -1082,11 +1082,18 @@ pub async fn handle_sync(

#[cfg(test)]
mod tests {
use crate::agent::setup;
use crate::api::public::api_v1_db_schema;

use super::*;
use axum::{http::StatusCode, Extension, Json};
use corro_tests::TEST_SCHEMA;
use corro_types::api::{Change, ColumnName, TableName};
use corro_types::{base::CrsqlDbVersion, base::Version, config::Config, pubsub::pack_columns};
use rusqlite::Connection;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::timeout;
use tokio::time::{timeout, Duration};

#[test]
fn ensure_truncate_works() -> eyre::Result<()> {
Expand All @@ -1105,6 +1112,87 @@ mod tests {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_loadshed_handle_changes() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
let (tripwire, _tripwire_worker, _tripwire_tx) = Tripwire::new_simple();
let dir = tempfile::tempdir()?;

let mut config = Config::builder()
.db_path(dir.path().join("corrosion.db").display().to_string())
.gossip_addr("127.0.0.1:0".parse()?)
.api_addr("127.0.0.1:0".parse()?)
.build()?;
config.perf.apply_queue_len = 1;
config.perf.processing_queue_len = 3;

let (agent, agent_options) = setup(config, tripwire.clone()).await?;

let (status_code, _res) =
api_v1_db_schema(Extension(agent.clone()), Json(vec![TEST_SCHEMA.to_owned()])).await;
assert_eq!(status_code, StatusCode::OK);

let other_actor = ActorId(uuid::Uuid::new_v4());
let bookie = Bookie::new(Default::default());
tokio::spawn(handle_changes(
agent.clone(),
bookie.clone(),
agent_options.rx_changes,
tripwire,
));

{
// hold write connection so that max_concurrency is reached
let _conn = agent.pool().write_normal().await?;

// queue size is very small - only three changes
// 10-6 are stuck proecessing because we hold the write conn
// next two versions, 3-5, enter the queue
// last version 2-1, displace 4 and 5 from the queue and
// they never get processed
for i in (1i64..=10i64).rev() {
let crsql_row = Change {
table: TableName("tests".into()),
pk: pack_columns(&vec![i.into()])?,
cid: ColumnName("text".into()),
val: "two override".into(),
col_version: 1,
db_version: CrsqlDbVersion(4),
seq: CrsqlSeq(0),
site_id: agent.actor_id().to_bytes(),
cl: 1,
};

let change = (
ChangeV1 {
actor_id: other_actor,
changeset: Changeset::Full {
version: Version(i as u64),
changes: vec![crsql_row.clone()],
seqs: CrsqlSeq(0)..=CrsqlSeq(0),
last_seq: CrsqlSeq(0),
ts: agent.clock().new_timestamp().into(),
},
},
ChangeSource::Sync,
);

agent.tx_changes().send(change).await?;
}
}

sleep(Duration::from_secs(2)).await;

let bookie = bookie.read("read booked").await;
let booked = bookie.get(&other_actor).unwrap().read("test").await;
assert!(booked.contains_all(Version(6)..=Version(10), None));
assert!(booked.contains_all(Version(1)..=Version(3), None));
assert!(!booked.contains_version(&Version(5)));
assert!(!booked.contains_version(&Version(4)));

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn ensure_vacuum_works() -> eyre::Result<()> {
let tmpdir = tempfile::tempdir()?;
Expand Down

0 comments on commit 69759e6

Please sign in to comment.