Skip to content

Commit

Permalink
Ensure waiting is logged
Browse files Browse the repository at this point in the history
  • Loading branch information
lukellmann committed Aug 10, 2024
1 parent ae7ab3f commit bdb3fa4
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.resume
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -94,15 +93,15 @@ private class IdentifyRateLimiterImpl(
) = scope.launch {
val rateLimitKey = shardId % maxConcurrency
val mutex = mutexesByRateLimitKey[rateLimitKey]
// best effort, only used for logging (might be false even if mutex.withLock suspends later if we are unlucky)
val wasLocked = mutex.isLocked
val wasLocked = !mutex.tryLock()
if (wasLocked) {
logger.debug {
"Waiting for other shard(s) with rate_limit_key $rateLimitKey to identify before identifying on " +
"shard $shardId"
}
mutex.lock()
}
mutex.withLock { // in case something terrible happens, ensure the mutex is unlocked
try { // in case something terrible happens, ensure the mutex is unlocked
// using a timeout so a broken gateway won't block its rate_limit_key for a long time
val responseToIdentify = withTimeoutOrNull(IDENTIFY_TIMEOUT) {
events.onSubscription { // onSubscription ensures we don't miss events
Expand All @@ -126,6 +125,8 @@ private class IdentifyRateLimiterImpl(
} + ", delaying $DELAY_AFTER_IDENTIFY before freeing up rate_limit_key $rateLimitKey"
}
delay(DELAY_AFTER_IDENTIFY) // delay before unlocking mutex to free up the current rateLimitKey
} finally {
mutex.unlock()
}
}

Expand Down

0 comments on commit bdb3fa4

Please sign in to comment.