Skip to content

Commit

Permalink
Add more debug information on consume cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
lukellmann committed Aug 11, 2024
1 parent bdb3fa4 commit 5cbbf87
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ private class IdentifyRateLimiterImpl(
override suspend fun consume(shardId: Int, events: SharedFlow<Event>) {
require(shardId >= 0) { "shardId must be non-negative but was $shardId" }

// if the coroutine that called consume() is cancelled, the CancellableContinuation makes sure the waiting is
// stopped (the Gateway won't try to identify), so we don't need to hold the mutex and waste time for other
// calls
return suspendCancellableCoroutine { continuation ->
val job = launchIdentifyWaiter(shardId, events, continuation)
continuation.invokeOnCancellation { job.cancel() }
val waiter = launchIdentifyWaiter(shardId, events, continuation)
// this will be invoked if the coroutine that called consume() is cancelled
continuation.invokeOnCancellation { cause ->
// stop the waiter, so we don't hold the mutex and waste time for other consume() calls (the Gateway
// won't try to identify if it was cancelled at this point)
waiter.cancel("Identify waiter was cancelled because consume() was cancelled", cause)
logger.debug(cause) {
"Identifying on shard $shardId with rate_limit_key ${shardId % maxConcurrency} was cancelled"
}
}
}
}

Expand Down

0 comments on commit 5cbbf87

Please sign in to comment.