Skip to content

Commit

Permalink
log_backup: fix panic when encountered error during resuming (#17021) (
Browse files Browse the repository at this point in the history
…#17028)

close #17020

Spawn thread from the thread pool directly. (Instead of the thread local runtime handle.)

Signed-off-by: hillium <[email protected]>

Co-authored-by: hillium <[email protected]>
  • Loading branch information
ti-chi-bot and YuJuncen committed May 20, 2024
1 parent 53048f9 commit ba73b0d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
2 changes: 1 addition & 1 deletion components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ where
Err(err) => {
err.report(format!("failed to resume backup stream task {}", task_name));
let sched = self.scheduler.clone();
tokio::task::spawn(root!("retry_resume"; async move {
self.pool.spawn(root!("retry_resume"; async move {
tokio::time::sleep(Duration::from_secs(5)).await;
sched
.schedule(Task::WatchTask(TaskOp::ResumeTask(task_name)))
Expand Down
14 changes: 13 additions & 1 deletion components/backup-stream/src/metadata/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ impl<Store: MetaStore> MetadataClient<Store> {
.await
}

/// resume a task.
pub async fn resume(&self, name: &str) -> Result<()> {
self.meta_store
.delete(Keys::Key(MetaKey::pause_of(name)))
.await
}

pub async fn get_tasks_pause_status(&self) -> Result<HashMap<Vec<u8>, bool>> {
let kvs = self
.meta_store
Expand All @@ -354,6 +361,11 @@ impl<Store: MetaStore> MetadataClient<Store> {
defer! {
super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_get"]).observe(now.saturating_elapsed().as_secs_f64())
}
fail::fail_point!("failed_to_get_task", |_| {
Err(Error::MalformedMetadata(
"failed to connect etcd client".to_string(),
))
});
let items = self
.meta_store
.get_latest(Keys::Key(MetaKey::task_of(name)))
Expand All @@ -376,7 +388,7 @@ impl<Store: MetaStore> MetadataClient<Store> {
}
fail::fail_point!("failed_to_get_tasks", |_| {
Err(Error::MalformedMetadata(
"faild to connect etcd client".to_string(),
"failed to connect etcd client".to_string(),
))
});
let kvs = self
Expand Down
14 changes: 14 additions & 0 deletions components/backup-stream/tests/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,18 @@ mod all {
std::iter::once(enc_key.as_encoded().as_slice()),
)
}

#[test]
fn failed_to_get_task_when_pausing() {
let suite = SuiteBuilder::new_named("resume_error").nodes(1).build();
suite.must_register_task(1, "resume_error");
let mcli = suite.get_meta_cli();
run_async_test(mcli.pause("resume_error")).unwrap();
suite.sync();
fail::cfg("failed_to_get_task", "1*return").unwrap();
run_async_test(mcli.resume("resume_error")).unwrap();
suite.sync();
// Make sure our suite doesn't panic.
suite.sync();
}
}

0 comments on commit ba73b0d

Please sign in to comment.