diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayCloseReason.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayCloseReason.kt new file mode 100644 index 0000000..7232126 --- /dev/null +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayCloseReason.kt @@ -0,0 +1,19 @@ +package moe.lava.neon.core.api.gateway + +import io.ktor.websocket.CloseReason + +sealed interface GatewayCloseReason { + sealed interface ClientInitiated : GatewayCloseReason + sealed class ShouldReconnect(val resume: Boolean) : GatewayCloseReason + sealed class KeepDisconnected : GatewayCloseReason + + data object MissedHeartbeat : ShouldReconnect(resume = true), ClientInitiated + data class SkippedSequence(val next: Int) : ShouldReconnect(resume = true), ClientInitiated + data class InvalidSession(val resumable: Boolean) : ShouldReconnect(resume = resumable), ClientInitiated + // TODO: handle non-resumable cases properly + data class ServerClosed(val closeCode: CloseReason) : ShouldReconnect(resume = true) + + data object ClientPaused : KeepDisconnected(), ClientInitiated + + data object Unknown : ShouldReconnect(resume = true) +} diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayHandler.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayHandler.kt index 444885d..40dc00e 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayHandler.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewayHandler.kt @@ -2,9 +2,15 @@ package moe.lava.neon.core.api.gateway import co.touchlab.kermit.Logger import dev.zacsweers.metro.Inject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.serialization.ExperimentalSerializationApi import moe.lava.neon.core.di.EventHandlerGraph import moe.lava.neon.core.repository.AuthRepository +import kotlin.math.pow +import kotlin.time.Duration.Companion.seconds @Inject class GatewayHandler( @@ -12,7 +18,11 @@ class GatewayHandler( private val handlers: EventHandlerGraph, ) { private val logger = Logger.withTag("neon.core.api.gateway/handler") + private val scope = CoroutineScope(Dispatchers.IO) private var session: GatewaySession? = null + private var resumeProps: ResumeProperties? = null + + private var retryAttempts: Int = 0 @OptIn(ExperimentalSerializationApi::class) suspend fun connect() { @@ -26,13 +36,45 @@ class GatewayHandler( session = GatewaySession.start( token = token, eventHandlers = handlers, - onDestroy = { session = null } + resumeProps = resumeProps, + onSuccess = { + logger.d { "Successful session start" } + retryAttempts = 0 + }, + onDestroy = { reason, resumeProps -> + session = null + + if (reason is GatewayCloseReason.KeepDisconnected) { + this.resumeProps = resumeProps + } + + if (reason is GatewayCloseReason.ShouldReconnect) { + if (reason.resume) { + this.resumeProps = resumeProps + } else { + this.resumeProps = null + } + scope.launch { + var res: Result + do { + val dur = 2.0.pow(retryAttempts).seconds + logger.d { "Reconnecting in ${dur.inWholeMilliseconds}ms" } + delay(dur) + retryAttempts += 1 + res = runCatching { connect() } + res.exceptionOrNull()?.let { + logger.e(it) { "Reconnect failed" } + } + } while(res.isFailure) + } + } + } ) } suspend fun disconnect() { val session = session ?: throw IllegalStateException("Tried disconnecting with no session") - session.close() + session.close(GatewayCloseReason.ClientPaused) } } diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewaySession.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewaySession.kt index 57ccbbd..1955ec2 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewaySession.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/GatewaySession.kt @@ -9,11 +9,11 @@ import io.ktor.client.plugins.websocket.webSocketSession import io.ktor.client.request.parameter import io.ktor.http.userAgent import io.ktor.websocket.Frame -import io.ktor.websocket.close import io.ktor.websocket.readText import io.ktor.websocket.send import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.consumeAsFlow @@ -35,10 +35,13 @@ class GatewaySession private constructor( private val token: String, private val handlers: EventHandlerGraph, private val scope: CoroutineScope, - private val onDestroy: () -> Unit, + private var resumeProps: ResumeProperties?, + private val onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit, + private val onSuccess: () -> Unit, ) { - private var lastSeq: Int? = null + private var lastSeq: Int? = resumeProps?.lastSequence private var missedHeartbeats = 0 + private var closeReason: GatewayCloseReason? = null companion object { suspend fun start( @@ -49,9 +52,13 @@ class GatewaySession private constructor( install(WebSockets) }, scope: CoroutineScope = CoroutineScope(Dispatchers.IO), - onDestroy: () -> Unit, + resumeProps: ResumeProperties? = null, + onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit, + onSuccess: () -> Unit, ): GatewaySession { - val ws = client.webSocketSession("wss://gateway.discord.gg/") { + val ws = client.webSocketSession( + resumeProps?.resumeGatewayUrl ?: "wss://gateway.discord.gg/" + ) { userAgent(ApiConstants.gatewayUserAgent) url { parameter("encoding", "json") @@ -60,13 +67,13 @@ class GatewaySession private constructor( } } - return GatewaySession(ws, token, eventHandlers, scope, onDestroy) + return GatewaySession(ws, token, eventHandlers, scope, resumeProps, onDestroy, onSuccess) } } init { ws.incoming.consumeAsFlow() - .onCompletion { close(it) } + .onCompletion { onClose(it) } .onEach { frame -> if (frame !is Frame.Text) // if (frame !is Frame.Text && frame !is Frame.Binary) @@ -79,7 +86,7 @@ class GatewaySession private constructor( if (seq + 1 == raw.s) { this.lastSeq = raw.s } else if (raw.s != null) { - resume(ResumeReason.SkippedSequence(raw.s)) + close(GatewayCloseReason.SkippedSequence(raw.s)) return@onEach } @@ -95,8 +102,13 @@ class GatewaySession private constructor( logger.d { payload.toString() } when (val event = payload.d) { is Event.Hello -> handleHello(event) - is Event.Ready -> handlers.ready.handle(event) + is Event.Ready -> handlers.ready.handle(event) { + resumeProps = it + onSuccess() + } + is Event.Resumed -> onSuccess() is Event.Heartbeat -> handleHeartbeat() + is Event.InvalidSession -> close(GatewayCloseReason.InvalidSession(event.resumable)) is Event.HeartbeatAck -> { missedHeartbeats -= 1 } } } @@ -112,14 +124,23 @@ class GatewaySession private constructor( } private suspend fun handleHello(e: Event.Hello) { - Event.Identify(token = token).pack().send() + val resumeProps = resumeProps + if (resumeProps != null) { + Event.Resume( + token = token, + sessionId = resumeProps.sessionId, + seq = resumeProps.lastSequence + ).pack().send() + } else { + Event.Identify(token = token).pack().send() + } val interval = e.heartbeatInterval.milliseconds scope.launch { delay(interval * Random.nextDouble()) while (true) { if (missedHeartbeats >= 1) { - resume(ResumeReason.MissedHeartbeat) + close(GatewayCloseReason.MissedHeartbeat) break } Event.QoSHeartbeat(lastSeq).pack().send() @@ -129,34 +150,40 @@ class GatewaySession private constructor( } } - private sealed class ResumeReason { - data object MissedHeartbeat : ResumeReason() - data class SkippedSequence(val next: Int) : ResumeReason() - data class CloseCode(val code: Int) : ResumeReason() - } - - private suspend fun resume(reason: ResumeReason?) { + fun close(reason: GatewayCloseReason.ClientInitiated?) { val msg = when (reason) { - is ResumeReason.MissedHeartbeat -> + is GatewayCloseReason.MissedHeartbeat -> "heartbeat missed" - is ResumeReason.SkippedSequence -> + is GatewayCloseReason.SkippedSequence -> "payloads skipped one sequence (expected: $lastSeq, actual: ${reason.next})" - is ResumeReason.CloseCode -> - "closed with code ${reason.code}" + is GatewayCloseReason.InvalidSession -> + "invalid session (resumable: $reason)" + is GatewayCloseReason.ClientPaused -> + "client requested pause" null -> "no reason" } + closeReason = reason - logger.e { "Resuming, cause: $msg" } - // TODO + logger.e { "Client-initiated close, cause: $msg" } + ws.cancel() } - // TODO: handle resuming, etc.. - suspend fun close(error: Throwable? = null) { + @OptIn(ExperimentalCoroutinesApi::class) + private fun onClose(error: Throwable? = null) { logger.d(error) { "Websocket connection closed, cleaning up..." } - ws.close() if (scope.isActive) scope.cancel() - onDestroy() + if (resumeProps == null) { + logger.w { "No resume props stored" } + } + onDestroy( + closeReason + ?: runCatching { ws.closeReason.getCompleted() } + .getOrNull() + ?.let { GatewayCloseReason.ServerClosed(it) } + ?: GatewayCloseReason.Unknown, + resumeProps?.copy(lastSequence = lastSeq ?: 0) + ) } private suspend inline fun Payload.Outgoing.send() { diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt index a92ea13..f7d83dc 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/Payloads.kt @@ -1,6 +1,5 @@ package moe.lava.neon.core.api.gateway -import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement import moe.lava.neon.core.api.ApiConstants @@ -49,32 +48,8 @@ sealed interface Event { @Serializable value class Heartbeat(val lastSequence: Int?) : Incoming, Outgoing - // 40 - @Serializable - data class QoSHeartbeat( - val seq: Int?, - val qos: QoSPayload = QoSPayload(), - ) : Outgoing { - @Serializable - data class QoSPayload( - val ver: Int = 27, - val active: Boolean = true, - val reasons: List = listOf("foregrounded"), - ) - } - - // 11 - @JvmInline - @Serializable - value class HeartbeatAck(private val nothing: Nothing?) : Incoming - - // 10 - @Serializable - data class Hello(val heartbeatInterval: Int) : Incoming - // 2 @Serializable - @OptIn(ExperimentalSerializationApi::class) data class Identify( val token: String, val properties: ApiConstants.GatewayProperties = ApiConstants.GatewayProperties(), @@ -89,6 +64,42 @@ sealed interface Event { // val clientState: ClientState, ) : Outgoing + // 6 + @Serializable + data class Resume( + val token: String, + val sessionId: String, + val seq: Int, + ) : Outgoing + + // 9 + @JvmInline + @Serializable + value class InvalidSession(val resumable: Boolean) : Incoming + + // 10 + @Serializable + data class Hello(val heartbeatInterval: Int) : Incoming + + // 11 + @JvmInline + @Serializable + value class HeartbeatAck(private val nothing: Nothing?) : Incoming + + // 40 + @Serializable + data class QoSHeartbeat( + val seq: Int?, + val qos: QoSPayload = QoSPayload(), + ) : Outgoing { + @Serializable + data class QoSPayload( + val ver: Int = 27, + val active: Boolean = true, + val reasons: List = listOf("foregrounded"), + ) + } + @Serializable data class Ready( val v: Int, @@ -98,4 +109,7 @@ sealed interface Event { val resumeGatewayUrl: String, // val application: Application, ) : Dispatch() + + @Serializable + data object Resumed : Dispatch() } diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/ResumeProperties.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/ResumeProperties.kt new file mode 100644 index 0000000..092497d --- /dev/null +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/ResumeProperties.kt @@ -0,0 +1,7 @@ +package moe.lava.neon.core.api.gateway + +data class ResumeProperties( + val sessionId: String, + val resumeGatewayUrl: String, + val lastSequence: Int, +) diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/SerializingExtensions.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/SerializingExtensions.kt index 80cfb8e..12b98cb 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/SerializingExtensions.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/SerializingExtensions.kt @@ -11,7 +11,7 @@ fun T.pack(): Payload.Outgoing { val opcode: Int = when (this) { is Event.Heartbeat -> 1 is Event.Identify -> 2 - is Event.HeartbeatAck -> 11 + is Event.Resume -> 6 is Event.QoSHeartbeat -> 40 } return Payload.Outgoing(op = opcode, d = this) @@ -21,9 +21,11 @@ fun Payload.Unknown.asIncoming() : Payload.WithSequence { return when (op) { 0 -> when (t) { "READY" -> decode() + "RESUMED" -> decode() else -> this } 1 -> decode() + 9 -> decode() 10 -> decode() 11 -> decode() else -> this diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/Handler.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/Handler.kt index 7fe148f..5f6a6ac 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/Handler.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/Handler.kt @@ -2,6 +2,4 @@ package moe.lava.neon.core.api.gateway.handlers import moe.lava.neon.core.api.gateway.Event -sealed interface Handler { - fun handle(event: T) -} +sealed interface Handler diff --git a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/ReadyHandler.kt b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/ReadyHandler.kt index b44ee0b..f9b5afc 100644 --- a/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/ReadyHandler.kt +++ b/core/src/commonMain/kotlin/moe/lava/neon/core/api/gateway/handlers/ReadyHandler.kt @@ -3,12 +3,18 @@ package moe.lava.neon.core.api.gateway.handlers import co.touchlab.kermit.Logger import dev.zacsweers.metro.Inject import moe.lava.neon.core.api.gateway.Event +import moe.lava.neon.core.api.gateway.ResumeProperties private val logger = Logger.withTag("neon.core.api.events/ready") @Inject class ReadyHandler : Handler { - override fun handle(event: Event.Ready) { + fun handle(event: Event.Ready, updateResumeProps: (ResumeProperties) -> Unit) { logger.i { "Received payload $event" } + updateResumeProps(ResumeProperties( + sessionId = event.sessionId, + resumeGatewayUrl = event.resumeGatewayUrl, + lastSequence = 0, + )) } } diff --git a/ui/src/commonMain/kotlin/moe/lava/neon/ui/screens/Sample.kt b/ui/src/commonMain/kotlin/moe/lava/neon/ui/screens/Sample.kt index 24db6c8..ba030f1 100644 --- a/ui/src/commonMain/kotlin/moe/lava/neon/ui/screens/Sample.kt +++ b/ui/src/commonMain/kotlin/moe/lava/neon/ui/screens/Sample.kt @@ -19,6 +19,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.lifecycle.ViewModel import androidx.lifecycle.viewModelScope +import co.touchlab.kermit.Logger import dev.zacsweers.metro.AppScope import dev.zacsweers.metro.ContributesIntoMap import dev.zacsweers.metro.Inject @@ -83,16 +84,25 @@ class SampleViewModel( private val auth: AuthRepository, private val gateway: GatewayHandler, ) : ViewModel() { + private val logger = Logger.withTag("neon.ui.screens/Sample") val token get() = auth.token fun connect() { viewModelScope.launch { - gateway.connect() + try { + gateway.connect() + } catch(e: Throwable) { + logger.e(e) { "Failed to connect to gateway: ${e.stackTraceToString()}" } + } } } fun disconnect() { viewModelScope.launch { - gateway.disconnect() + try { + gateway.disconnect() + } catch(e: Throwable) { + logger.e(e) { "Failed to connect to gateway: ${e.stackTraceToString()}" } + } } } fun logout() {