From ba73b0d92d94463d74543550d0efe61fa6a6f416 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 20 May 2024 19:44:45 +0800
Subject: [PATCH] log_backup: fix panic when encountering an error while
 resuming (#17021) (#17028)

close tikv/tikv#17020

Spawn the retry task from the thread pool directly, instead of through
the thread-local runtime handle. `tokio::task::spawn` panics when it is
called from a thread that is not running a tokio runtime.

Signed-off-by: hillium
Co-authored-by: hillium
---
 components/backup-stream/src/endpoint.rs         |  2 +-
 components/backup-stream/src/metadata/client.rs  | 14 +++++++++++++-
 components/backup-stream/tests/failpoints/mod.rs | 14 ++++++++++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs
index 7e6e97f2d53..b26a04e56c0 100644
--- a/components/backup-stream/src/endpoint.rs
+++ b/components/backup-stream/src/endpoint.rs
@@ -753,7 +753,7 @@ where
             Err(err) => {
                 err.report(format!("failed to resume backup stream task {}", task_name));
                 let sched = self.scheduler.clone();
-                tokio::task::spawn(root!("retry_resume"; async move {
+                self.pool.spawn(root!("retry_resume"; async move {
                     tokio::time::sleep(Duration::from_secs(5)).await;
                     sched
                         .schedule(Task::WatchTask(TaskOp::ResumeTask(task_name)))
diff --git a/components/backup-stream/src/metadata/client.rs b/components/backup-stream/src/metadata/client.rs
index 21ca2d60556..de8d9c55251 100644
--- a/components/backup-stream/src/metadata/client.rs
+++ b/components/backup-stream/src/metadata/client.rs
@@ -331,6 +331,13 @@ impl<Store: MetaStore> MetadataClient<Store> {
             .await
     }
 
+    /// Resume a task by removing its pause marker.
+    pub async fn resume(&self, name: &str) -> Result<()> {
+        self.meta_store
+            .delete(Keys::Key(MetaKey::pause_of(name)))
+            .await
+    }
+
     pub async fn get_tasks_pause_status(&self) -> Result<HashMap<Vec<u8>, bool>> {
         let kvs = self
             .meta_store
@@ -354,6 +361,11 @@ impl<Store: MetaStore> MetadataClient<Store> {
         defer! {
             super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_get"]).observe(now.saturating_elapsed().as_secs_f64())
         }
+        fail::fail_point!("failed_to_get_task", |_| {
+            Err(Error::MalformedMetadata(
+                "failed to connect etcd client".to_string(),
+            ))
+        });
         let items = self
             .meta_store
             .get_latest(Keys::Key(MetaKey::task_of(name)))
@@ -376,7 +388,7 @@ impl<Store: MetaStore> MetadataClient<Store> {
         }
         fail::fail_point!("failed_to_get_tasks", |_| {
             Err(Error::MalformedMetadata(
-                "faild to connect etcd client".to_string(),
+                "failed to connect etcd client".to_string(),
             ))
         });
         let kvs = self
diff --git a/components/backup-stream/tests/failpoints/mod.rs b/components/backup-stream/tests/failpoints/mod.rs
index b92c672a462..37c039848e3 100644
--- a/components/backup-stream/tests/failpoints/mod.rs
+++ b/components/backup-stream/tests/failpoints/mod.rs
@@ -399,4 +399,18 @@ mod all {
             std::iter::once(enc_key.as_encoded().as_slice()),
         )
     }
+
+    #[test]
+    fn failed_to_get_task_when_pausing() {
+        let suite = SuiteBuilder::new_named("resume_error").nodes(1).build();
+        suite.must_register_task(1, "resume_error");
+        let mcli = suite.get_meta_cli();
+        run_async_test(mcli.pause("resume_error")).unwrap();
+        suite.sync();
+        fail::cfg("failed_to_get_task", "1*return").unwrap();
+        run_async_test(mcli.resume("resume_error")).unwrap();
+        suite.sync();
+        // Make sure our suite doesn't panic.
+        suite.sync();
+    }
 }
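
Why the one-line change in endpoint.rs fixes the panic: `tokio::task::spawn`
resolves the current runtime through a thread-local handle, so calling it
from a thread that no tokio runtime is driving panics, whereas spawning
through a runtime object you own works from any thread. Below is a minimal
standalone sketch of that difference, not TiKV code: the locally built
runtime stands in for the endpoint's `self.pool`, and the sleep-then-print
task mimics the delayed retry. It assumes a `tokio` dependency with the
`rt-multi-thread` and `time` features enabled.

use std::time::Duration;

fn main() {
    // An owned runtime, standing in for the endpoint's `self.pool`.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .expect("failed to build runtime");

    // We are on a plain OS thread here, with no ambient tokio context.
    // `tokio::task::spawn(async { .. })` would panic at this point,
    // because it looks up the runtime handle in thread-local storage.

    // Spawning through the runtime we own succeeds from any thread.
    rt.spawn(async {
        tokio::time::sleep(Duration::from_secs(5)).await;
        println!("retrying task resume after backoff");
    });

    // Keep the process alive long enough for the task to run.
    std::thread::sleep(Duration::from_secs(6));
}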
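
The regression test leans on the `fail` crate's failpoint mechanism:
`fail::fail_point!` marks a spot in the code, and `fail::cfg(name, "1*return")`
arms it so the closure fires exactly once and the function returns early,
after which calls take the normal path. A minimal standalone sketch of those
mechanics follows, assuming a `fail` dependency with its `failpoints` feature
enabled; the function name and error message here are illustrative, not
TiKV's.

// Assumes Cargo.toml has: fail = { version = "0.5", features = ["failpoints"] }

fn get_task() -> Result<String, String> {
    // With no action configured this expands to a no-op; when armed with
    // a "return" action, the closure runs and its value is returned early.
    fail::fail_point!("failed_to_get_task", |_| {
        Err("failed to connect etcd client".to_string())
    });
    Ok("my_task".to_string())
}

fn main() {
    // "1*return" means: fire the `return` action once, then never again.
    fail::cfg("failed_to_get_task", "1*return").unwrap();

    assert!(get_task().is_err()); // first call hits the failpoint
    assert!(get_task().is_ok()); // later calls take the normal path
    println!("failpoint fired once, as configured");
}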