From cdb7ed2075952d7a797ea9fe150c857f49046100 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Fri, 24 May 2024 11:53:08 -0700 Subject: [PATCH 1/2] fix(datastore): change OutgoingMutationQueue use TaskQueue for state transition --- .../OutgoingMutationQueue/OutgoingMutationQueue.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 26cde77852..1ca931a66d 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -27,10 +27,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { private let operationQueue: OperationQueue /// A DispatchQueue for synchronizing state on the mutation queue - private let mutationDispatchQueue = DispatchQueue( - label: "com.amazonaws.OutgoingMutationQueue", - target: DispatchQueue.global() - ) + private let mutationDispatchQueue = TaskQueue() private weak var api: APICategoryGraphQLBehaviorExtended? private weak var reconciliationQueue: IncomingEventReconciliationQueue? @@ -55,7 +52,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { let operationQueue = OperationQueue() operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue" - operationQueue.underlyingQueue = mutationDispatchQueue + operationQueue.qualityOfService = .default operationQueue.maxConcurrentOperationCount = 1 operationQueue.isSuspended = true @@ -139,6 +136,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { queryMutationEventsFromStorage { [weak self] in guard let self = self else { return } + guard case .starting = self.stateMachine.state else { + self.log.debug("Failed to continue doStart operation, current state(\(self.stateMachine.state)) is not starting") + return + } self.operationQueue.isSuspended = false // State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)` From 6baf419993ed6eb25d5ee04fbe88cd5068369f0f Mon Sep 17 00:00:00 2001 From: Di Wu Date: Mon, 27 May 2024 10:05:37 -0700 Subject: [PATCH 2/2] Update AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift Co-authored-by: Michael Law <1365977+lawmicha@users.noreply.github.com> --- .../OutgoingMutationQueue/OutgoingMutationQueue.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 1ca931a66d..d517b170af 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -137,7 +137,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { queryMutationEventsFromStorage { [weak self] in guard let self = self else { return } guard case .starting = self.stateMachine.state else { - self.log.debug("Failed to continue doStart operation, current state(\(self.stateMachine.state)) is not starting") + self.log.debug("Unexpected state transition while performing `doStart()` during `.starting` state. Current state: \(self.stateMachine.state).") return }