Skip to content

Commit

Permalink
adapter: Fix views synchronizer race
Browse files Browse the repository at this point in the history
commit message

Change-Id: Ibbc34e0856ed22b475e75262e14f8d55995089d8
  • Loading branch information
ethowitz committed Apr 26, 2024
1 parent 6617599 commit 93220d7
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 111 deletions.
28 changes: 8 additions & 20 deletions readyset-adapter/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ where
}

if only_supported {
queries.retain(|q| q.status.migration_state.is_supported());
queries.retain(|q| q.status.is_supported());
}

let select_schema = if let Some(handle) = self.metrics_handle.as_mut() {
Expand Down Expand Up @@ -2075,15 +2075,7 @@ where
let mut data = queries
.into_iter()
.map(|DeniedQuery { id, query, status }| {
let s = match status.migration_state {
MigrationState::DryRunSucceeded
| MigrationState::Successful
| MigrationState::Dropped => "yes",
MigrationState::Pending | MigrationState::Inlined(_) => "pending",
MigrationState::Unsupported => "unsupported",
}
.to_string();

let s = status.supported_state().to_string();
let mut row = vec![
DfValue::from(id.to_string()),
DfValue::from(Self::format_query_text(
Expand Down Expand Up @@ -2506,11 +2498,7 @@ where
event: &mut QueryExecutionEvent,
processed_query_params: ProcessedQueryParams,
) -> Result<QueryResult<'a, DB>, DB::Error> {
let mut status = status.unwrap_or(QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
always: false,
});
let mut status = status.unwrap_or(QueryStatus::new(MigrationState::Unsupported));
let original_status = status.clone();
let did_work = if let Some(ref mut i) = status.execution_info {
i.reset_if_exceeded_recovery(
Expand All @@ -2524,9 +2512,9 @@ where
// Test several conditions to see if we should proxy
let upstream_exists = upstream.is_some();
let proxy_out_of_band = settings.migration_mode != MigrationMode::InRequestPath
&& status.migration_state != MigrationState::Successful;
&& status.migration_state() != &MigrationState::Successful;
let unsupported_or_dropped = matches!(
&status.migration_state,
status.migration_state(),
MigrationState::Unsupported | MigrationState::Dropped
);
let exceeded_network_failure = status
Expand Down Expand Up @@ -2568,7 +2556,7 @@ where
match noria_res {
Ok(noria_ok) => {
// We managed to select on ReadySet, good for us
status.migration_state = MigrationState::Successful;
status.set_migration_state(MigrationState::Successful);
if let Some(i) = status.execution_info.as_mut() {
i.execute_succeeded()
}
Expand All @@ -2591,9 +2579,9 @@ where
}

if noria_err.caused_by_view_not_found() {
status.migration_state = MigrationState::Pending;
status.set_migration_state(MigrationState::Pending);
} else if noria_err.caused_by_unsupported() {
status.migration_state = MigrationState::Unsupported;
status.set_migration_state(MigrationState::Unsupported);
};

let always = status.always;
Expand Down
1 change: 1 addition & 0 deletions readyset-adapter/src/backend/noria_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ impl NoriaConnector {
create_if_not_exist: bool,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
) -> ReadySetResult<Relation> {
println!("get_view_name_cached");
let search_path =
override_schema_search_path.unwrap_or_else(|| self.schema_search_path().to_vec());
let view_request = ViewCreateRequest::new(q.clone(), search_path.clone());
Expand Down
18 changes: 5 additions & 13 deletions readyset-adapter/src/proxied_queries_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ impl ProxiedQueriesReporter {
TelemetryEvent::ProxiedQuery,
TelemetryBuilder::new()
.proxied_query(anon_q)
.migration_status(query.status.migration_state.to_string())
.migration_status(query.status.migration_state().to_string())
.build(),
))
};

match reported_queries.insert(query.id, query.status.migration_state.clone()) {
match reported_queries.insert(query.id, query.status.migration_state().clone()) {
Some(old_migration_state) => {
// Check whether we know of a new migration state for this query. Send an event if
// so
if old_migration_state != query.status.migration_state {
if &old_migration_state != query.status.migration_state() {
build_event()
} else {
None
Expand Down Expand Up @@ -105,11 +105,7 @@ mod tests {
query: Query::ParseFailed(Arc::new(
"this is easier than making a view create request".to_string(),
)),
status: QueryStatus {
migration_state: MigrationState::Pending,
execution_info: None,
always: false,
},
status: QueryStatus::new(MigrationState::Pending),
};
proxied_queries_reporter.report_query(&mut init_q).await;
let status = {
Expand All @@ -126,11 +122,7 @@ mod tests {
query: Query::ParseFailed(Arc::new(
"this is easier than making a view create request".to_string(),
)),
status: QueryStatus {
migration_state: MigrationState::Successful,
execution_info: None,
always: false,
},
status: QueryStatus::new(MigrationState::Successful),
};
proxied_queries_reporter.report_query(&mut updated_q).await;
let status = {
Expand Down
102 changes: 28 additions & 74 deletions readyset-adapter/src/query_status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl QueryStatusCache {
{
let q = q.into();
let status = QueryStatus::default_for_query(&q);
let migration_state = status.migration_state.clone();
let migration_state = status.migration_state().clone();
let id = self.insert_with_status(q, status);
(id, migration_state)
}
Expand All @@ -326,9 +326,9 @@ impl QueryStatusCache {
Query::Parsed { .. } => status,
Query::ParseFailed(_) => {
let mut status = status;
if status.migration_state != MigrationState::Unsupported {
if status.migration_state() != &MigrationState::Unsupported {
error!("Cannot set migration state to anything other than Unsupported for a Query::ParseFailed");
status.migration_state = MigrationState::Unsupported
status.set_migration_state(MigrationState::Unsupported);
}
status
}
Expand All @@ -355,7 +355,7 @@ impl QueryStatusCache {
let query_state = self.id_to_status.get(&id);

match query_state {
Some(s) => (id, s.value().migration_state.clone()),
Some(s) => (id, s.value().migration_state().clone()),
None => self.insert(q.clone()),
}
}
Expand All @@ -370,7 +370,7 @@ impl QueryStatusCache {
let id = q.query_id();
let query_state = self.id_to_status.get(&id);

(id, query_state.map(|s| s.value().migration_state.clone()))
(id, query_state.map(|s| s.value().migration_state().clone()))
}

/// This function returns the query status of a query. If the query does not exist
Expand Down Expand Up @@ -491,10 +491,10 @@ impl QueryStatusCache {
//
// `Inlined` queries may only be changed from `Inlined` to `Unsupported`.
if !matches!(
s.migration_state,
s.migration_state(),
MigrationState::Unsupported | MigrationState::Inlined(_)
) {
s.migration_state = MigrationState::Pending
s.set_migration_state(MigrationState::Pending);
}
false
}
Expand All @@ -504,14 +504,7 @@ impl QueryStatusCache {
});

if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Pending,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Pending));
}
}

Expand All @@ -526,37 +519,15 @@ impl QueryStatusCache {
// Dropped should not be set manually
debug_assert!(!matches!(m, MigrationState::Dropped));

let should_insert = q.with_mut_status(self, |s| {
match s {
Some(s) => {
match s.migration_state {
// We do not support transitions from the `Unsupported` state, as we assume
// any `Unsupported` query will remain `Unsupported` for the duration of
// this process.
MigrationState::Unsupported => {}
// A query with an Inlined state can only transition to Unsupported.
MigrationState::Inlined(_) => {
if matches!(m, MigrationState::Unsupported) {
s.migration_state = MigrationState::Unsupported;
}
}
// All other state transitions are allowed.
_ => s.migration_state = m.clone(),
}
false
}
None => true,
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.set_migration_state(m.clone());
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: m,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(m));
}
}

Expand All @@ -570,20 +541,13 @@ impl QueryStatusCache {
{
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.migration_state = MigrationState::Dropped;
s.set_migration_state(MigrationState::Dropped);
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Dropped,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Dropped));
}
}

Expand All @@ -592,20 +556,13 @@ impl QueryStatusCache {
pub fn unsupported_inlined_migration(&self, q: &ViewCreateRequest) {
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) => {
s.migration_state = MigrationState::Unsupported;
s.set_migration_state(MigrationState::Unsupported);
false
}
None => true,
});
if should_insert {
self.insert_with_status(
q.clone(),
QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
always: false,
},
);
self.insert_with_status(q.clone(), QueryStatus::new(MigrationState::Unsupported));
}
self.persistent_handle.pending_inlined_migrations.remove(q);
}
Expand All @@ -619,7 +576,7 @@ impl QueryStatusCache {
Q: QueryStatusKey,
{
q.with_mut_status(self, |s| match s {
Some(s) if s.migration_state != MigrationState::Unsupported => {
Some(s) if s.migration_state() != &MigrationState::Unsupported => {
s.always = always;
}
_ => {}
Expand All @@ -634,8 +591,8 @@ impl QueryStatusCache {
Q: QueryStatusKey,
{
let should_insert = q.with_mut_status(self, |s| match s {
Some(s) if s.migration_state != MigrationState::Unsupported => {
s.migration_state = status.migration_state.clone();
Some(s) if s.migration_state() != &MigrationState::Unsupported => {
s.set_migration_state(status.migration_state().clone());
s.execution_info = status.execution_info.clone();
false
}
Expand All @@ -659,15 +616,15 @@ impl QueryStatusCache {
.iter_mut()
.filter(|v| v.is_successful())
.for_each(|mut v| {
v.migration_state = MigrationState::Pending;
v.set_migration_state(MigrationState::Pending);
v.always = false;
});
let mut statuses = self.persistent_handle.statuses.write();
statuses
.iter_mut()
.filter(|(_query_id, (_query, status))| status.is_successful())
.for_each(|(_query_id, (_query, ref mut status))| {
status.migration_state = MigrationState::Pending;
status.set_migration_state(MigrationState::Pending);
status.always = false;
});
}
Expand Down Expand Up @@ -726,11 +683,7 @@ impl QueryStatusCache {

// Then update the inlined state epoch for the query
query.with_mut_status(self, |s| {
if let Some(QueryStatus {
migration_state: MigrationState::Inlined(ref mut state),
..
}) = s
{
if let Some(state) = s.and_then(|s| s.as_inlined_mut()) {
state.epoch += 1;
}
})
Expand All @@ -746,7 +699,7 @@ impl QueryStatusCache {
// Get the placeholders that require inlining
let placeholders =
q.key()
.with_status(self, |s| match s.map(|s| &s.migration_state) {
.with_status(self, |s| match s.map(|s| s.migration_state()) {
Some(MigrationState::Inlined(InlinedState {
inlined_placeholders,
..
Expand All @@ -772,6 +725,7 @@ impl QueryStatusCache {
/// Does not include any queries that require inlining.
pub fn pending_migration(&self) -> QueryList {
let statuses = self.persistent_handle.statuses.read();

statuses
.iter()
.filter_map(|(_query_id, (query, status))| {
Expand Down Expand Up @@ -1172,8 +1126,8 @@ mod tests {
epoch: 1,
});
cache.update_query_migration_state(&q, inlined_state.clone());
let state = cache.query_status(&q).migration_state;
assert_eq!(state, inlined_state);
let state = cache.query_status(&q);
assert_eq!(state.migration_state(), &inlined_state);
assert_eq!(
cache
.persistent_handle
Expand Down
Loading

0 comments on commit 93220d7

Please sign in to comment.