Skip to content

Commit

Permalink
Initial Zstd draft
Browse files Browse the repository at this point in the history
  • Loading branch information
DRSchlaubi committed Nov 2, 2024
1 parent e99c5c5 commit 8f4cb59
Show file tree
Hide file tree
Showing 14 changed files with 312 additions and 35 deletions.
7 changes: 7 additions & 0 deletions core/src/commonMain/kotlin/builder/kord/KordBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import dev.kord.core.gateway.DefaultMasterGateway
import dev.kord.core.gateway.handler.DefaultGatewayEventInterceptor
import dev.kord.core.gateway.handler.GatewayEventInterceptor
import dev.kord.core.supplier.EntitySupplyStrategy
import dev.kord.gateway.Compression
import dev.kord.gateway.DefaultGateway
import dev.kord.gateway.Gateway
import dev.kord.gateway.builder.Shards
Expand Down Expand Up @@ -49,6 +50,7 @@ public abstract class BaseKordBuilder internal constructor(public val token: Str
DefaultGateway {
client = resources.httpClient
identifyRateLimiter = rateLimiter
compression = this@BaseKordBuilder.compression
}
}
}
Expand All @@ -57,6 +59,11 @@ public abstract class BaseKordBuilder internal constructor(public val token: Str
{ KtorRequestHandler(it.httpClient, ExclusionRequestRateLimiter(), token = token) }
private var cacheBuilder: KordCacheBuilder.(resources: ClientResources) -> Unit = {}

/**
* The [compression mode][Compression] used.
*/
public var compression: Compression = Compression.ZLib

/**
* Enables stack trace recovery on the currently defined [RequestHandler].
*
Expand Down
2 changes: 2 additions & 0 deletions gateway/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ kotlin {
jvmMain {
dependencies {
implementation(libs.slf4j.api)
implementation(libs.zstd.jni)
}
}
jsMain {
dependencies {
implementation(libs.kotlin.node)
implementation(npm("fast-zlib", libs.versions.fastZlib.get()))
implementation(npm("simple-zstd", "1.4.2"))

// workaround for https://youtrack.jetbrains.com/issue/KT-43500 /
// https://youtrack.jetbrains.com/issue/KT-64109#focus=Comments-27-10064206.0-0 /
Expand Down
54 changes: 54 additions & 0 deletions gateway/src/commonMain/kotlin/Compression.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
@file:Suppress("FunctionName")

package dev.kord.gateway

import dev.kord.common.annotation.KordInternal
import io.ktor.websocket.*

/** @suppress */
@KordInternal // Only public for interface, binary API might change at any time
public interface Decompressor : AutoCloseable {
public fun Frame.decompress(): String

public companion object Noop : Decompressor {
override fun Frame.decompress(): String = data.decodeToString()
override fun close() {}
}
}

internal expect fun ZLibDecompressor(): Decompressor
internal expect fun ZstdDecompressor(): Decompressor

/**
* Different compression modes for the Discord gateway.
*
* @property name the name used by the Discord API
*/
public sealed interface Compression {
public val name: String?
public fun newDecompressor(): Decompressor

/**
* Implementation using no compression.
*/
public data object None : Compression {
override val name: String? = null
override fun newDecompressor(): Decompressor = Decompressor.Noop
}

/**
* Implementation using [zlib](https://zlib.net/).
*/
public data object ZLib : Compression {
override val name: String = "zlib-stream"
override fun newDecompressor(): Decompressor = ZLibDecompressor()
}

/**
* Implementation using [Zstandard](https://facebook.github.io/zstd/)
*/
public data object Zstd : Compression {
override val name: String = "zstd-stream"
override fun newDecompressor(): Decompressor = ZstdDecompressor()
}
}
37 changes: 20 additions & 17 deletions gateway/src/commonMain/kotlin/DefaultGateway.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ private sealed class State(val retry: Boolean) {
}

/**
* @param url The url to connect to.
* @param client The [HttpClient] from which a WebSocket will be created, requires the [WebSockets] plugin to be
* @property url The url to connect to.
* @property client The [HttpClient] from which a WebSocket will be created, requires the [WebSockets] plugin to be
* installed.
* @param reconnectRetry A [Retry] used for reconnection attempts.
* @param sendRateLimiter A [RateLimiter] that follows the Discord API specifications for sending messages.
* @param identifyRateLimiter An [IdentifyRateLimiter] that follows the Discord API specifications for identifying.
* @property reconnectRetry A [Retry] used for reconnection attempts.
* @property sendRateLimiter A [RateLimiter] that follows the Discord API specifications for sending messages.
* @property identifyRateLimiter An [IdentifyRateLimiter] that follows the Discord API specifications for identifying.
* @property compression the [compression mode][Compression] used
*/
public data class DefaultGatewayData(
val url: String,
Expand All @@ -54,6 +55,7 @@ public data class DefaultGatewayData(
val identifyRateLimiter: IdentifyRateLimiter,
val dispatcher: CoroutineDispatcher,
val eventFlow: MutableSharedFlow<Event>,
val compression: Compression,
)

/**
Expand All @@ -63,8 +65,6 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {

override val coroutineContext: CoroutineContext = SupervisorJob() + data.dispatcher

private val compression: Boolean

private val _ping = MutableStateFlow<Duration?>(null)
override val ping: StateFlow<Duration?> get() = _ping

Expand All @@ -76,7 +76,7 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {

private val handshakeHandler: HandshakeHandler

private lateinit var inflater: Inflater
private lateinit var decompressor: Decompressor

private val jsonParser = Json {
ignoreUnknownKeys = true
Expand All @@ -86,9 +86,10 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {
private val stateMutex = Mutex()

init {
val initialUrl = Url(data.url)
compression = initialUrl.parameters.contains("compress", "zlib-stream")

val initialUrl = URLBuilder(data.url).apply {
val compressionName = data.compression.name ?: return@apply
parameters.append("compress", compressionName)
}.build()
val sequence = Sequence()
SequenceHandler(events, sequence)
handshakeHandler = HandshakeHandler(events, initialUrl, ::trySend, sequence, data.reconnectRetry)
Expand Down Expand Up @@ -117,7 +118,11 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {
*
* > Every connection to the gateway should use its own unique zlib context.
*/
inflater = Inflater()
try {
decompressor = data.compression.newDecompressor()
} catch (e: Throwable) {
e.printStackTrace()
}
} catch (exception: Exception) {
defaultGatewayLogger.error(exception) { "" }
if (exception.isTimeout()) {
Expand Down Expand Up @@ -179,10 +184,7 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {

private suspend fun read(frame: Frame) {
defaultGatewayLogger.trace { "Received raw frame: $frame" }
val json = when {
compression -> with(inflater) { frame.inflateData() }
else -> frame.data.decodeToString()
}
val json = with(decompressor) { frame.decompress() }

try {
defaultGatewayLogger.trace { "Gateway <<< $json" }
Expand All @@ -195,7 +197,7 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {
}

private suspend fun handleClose() {
inflater.close()
decompressor.close()

val reason = withTimeoutOrNull(1500) {
socket.closeReason.await()
Expand All @@ -211,6 +213,7 @@ public class DefaultGateway(private val data: DefaultGatewayData) : Gateway {
state.update { State.Stopped }
throw IllegalStateException("Gateway closed: ${reason.code} ${reason.message}")
}

discordReason.resetSession -> {
setStopped()
}
Expand Down
6 changes: 4 additions & 2 deletions gateway/src/commonMain/kotlin/DefaultGatewayBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import kotlin.time.Duration.Companion.seconds

public class DefaultGatewayBuilder {
public var url: String =
"wss://gateway.discord.gg/?v=${KordConfiguration.GATEWAY_VERSION}&encoding=json&compress=zlib-stream"
"wss://gateway.discord.gg/?v=${KordConfiguration.GATEWAY_VERSION}&encoding=json"
public var client: HttpClient? = null
public var reconnectRetry: Retry? = null
public var sendRateLimiter: RateLimiter? = null
public var identifyRateLimiter: IdentifyRateLimiter? = null
public var dispatcher: CoroutineDispatcher = Dispatchers.Default
public var eventFlow: MutableSharedFlow<Event> = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE)
public var compression: Compression = Compression.ZLib

public fun build(): DefaultGateway {
val client = client ?: HttpClient(httpEngine()) {
Expand All @@ -44,7 +45,8 @@ public class DefaultGatewayBuilder {
sendRateLimiter,
identifyRateLimiter,
dispatcher,
eventFlow
eventFlow,
compression
)

return DefaultGateway(data)
Expand Down
9 changes: 0 additions & 9 deletions gateway/src/commonMain/kotlin/Inflater.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import io.ktor.websocket.*
import node.buffer.Buffer
import node.buffer.BufferEncoding

internal actual fun Inflater() = object : Inflater {
internal actual fun ZLibDecompressor() = object : Decompressor {
private val inflate = Inflate()

override fun Frame.inflateData(): String {
override fun Frame.decompress(): String {
val buffer = Buffer.from(data)

return inflate.process(buffer).toString(BufferEncoding.utf8)
Expand Down
29 changes: 29 additions & 0 deletions gateway/src/jsMain/kotlin/ZstdDecompressor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dev.kord.gateway

import dev.kord.gateway.internal.ZSTDDecompress
import io.ktor.websocket.*
import js.typedarrays.toUint8Array
import node.stream.DuplexEvent
import web.encoding.TextDecoder

internal actual fun ZstdDecompressor() = object : Decompressor {
private val stream = ZSTDDecompress()
private val decoder = TextDecoder()

override fun Frame.decompress(): String {
try {
stream.write(data.toUint8Array())
stream.on(DuplexEvent.FINISH) {
println("finish")
}
stream.on(DuplexEvent.DATA) {
println("Data: $it")
}
} catch (exception: Exception) {
exception.printStackTrace()
}
return ""
}

override fun close() = stream.end()
}
7 changes: 7 additions & 0 deletions gateway/src/jsMain/kotlin/internal/JsZstd.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
@file:JsModule("simple-zstd")

package dev.kord.gateway.internal

import node.stream.Transform

internal external class ZSTDDecompress : Transform
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import io.ktor.websocket.*
import java.io.ByteArrayOutputStream
import java.util.zip.InflaterOutputStream

internal actual fun Inflater() = object : Inflater {
internal actual fun ZLibDecompressor() = object : Decompressor {
private val delegate = java.util.zip.Inflater()

override fun Frame.inflateData(): String {
override fun Frame.decompress(): String {
val outputStream = ByteArrayOutputStream()
InflaterOutputStream(outputStream, delegate).use {
it.write(data)
Expand Down
33 changes: 33 additions & 0 deletions gateway/src/jvmMain/kotlin/ZstdDecompressor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package dev.kord.gateway

import com.github.luben.zstd.ZstdInputStream
import io.ktor.websocket.*
import java.io.ByteArrayInputStream
import java.io.InputStream

internal actual fun ZstdDecompressor() = object : Decompressor {

private val input = UpdatableByteArrayInputStream()
private val zstdStream = ZstdInputStream(input).apply { continuous = true }

override fun Frame.decompress(): String {
input.updateDelegate(data)
return zstdStream.readAllBytes().decodeToString()
}

override fun close() {
zstdStream.close()
}
}

private class UpdatableByteArrayInputStream : InputStream() {
private var delegate: ByteArrayInputStream? = null

private val d: InputStream get() = delegate ?: error("No data available")

override fun read(): Int = d.read()

fun updateDelegate(bytes: ByteArray) {
delegate = ByteArrayInputStream(bytes)
}
}
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ kotlin-node = "22.5.4-pre.818" # https://github.com/JetBrains/kotlin-wrappers
bignum = "0.3.10" # https://github.com/ionspin/kotlin-multiplatform-bignum
stately = "2.1.0" # https://github.com/touchlab/Stately
fastZlib = "2.0.1" # https://github.com/timotejroiko/fast-zlib
zstd-jni = "1.5.6-7" # https://github.com/luben/zstd-jni
zstd-codec = "0.1.5" # https://www.npmjs.com/package/zstd-codec

# code generation
ksp = "2.0.21-1.0.25" # https://github.com/google/ksp
Expand Down Expand Up @@ -63,6 +65,7 @@ kotlin-node = { module = "org.jetbrains.kotlin-wrappers:kotlin-node", version.re
# JDK replacements
bignum = { module = "com.ionspin.kotlin:bignum", version.ref = "bignum" }
stately-collections = { module = "co.touchlab:stately-concurrent-collections", version.ref = "stately" }
zstd-jni = { module = "com.github.luben:zstd-jni", version.ref = "zstd-jni" }

# code generation
ksp-api = { module = "com.google.devtools.ksp:symbol-processing-api", version.ref = "ksp" }
Expand Down
Loading

0 comments on commit 8f4cb59

Please sign in to comment.