From 3ff9b7d67680b3a3d4d35e851733de2d9d32c58f Mon Sep 17 00:00:00 2001 From: Cilly Leang Date: Tue, 27 Jan 2026 16:10:08 +1100 Subject: [PATCH] feat(core): heartbeat and basic sequence handling --- .../core/api/gateway/EventPolySerializer.kt | 6 +- .../moe/lava/neon/core/api/gateway/Gateway.kt | 72 +++++++++++++++++-- .../lava/neon/core/api/gateway/Payloads.kt | 38 ++++++---- 3 files changed, 98 insertions(+), 18 deletions(-) diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/EventPolySerializer.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/EventPolySerializer.kt index 153a982..f2dac8d 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/EventPolySerializer.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/EventPolySerializer.kt @@ -7,8 +7,8 @@ import kotlinx.serialization.json.int import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive -object EventPolySerializer : JsonContentPolymorphicSerializer(Event::class) { - override fun selectDeserializer(element: JsonElement): DeserializationStrategy { +object EventPolySerializer : JsonContentPolymorphicSerializer(Event.WithSequence::class) { + override fun selectDeserializer(element: JsonElement): DeserializationStrategy { val op = element.jsonObject["op"]!!.jsonPrimitive.int if (op == 0) { val name = element.jsonObject["t"]?.jsonPrimitive?.content @@ -18,7 +18,9 @@ object EventPolySerializer : JsonContentPolymorphicSerializer(Event::clas } } return when (op) { + 1 -> Event.Incoming.serializer(Payload.Heartbeat.serializer()) 10 -> Event.Incoming.serializer(Payload.Hello.serializer()) + 11 -> Event.Incoming.serializer(Payload.HeartbeatAck.serializer()) else -> Event.Unknown.serializer() } } diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Gateway.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Gateway.kt index faeadb4..c57455f 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Gateway.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Gateway.kt @@ -15,6 +15,7 @@ import io.ktor.websocket.readText import io.ktor.websocket.send import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onCompletion @@ -24,6 +25,9 @@ import kotlinx.serialization.ExperimentalSerializationApi import moe.lava.neon.core.api.ApiConstants import moe.lava.neon.core.di.GatewayHandlerGraph import moe.lava.neon.core.repository.AuthRepository +import kotlin.random.Random +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds @Inject class Gateway( @@ -36,6 +40,8 @@ class Gateway( private val json = ApiConstants.json + private var seq: Int? = null + @OptIn(ExperimentalSerializationApi::class) suspend fun connect() { if (ws != null) { @@ -67,10 +73,18 @@ class Gateway( logger.d { "Received event ${frame.readText()}" } - when (val msg = json.decodeFromString(EventPolySerializer, frame.readText())) { + val msg = json.decodeFromString(EventPolySerializer, frame.readText()) + val seq = this.seq ?: 0 + if (seq + 1 == msg.s) { + this.seq = msg.s + } else if (msg.s != null) { + resume(ResumeReason.SkippedSequence(msg.s!!)) + return@onEach + } + + when (msg) { is Event.Incoming<*> -> scope.launch { handleEvent(msg) } - is Event.Unknown -> logger.w { "Unknown event $msg" } - is Event.Outgoing<*> -> throw UnsupportedOperationException("Tried to decode outgoing message") + is Event.Unknown -> scope.launch { handleUnknownEvent(msg) } } } .launchIn(scope) @@ -82,19 +96,47 @@ class Gateway( is Payload.Hello -> handleHello(payload) is Payload.Ready -> handlers.ready.handle(payload) is Payload.Heartbeat -> {} - is Payload.HeartbeatAck -> {} + is Payload.HeartbeatAck -> { missedBeats -= 1 } } } + suspend fun handleUnknownEvent(e: Event.Unknown) { + logger.w { "Unknown event $e" } + } + suspend fun handleHello(e: Payload.Hello) { val token = auth.token ?: throw IllegalStateException("Token missing between connection and hello, cannot send Identify") + Payload.Identify(token = token).pack().send() + + val interval = e.heartbeatInterval.milliseconds + scope.launch { + startHeartbeat(interval) + } + } + + private var missedBeats = 0 + private suspend fun startHeartbeat(interval: Duration) { + val ws = this.ws + ?: throw IllegalStateException("Ws missing whilst starting heartbeat") + + delay(interval * Random.nextDouble()) + while (this@Gateway.ws == ws) { + if (missedBeats >= 1) { + resume(ResumeReason.MissedHeartbeat) + break + } + Payload.QoSHeartbeat(this@Gateway.seq).pack().send() + missedBeats += 1 + delay(interval) + } } // TODO: handle resuming, etc.. suspend fun cleanup(error: Throwable? = null) { logger.d(error) { "Websocket connection closed, cleaning up..." } + this.ws = null } suspend fun disconnect() { @@ -107,6 +149,28 @@ class Gateway( ws.close() } + private sealed class ResumeReason { + data object MissedHeartbeat : ResumeReason() + data class SkippedSequence(val next: Int) : ResumeReason() + data class CloseCode(val code: Int) : ResumeReason() + } + + private suspend fun resume(reason: ResumeReason?) { + val msg = when (reason) { + is ResumeReason.MissedHeartbeat -> + "heartbeat missed" + is ResumeReason.SkippedSequence -> + "events skipped one sequence (expected: $seq, actual: ${reason.next})" + is ResumeReason.CloseCode -> + "closed with code ${reason.code}" + null -> + "no reason" + } + + logger.e { "Resuming, cause: $msg" } + // TODO + } + private suspend inline fun Event.Outgoing.send() { val ws = ws ?: throw IllegalStateException("Tried to send with no connection") diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt index d750f79..ba0ddf8 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt @@ -6,31 +6,36 @@ import kotlinx.serialization.json.JsonElement import moe.lava.neon.core.api.ApiConstants import moe.lava.neon.core.api.structures.User -sealed class Event { - abstract val op: Int - abstract val d: Any? +sealed interface Event { + val op: Int + val d: Any? + + sealed interface WithSequence : Event { + val s: Int? + val t: String? + } @Serializable data class Incoming( override val op: Int, override val d: T, - val s: Int?, - val t: String?, - ) : Event() + override val s: Int?, + override val t: String?, + ) : WithSequence @Serializable data class Outgoing( override val op: Int, override val d: T, - ) : Event() + ) : Event @Serializable data class Unknown( override val op: Int, override val d: JsonElement?, - val s: Int?, - val t: String?, - ) : Event() + override val s: Int?, + override val t: String?, + ) : WithSequence } @Serializable @@ -45,9 +50,18 @@ sealed interface Payload { value class Heartbeat(val lastSequence: Int?) : Incoming, Outgoing // 40 - @JvmInline @Serializable - value class QoSHeartbeat(val lastSequence: Int?) : Outgoing + data class QoSHeartbeat( + val seq: Int?, + val qos: QoSPayload = QoSPayload(), + ) : Outgoing { + @Serializable + data class QoSPayload( + val ver: Int = 27, + val active: Boolean = true, + val reasons: List = listOf("foregrounded"), + ) + } // 11 @JvmInline