Skip to content

Commit

Permalink
adapter: Fix views synchronizer bugs
Browse files Browse the repository at this point in the history
Previously, the views synchronizer only checked the server for views for
queries that were in the "pending" state. This meant that if the
migration handler set a query's state to "dry run succeeded" before the
views synchronizer had a chance to check the server for a view, the query
would be stuck in the "dry run succeeded" state forever, even if a view
for the query did indeed exist already.

This commit fixes the issue by having the views synchronizer check the
server for views for queries in *either* the "pending" or "dry run
succeeded" states. In order to prevent the views synchronizer from
rechecking every query with status "dry run succeeded" over and over
again, a "cache" has been added to the views synchronizer to keep track
of which queries have already been checked.

While working on this, I also noticed that it was possible for the
following sequence of events to occur:

- Migration handler sees that a query is pending and kicks off a dry run
  migration
- Views synchronizer finds a view on the server for the same query and
  sets the status to "successful"
- Migration handler finishes the dry run migration for the query and
  overwrites the status as "dry run succeeded"

This could lead to a situation where a query that was previously
(correctly) labeled as "successful" is moved back to the "dry run
succeeded" state. To fix the issue, this commit updates the migration
handler to only write the "dry run succeeded" status if the query's
status is still "pending" after the dry run is completed.

Release-Note-Core: Fixed a bug where queries that already had caches
  were sometimes stuck in the `SHOW PROXIED QUERIES` list
Change-Id: Ie5faa100158fc80c906d8ad5cb897d8a02a07be9
  • Loading branch information
ethowitz committed May 13, 2024
1 parent d5c5daf commit f9ceca3
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 4 deletions.
12 changes: 11 additions & 1 deletion readyset-adapter/src/migration_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,18 @@ impl MigrationHandler {
match controller.dry_run(changelist).await {
Ok(_) => {
self.start_time.remove(view_request);

// It's possible that the ViewsSynchronizer found an existing view for this query
// on the server while we were performing a dry run, in which case it would have
// updated the query's status to "successful". In this situation, we don't want to
// overwrite the "successful" status, so we only write the new "dry run succeeded"
// status if the query's status is still "pending"
self.query_status_cache
.update_query_migration_state(view_request, MigrationState::DryRunSucceeded);
.with_mut_migration_state(view_request, |status| {
if status.is_pending() {
*status = MigrationState::DryRunSucceeded;
}
});
}
Err(e) if e.caused_by_unsupported() => {
self.start_time.remove(view_request);
Expand Down
34 changes: 34 additions & 0 deletions readyset-adapter/src/query_status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,24 @@ impl QueryStatusCache {
}
}

/// Yields to the given function `f` a mutable reference to the migration state of the query
/// `q`. The primary purpose of this method is allow for atomic reads and writes of the
/// migration state of a query.
pub fn with_mut_migration_state<Q, F>(&self, q: &Q, f: F) -> bool
where
Q: QueryStatusKey,
F: Fn(&mut MigrationState),
{
q.with_mut_status(self, |maybe_query_status| {
if let Some(query_status) = maybe_query_status {
f(&mut query_status.migration_state);
true
} else {
false
}
})
}

/// Marks a query as dropped by the user.
///
/// NOTE: this should only be called after we successfully remove a View for this query. This is
Expand Down Expand Up @@ -785,6 +803,22 @@ impl QueryStatusCache {
.into()
}

/// Returns a list of queries whose migration states match `states`.
pub fn queries_with_statuses(&self, states: &[MigrationState]) -> QueryList {
let statuses = self.persistent_handle.statuses.read();
statuses
.iter()
.filter_map(|(_query_id, (query, status))| {
if states.contains(&status.migration_state) {
Some((query.clone(), status.clone()))
} else {
None
}
})
.collect::<Vec<(Query, QueryStatus)>>()
.into()
}

/// Returns a list of queries that have a state of [`QueryState::Successful`].
pub fn allow_list(&self) -> Vec<(QueryId, Arc<ViewCreateRequest>, QueryStatus)> {
self.persistent_handle.allow_list()
Expand Down
18 changes: 16 additions & 2 deletions readyset-adapter/src/views_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::collections::HashSet;
use std::sync::Arc;

use dataflow_expression::Dialect;
use nom_sql::{DialectDisplay, Relation};
use readyset_client::query::MigrationState;
use readyset_client::query::{MigrationState, Query};
use readyset_client::{ReadySetHandle, ViewCreateRequest};
use readyset_util::shared_cache::LocalCache;
use readyset_util::shutdown::ShutdownReceiver;
Expand All @@ -22,6 +23,10 @@ pub struct ViewsSynchronizer {
dialect: Dialect,
/// Global and thread-local cache of view endpoints and prepared statements.
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
/// A cache to keep track of the queries for which we've already checked the server for
/// existing views. Note that this cache is *not* updated (i.e. a query is not removed) when a
/// "dry run succeeded" query is migrated.
views_checked: HashSet<Query>,
}

impl ViewsSynchronizer {
Expand All @@ -38,6 +43,7 @@ impl ViewsSynchronizer {
poll_interval,
dialect,
view_name_cache,
views_checked: HashSet::new(),
}
}

Expand Down Expand Up @@ -71,8 +77,16 @@ impl ViewsSynchronizer {
debug!("Views synchronizer polling");
let queries = self
.query_status_cache
.pending_migration()
.queries_with_statuses(&[MigrationState::DryRunSucceeded, MigrationState::Pending])
.into_iter()
.filter(|(q, _)| {
if self.views_checked.contains(q) {
false
} else {
self.views_checked.insert(q.clone());
true
}
})
.filter_map(|(q, _)| {
q.into_parsed()
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
Expand Down
2 changes: 1 addition & 1 deletion readyset-clustertest/src/readyset_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ async fn views_synchronize_between_deployments() {

// Eventually it should show up in adapter 1 too
eventually! {
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;");
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;").await.unwrap();
last_statement_destination(adapter_1.as_mysql_conn().unwrap()).await == QueryDestination::Readyset
}

Expand Down

0 comments on commit f9ceca3

Please sign in to comment.