Skip to content

Commit

Permalink
be more robust to spurious websocket messages
Browse files Browse the repository at this point in the history
  • Loading branch information
martinbonnin committed Nov 9, 2023
1 parent e57846d commit 07422c1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ interface OperationListener {
fun onRetry(throwable: Throwable)
}

/**
* @param terminated a state that signals that a general error happened all future WebSocket messages
* must be ignored. This is to make it robust to [WsProtocol] that send spurious messages.
*
* [OperationListener] is called directly from the [WebSocketListener.onMessage] thread except for
* [GeneralErrorServerMessage] that needs to suspend for [reopenWhen].
* In that state, all websocket messages are ignored
*/
private class ActiveOperationListener(val operationListener: OperationListener, var terminated: Boolean)

internal class SubscribableWebSocket(
webSocketEngine: WebSocketEngine,
url: String,
Expand All @@ -77,7 +87,7 @@ internal class SubscribableWebSocket(
private val lock = reentrantLock()
private var timeoutJob: Job? = null
private var state: State = State.Initial
private var activeListeners = mutableMapOf<String, OperationListener>()
private var activeListeners = mutableMapOf<String, ActiveOperationListener>()
private var idleJob: Job? = null
private var pingJob: Job? = null
// end of locked fields
Expand Down Expand Up @@ -121,9 +131,9 @@ internal class SubscribableWebSocket(

listeners.forEach {
if (reopen) {
it.onRetry(throwable)
it.operationListener.onRetry(throwable)
} else {
it.onError(throwable)
it.operationListener.onError(throwable)
}
}
}
Expand Down Expand Up @@ -151,10 +161,12 @@ internal class SubscribableWebSocket(
}

private fun listenerFor(id: String): OperationListener? = lock.withLock {
if (state == State.GeneralError) {
null
} else {
activeListeners.get(id)
activeListeners.get(id)?.let {
if (it.terminated) {
null
} else {
it.operationListener
}
}
}

Expand Down Expand Up @@ -226,7 +238,9 @@ internal class SubscribableWebSocket(

is GeneralErrorServerMessage -> {
lock.withLock {
state = State.GeneralError
activeListeners.values.forEach {
it.terminated = true
}
}
scope.launch { disconnect(message.exception) }
}
Expand Down Expand Up @@ -257,7 +271,7 @@ internal class SubscribableWebSocket(
if (activeListeners.containsKey(request.requestUuid.toString())) {
false
} else {
activeListeners.put(request.requestUuid.toString(), listener)
activeListeners.put(request.requestUuid.toString(), ActiveOperationListener(listener, false))
true
}
}
Expand Down Expand Up @@ -305,15 +319,6 @@ private enum class State {
Initial,
AwaitAck,
Connected,
/**
* a state that signals that a general error happened all future WebSocket messages
* must be ignored. This is to make it robust to [WsProtocol] that send spurious messages.
*
* [OperationListener] is called directly from the [WebSocketListener.onMessage] thread except for
* [GeneralErrorServerMessage] that needs to suspend for [reopenWhen].
* In that state, all websocket messages are ignored
*/
GeneralError,
Disconnected

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import kotlin.test.assertIs
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds


private data class Item(
val message: WebSocketMessage? = null,
val open: Boolean = false,
Expand Down

0 comments on commit 07422c1

Please sign in to comment.