Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Fix views synchronizer bugs #1249

Open
wants to merge 1 commit into
base: Id1465b9c660965c898932c783f5e47064a50a4b7
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,000 changes: 1,000 additions & 0 deletions logictests/generated/mysql/queries

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions readyset-adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ quanta = { version = "0.11", default-features = false }
lru = "0.12.0"
crossbeam-skiplist = "0.1.1"
slab = "0.4"
xxhash-rust = { version = "0.8.10", features = ["xxh3"] }

readyset-adapter-types = { path = "../readyset-adapter-types/" }
readyset-alloc = { path = "../readyset-alloc/" }
Expand Down Expand Up @@ -81,6 +82,10 @@ path = "src/lib.rs"
name = "parse"
harness = false

[[bench]]
name = "hash"
harness = false

[features]
ryw = []
failure_injection = ["fail/failpoints"]
1,059 changes: 1,059 additions & 0 deletions readyset-adapter/benches/hash.rs

Large diffs are not rendered by default.

12 changes: 11 additions & 1 deletion readyset-adapter/src/migration_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,18 @@ impl MigrationHandler {
match controller.dry_run(changelist).await {
Ok(_) => {
self.start_time.remove(view_request);

// It's possible that the ViewsSynchronizer found an existing view for this query
// on the server while we were performing a dry run, in which case it would have
// updated the query's status to "successful". In this situation, we don't want to
// overwrite the "successful" status, so we only write the new "dry run succeeded"
// status if the query's status is still "pending"
self.query_status_cache
.update_query_migration_state(view_request, MigrationState::DryRunSucceeded);
.with_mut_migration_state(view_request, |status| {
if status.is_pending() {
*status = MigrationState::DryRunSucceeded;
}
});
}
Err(e) if e.caused_by_unsupported() => {
self.start_time.remove(view_request);
Expand Down
34 changes: 34 additions & 0 deletions readyset-adapter/src/query_status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,24 @@ impl QueryStatusCache {
}
}

/// Yields to the given function `f` a mutable reference to the migration state of the query
/// `q`. The primary purpose of this method is allow for atomic reads and writes of the
/// migration state of a query.
pub fn with_mut_migration_state<Q, F>(&self, q: &Q, f: F) -> bool
where
Q: QueryStatusKey,
F: Fn(&mut MigrationState),
{
q.with_mut_status(self, |maybe_query_status| {
if let Some(query_status) = maybe_query_status {
f(&mut query_status.migration_state);
true
} else {
false
}
})
}

/// Marks a query as dropped by the user.
///
/// NOTE: this should only be called after we successfully remove a View for this query. This is
Expand Down Expand Up @@ -785,6 +803,22 @@ impl QueryStatusCache {
.into()
}

/// Returns a list of queries whose migration states match `states`.
pub fn queries_with_statuses(&self, states: &[MigrationState]) -> QueryList {
let statuses = self.persistent_handle.statuses.read();
statuses
.iter()
.filter_map(|(_query_id, (query, status))| {
if states.contains(&status.migration_state) {
Some((query.clone(), status.clone()))
} else {
None
}
})
.collect::<Vec<(Query, QueryStatus)>>()
.into()
}

/// Returns a list of queries that have a state of [`QueryState::Successful`].
pub fn allow_list(&self) -> Vec<(QueryId, Arc<ViewCreateRequest>, QueryStatus)> {
self.persistent_handle.allow_list()
Expand Down
37 changes: 29 additions & 8 deletions readyset-adapter/src/views_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;

use dataflow_expression::Dialect;
Expand All @@ -8,6 +9,7 @@ use readyset_util::shared_cache::LocalCache;
use readyset_util::shutdown::ShutdownReceiver;
use tokio::select;
use tracing::{debug, info, instrument, trace, warn};
use xxhash_rust::xxh3;

use crate::query_status_cache::QueryStatusCache;

Expand All @@ -22,6 +24,13 @@ pub struct ViewsSynchronizer {
dialect: Dialect,
/// Global and thread-local cache of view endpoints and prepared statements.
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
/// A cache to keep track of the queries for which we've already checked the server for
/// existing views. Note that this cache is *not* updated (i.e. a query is not removed) when a
/// "dry run succeeded" query is migrated.
///
/// This HashSet stores 128-bit hashes computed via xxHash in an attempt to minimize the amount
/// of data we need to store to keep track of the queries we've already seen.
views_checked: HashSet<u128>,
}

impl ViewsSynchronizer {
Expand All @@ -38,6 +47,7 @@ impl ViewsSynchronizer {
poll_interval,
dialect,
view_name_cache,
views_checked: HashSet::new(),
}
}

Expand Down Expand Up @@ -69,24 +79,34 @@ impl ViewsSynchronizer {

async fn poll(&mut self) {
debug!("Views synchronizer polling");
let queries = self
let (queries, hashes): (Vec<_>, Vec<_>) = self
.query_status_cache
.pending_migration()
.queries_with_statuses(&[MigrationState::DryRunSucceeded, MigrationState::Pending])
.into_iter()
.filter_map(|(q, _)| {
q.into_parsed()
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
.map(|p| Arc::try_unwrap(p).unwrap_or_else(|arc| (*arc).clone()))
q.into_parsed().and_then(|p| {
let hash = xxh3::xxh3_128(&bincode::serialize(&*p).unwrap());

if self.views_checked.contains(&hash) {
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
Some((
Arc::try_unwrap(p).unwrap_or_else(|arc| (*arc).clone()),
hash,
))
} else {
None
}
})
})
.collect::<Vec<_>>();
.unzip();

match self
.controller
.view_names(queries.clone(), self.dialect)
.await
{
Ok(statuses) => {
for (query, name) in queries.into_iter().zip(statuses) {
for ((query, name), hash) in queries.into_iter().zip(statuses).zip(hashes) {
trace!(
// FIXME(REA-2168): Use correct dialect.
query = %query.statement.display(nom_sql::Dialect::MySQL),
Expand All @@ -96,7 +116,8 @@ impl ViewsSynchronizer {
if let Some(name) = name {
self.view_name_cache.insert(query.clone(), name).await;
self.query_status_cache
.update_query_migration_state(&query, MigrationState::Successful)
.update_query_migration_state(&query, MigrationState::Successful);
self.views_checked.insert(hash);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion readyset-clustertest/src/readyset_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ async fn views_synchronize_between_deployments() {

// Eventually it should show up in adapter 1 too
eventually! {
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;");
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;").await.unwrap();
last_statement_destination(adapter_1.as_mysql_conn().unwrap()).await == QueryDestination::Readyset
}

Expand Down