Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseGateway abstraction #798

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
136 changes: 129 additions & 7 deletions gateway/api/gateway.api
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,43 @@ public final class dev/kord/gateway/AutoModerationRuleUpdate : dev/kord/gateway/
public fun toString ()Ljava/lang/String;
}

public abstract class dev/kord/gateway/BaseGateway : dev/kord/gateway/Gateway {
public fun <init> ()V
public fun detach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
protected abstract fun getDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher;
public fun getEvents ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public synthetic fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
protected final fun getLog ()Lmu/KLogger;
protected final fun getState ()Ldev/kord/gateway/BaseGateway$State;
protected abstract fun onDetach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onSend (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onStart (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onStop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected final fun setState (Ldev/kord/gateway/BaseGateway$State;)V
public fun start (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun stop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected final fun throwStateError ()Ljava/lang/Void;
}

protected abstract class dev/kord/gateway/BaseGateway$State {
public synthetic fun <init> (ZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getRetry ()Z
}

public final class dev/kord/gateway/BaseGateway$State$Detached : dev/kord/gateway/BaseGateway$State {
public static final field INSTANCE Ldev/kord/gateway/BaseGateway$State$Detached;
}

public final class dev/kord/gateway/BaseGateway$State$Running : dev/kord/gateway/BaseGateway$State {
public fun <init> (Z)V
}

public final class dev/kord/gateway/BaseGateway$State$Stopped : dev/kord/gateway/BaseGateway$State {
public static final field INSTANCE Ldev/kord/gateway/BaseGateway$State$Stopped;
}

public final class dev/kord/gateway/ChannelCreate : dev/kord/gateway/DispatchEvent {
public fun <init> (Ldev/kord/common/entity/DiscordChannel;Ljava/lang/Integer;)V
public final fun component1 ()Ldev/kord/common/entity/DiscordChannel;
Expand Down Expand Up @@ -219,16 +256,12 @@ public final class dev/kord/gateway/Command$SerializationStrategy : kotlinx/seri
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
}

public final class dev/kord/gateway/DefaultGateway : dev/kord/gateway/Gateway {
public final class dev/kord/gateway/DefaultGateway : dev/kord/gateway/BaseGateway {
public static final field Companion Ldev/kord/gateway/DefaultGateway$Companion;
public fun <init> (Ldev/kord/gateway/DefaultGatewayData;)V
public fun detach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public fun getEvents ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public synthetic fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public fun getPing ()Lkotlinx/coroutines/flow/StateFlow;
public fun send (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun start (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun stop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class dev/kord/gateway/DefaultGateway$Companion {
Expand Down Expand Up @@ -618,6 +651,7 @@ public abstract interface class dev/kord/gateway/Gateway : kotlinx/coroutines/Co
}

public final class dev/kord/gateway/Gateway$Companion {
public final fun connectionManaged (Ldev/kord/gateway/DefaultGatewayData;Lkotlin/jvm/functions/Function1;)Ldev/kord/gateway/Gateway;
public final fun none ()Ldev/kord/gateway/Gateway;
}

Expand Down Expand Up @@ -2018,6 +2052,94 @@ public final class dev/kord/gateway/builder/Shards {
public fun toString ()Ljava/lang/String;
}

public abstract interface class dev/kord/gateway/connection/GatewayConnection {
public abstract fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getPing ()Lkotlinx/coroutines/flow/StateFlow;
public abstract fun open (Ldev/kord/gateway/connection/GatewayConnection$Data;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun send (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class dev/kord/gateway/connection/GatewayConnection$CloseReason {
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$Error : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public fun <init> (Ljava/lang/Throwable;)V
public final fun getCause ()Ljava/lang/Throwable;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$InvalidSession : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public static final field INSTANCE Ldev/kord/gateway/connection/GatewayConnection$CloseReason$InvalidSession;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$Manual : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public static final field INSTANCE Ldev/kord/gateway/connection/GatewayConnection$CloseReason$Manual;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$Plain : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public fun <init> (ILjava/lang/String;Ldev/kord/gateway/Resume;)V
public final fun getCode ()I
public final fun getMessage ()Ljava/lang/String;
public final fun getResume ()Ldev/kord/gateway/Resume;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$Reconnect : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public static final field INSTANCE Ldev/kord/gateway/connection/GatewayConnection$CloseReason$Reconnect;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$ResumableInvalidSession : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public fun <init> (Ldev/kord/gateway/Resume;)V
public final fun getResume ()Ldev/kord/gateway/Resume;
}

public final class dev/kord/gateway/connection/GatewayConnection$CloseReason$ResumableReconnect : dev/kord/gateway/connection/GatewayConnection$CloseReason {
public fun <init> (Ldev/kord/gateway/Resume;)V
public final fun getResume ()Ldev/kord/gateway/Resume;
}

public final class dev/kord/gateway/connection/GatewayConnection$Data {
public fun <init> (Ldev/kord/common/entity/DiscordShard;Ljava/net/URI;Ldev/kord/gateway/connection/GatewayConnection$Session;Lio/ktor/client/HttpClient;Lkotlinx/serialization/json/Json;Lkotlinx/coroutines/flow/MutableSharedFlow;Ldev/kord/common/ratelimit/RateLimiter;Ldev/kord/gateway/ratelimit/IdentifyRateLimiter;Ldev/kord/gateway/retry/Retry;)V
public final fun component1 ()Ldev/kord/common/entity/DiscordShard;
public final fun component2 ()Ljava/net/URI;
public final fun component3 ()Ldev/kord/gateway/connection/GatewayConnection$Session;
public final fun component4 ()Lio/ktor/client/HttpClient;
public final fun component5 ()Lkotlinx/serialization/json/Json;
public final fun component6 ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public final fun component7 ()Ldev/kord/common/ratelimit/RateLimiter;
public final fun component8 ()Ldev/kord/gateway/ratelimit/IdentifyRateLimiter;
public final fun component9 ()Ldev/kord/gateway/retry/Retry;
public final fun copy (Ldev/kord/common/entity/DiscordShard;Ljava/net/URI;Ldev/kord/gateway/connection/GatewayConnection$Session;Lio/ktor/client/HttpClient;Lkotlinx/serialization/json/Json;Lkotlinx/coroutines/flow/MutableSharedFlow;Ldev/kord/common/ratelimit/RateLimiter;Ldev/kord/gateway/ratelimit/IdentifyRateLimiter;Ldev/kord/gateway/retry/Retry;)Ldev/kord/gateway/connection/GatewayConnection$Data;
public static synthetic fun copy$default (Ldev/kord/gateway/connection/GatewayConnection$Data;Ldev/kord/common/entity/DiscordShard;Ljava/net/URI;Ldev/kord/gateway/connection/GatewayConnection$Session;Lio/ktor/client/HttpClient;Lkotlinx/serialization/json/Json;Lkotlinx/coroutines/flow/MutableSharedFlow;Ldev/kord/common/ratelimit/RateLimiter;Ldev/kord/gateway/ratelimit/IdentifyRateLimiter;Ldev/kord/gateway/retry/Retry;ILjava/lang/Object;)Ldev/kord/gateway/connection/GatewayConnection$Data;
public fun equals (Ljava/lang/Object;)Z
public final fun getClient ()Lio/ktor/client/HttpClient;
public final fun getEventFlow ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public final fun getIdentifyRateLimiter ()Ldev/kord/gateway/ratelimit/IdentifyRateLimiter;
public final fun getJson ()Lkotlinx/serialization/json/Json;
public final fun getReconnectRetry ()Ldev/kord/gateway/retry/Retry;
public final fun getSendRateLimiter ()Ldev/kord/common/ratelimit/RateLimiter;
public final fun getSession ()Ldev/kord/gateway/connection/GatewayConnection$Session;
public final fun getShard ()Ldev/kord/common/entity/DiscordShard;
public final fun getUri ()Ljava/net/URI;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public abstract interface class dev/kord/gateway/connection/GatewayConnection$Session {
}

public final class dev/kord/gateway/connection/GatewayConnection$Session$New : dev/kord/gateway/connection/GatewayConnection$Session {
public fun <init> (Ldev/kord/gateway/Identify;)V
public final fun getIdentify ()Ldev/kord/gateway/Identify;
}

public final class dev/kord/gateway/connection/GatewayConnection$Session$Resumed : dev/kord/gateway/connection/GatewayConnection$Session {
public fun <init> (Ldev/kord/gateway/Resume;)V
public final fun getResume ()Ldev/kord/gateway/Resume;
}

public final class dev/kord/gateway/connection/GatewayConnectionKt {
public static final fun GatewayConnection ()Ldev/kord/gateway/connection/GatewayConnection;
}

public abstract interface class dev/kord/gateway/ratelimit/IdentifyRateLimiter {
public abstract fun consume (ILkotlinx/coroutines/flow/SharedFlow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getMaxConcurrency ()I
Expand Down
141 changes: 141 additions & 0 deletions gateway/src/main/kotlin/BaseGateway.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package dev.kord.gateway

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.CoroutineContext

/**
* Base abstraction for a gateway implementation.
*/
public abstract class BaseGateway : Gateway {

/**
* The logger for this gateway.
*/
protected val log: KLogger = KotlinLogging.logger { }

/**
* The current state of this gateway as atomic reference.
* @see State
*/
private val atomicState: AtomicRef<State> = atomic(State.Stopped)

/**
* The current state of this gateway.
* @see State
*/
protected var state: State by atomicState

/**
* The dispatcher used to run the gateway.
* It will be used to assemble the [coroutineContext] of this gateway.
* By default, there is [SupervisorJob] before the dispatcher in [coroutineContext].
*/
protected abstract val dispatcher: CoroutineDispatcher

override val coroutineContext: CoroutineContext get() = SupervisorJob() + dispatcher

override val events: MutableSharedFlow<Event> = MutableSharedFlow()

override suspend fun start(configuration: GatewayConfiguration) {
requireState<State.Stopped>()
atomicState.update { State.Running(true) }
onStart(configuration)
}

/**
* This method is called just after the [start] method,
* once the state is updated to [State.Running].
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onStart(configuration: GatewayConfiguration)

override suspend fun stop() {
requireStateIsNot<State.Detached>()
events.emit(Close.UserClose)
atomicState.update { State.Stopped }
onStop()
}

/**
* This method is called just after the [stop] method,
* once the [Close.UserClose] event is emitted,
* and the state is updated to [State.Stopped].
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onStop()

override suspend fun detach() {
(this as CoroutineScope).cancel()
if (state is State.Detached) return
atomicState.update { State.Detached }
events.emit(Close.Detach)
onDetach()
}

/**
* This method is called just after the [detach] method,
* once the state is updated to [State.Detached],
* and the [Close.Detach] event is emitted.
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onDetach()

override suspend fun send(command: Command) {
requireStateIsNot<State.Detached>()
onSend(command)
}

/**
* This method is called just after the [send] method.
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onSend(command: Command)

/**
* Checks whether the current [state] is not of type [T].
* If it is, an [IllegalStateException] is thrown with a describing message.
*/
protected inline fun <reified T : State> requireStateIsNot() {
if (state !is T) return
throwStateError()
}

/**
* Checks whether the current [state] is of type [T].
* If it isn't, an [IllegalStateException] is thrown with a describing message.
*/
protected inline fun <reified T : State> requireState() {
if (state is T) return
throwStateError()
}

/**
* Throws an [IllegalStateException] with a describing message based on the current [state].
*/
protected fun throwStateError(): Nothing {
when (state) {
is State.Stopped -> error("The gateway is already stopped.")
is State.Running -> error("The gateway is already running, call stop() first.")
is State.Detached -> error("The Gateway has been detached and can no longer be used, create a new instance instead.")
}
}

/**
* Represents the current state of the gateway.
* @param retry whether the gateway should attempt to reconnect when it stops.
*/
protected sealed class State(public val retry: Boolean) {
public object Stopped : State(false)
public class Running(retry: Boolean) : State(retry)
public object Detached : State(false)
}
}
Loading