feat(core): heartbeat and basic sequence handling
This commit is contained in:
parent
cd50f75c10
commit
3ff9b7d676
3 changed files with 98 additions and 18 deletions
|
|
@ -7,8 +7,8 @@ import kotlinx.serialization.json.int
|
|||
import kotlinx.serialization.json.jsonObject
|
||||
import kotlinx.serialization.json.jsonPrimitive
|
||||
|
||||
object EventPolySerializer : JsonContentPolymorphicSerializer<Event>(Event::class) {
|
||||
override fun selectDeserializer(element: JsonElement): DeserializationStrategy<Event> {
|
||||
object EventPolySerializer : JsonContentPolymorphicSerializer<Event.WithSequence>(Event.WithSequence::class) {
|
||||
override fun selectDeserializer(element: JsonElement): DeserializationStrategy<Event.WithSequence> {
|
||||
val op = element.jsonObject["op"]!!.jsonPrimitive.int
|
||||
if (op == 0) {
|
||||
val name = element.jsonObject["t"]?.jsonPrimitive?.content
|
||||
|
|
@ -18,7 +18,9 @@ object EventPolySerializer : JsonContentPolymorphicSerializer<Event>(Event::clas
|
|||
}
|
||||
}
|
||||
return when (op) {
|
||||
1 -> Event.Incoming.serializer(Payload.Heartbeat.serializer())
|
||||
10 -> Event.Incoming.serializer(Payload.Hello.serializer())
|
||||
11 -> Event.Incoming.serializer(Payload.HeartbeatAck.serializer())
|
||||
else -> Event.Unknown.serializer()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import io.ktor.websocket.readText
|
|||
import io.ktor.websocket.send
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
|
|
@ -24,6 +25,9 @@ import kotlinx.serialization.ExperimentalSerializationApi
|
|||
import moe.lava.neon.core.api.ApiConstants
|
||||
import moe.lava.neon.core.di.GatewayHandlerGraph
|
||||
import moe.lava.neon.core.repository.AuthRepository
|
||||
import kotlin.random.Random
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
@Inject
|
||||
class Gateway(
|
||||
|
|
@ -36,6 +40,8 @@ class Gateway(
|
|||
|
||||
private val json = ApiConstants.json
|
||||
|
||||
private var seq: Int? = null
|
||||
|
||||
@OptIn(ExperimentalSerializationApi::class)
|
||||
suspend fun connect() {
|
||||
if (ws != null) {
|
||||
|
|
@ -67,10 +73,18 @@ class Gateway(
|
|||
|
||||
logger.d { "Received event ${frame.readText()}" }
|
||||
|
||||
when (val msg = json.decodeFromString(EventPolySerializer, frame.readText())) {
|
||||
val msg = json.decodeFromString(EventPolySerializer, frame.readText())
|
||||
val seq = this.seq ?: 0
|
||||
if (seq + 1 == msg.s) {
|
||||
this.seq = msg.s
|
||||
} else if (msg.s != null) {
|
||||
resume(ResumeReason.SkippedSequence(msg.s!!))
|
||||
return@onEach
|
||||
}
|
||||
|
||||
when (msg) {
|
||||
is Event.Incoming<*> -> scope.launch { handleEvent(msg) }
|
||||
is Event.Unknown -> logger.w { "Unknown event $msg" }
|
||||
is Event.Outgoing<*> -> throw UnsupportedOperationException("Tried to decode outgoing message")
|
||||
is Event.Unknown -> scope.launch { handleUnknownEvent(msg) }
|
||||
}
|
||||
}
|
||||
.launchIn(scope)
|
||||
|
|
@ -82,19 +96,47 @@ class Gateway(
|
|||
is Payload.Hello -> handleHello(payload)
|
||||
is Payload.Ready -> handlers.ready.handle(payload)
|
||||
is Payload.Heartbeat -> {}
|
||||
is Payload.HeartbeatAck -> {}
|
||||
is Payload.HeartbeatAck -> { missedBeats -= 1 }
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun handleUnknownEvent(e: Event.Unknown) {
|
||||
logger.w { "Unknown event $e" }
|
||||
}
|
||||
|
||||
suspend fun handleHello(e: Payload.Hello) {
|
||||
val token = auth.token
|
||||
?: throw IllegalStateException("Token missing between connection and hello, cannot send Identify")
|
||||
|
||||
Payload.Identify(token = token).pack().send()
|
||||
|
||||
val interval = e.heartbeatInterval.milliseconds
|
||||
scope.launch {
|
||||
startHeartbeat(interval)
|
||||
}
|
||||
}
|
||||
|
||||
private var missedBeats = 0
|
||||
private suspend fun startHeartbeat(interval: Duration) {
|
||||
val ws = this.ws
|
||||
?: throw IllegalStateException("Ws missing whilst starting heartbeat")
|
||||
|
||||
delay(interval * Random.nextDouble())
|
||||
while (this@Gateway.ws == ws) {
|
||||
if (missedBeats >= 1) {
|
||||
resume(ResumeReason.MissedHeartbeat)
|
||||
break
|
||||
}
|
||||
Payload.QoSHeartbeat(this@Gateway.seq).pack().send()
|
||||
missedBeats += 1
|
||||
delay(interval)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: handle resuming, etc..
|
||||
suspend fun cleanup(error: Throwable? = null) {
|
||||
logger.d(error) { "Websocket connection closed, cleaning up..." }
|
||||
this.ws = null
|
||||
}
|
||||
|
||||
suspend fun disconnect() {
|
||||
|
|
@ -107,6 +149,28 @@ class Gateway(
|
|||
ws.close()
|
||||
}
|
||||
|
||||
private sealed class ResumeReason {
|
||||
data object MissedHeartbeat : ResumeReason()
|
||||
data class SkippedSequence(val next: Int) : ResumeReason()
|
||||
data class CloseCode(val code: Int) : ResumeReason()
|
||||
}
|
||||
|
||||
private suspend fun resume(reason: ResumeReason?) {
|
||||
val msg = when (reason) {
|
||||
is ResumeReason.MissedHeartbeat ->
|
||||
"heartbeat missed"
|
||||
is ResumeReason.SkippedSequence ->
|
||||
"events skipped one sequence (expected: $seq, actual: ${reason.next})"
|
||||
is ResumeReason.CloseCode ->
|
||||
"closed with code ${reason.code}"
|
||||
null ->
|
||||
"no reason"
|
||||
}
|
||||
|
||||
logger.e { "Resuming, cause: $msg" }
|
||||
// TODO
|
||||
}
|
||||
|
||||
private suspend inline fun <reified T : Payload.Outgoing> Event.Outgoing<T>.send() {
|
||||
val ws = ws
|
||||
?: throw IllegalStateException("Tried to send with no connection")
|
||||
|
|
|
|||
|
|
@ -6,31 +6,36 @@ import kotlinx.serialization.json.JsonElement
|
|||
import moe.lava.neon.core.api.ApiConstants
|
||||
import moe.lava.neon.core.api.structures.User
|
||||
|
||||
sealed class Event {
|
||||
abstract val op: Int
|
||||
abstract val d: Any?
|
||||
sealed interface Event {
|
||||
val op: Int
|
||||
val d: Any?
|
||||
|
||||
sealed interface WithSequence : Event {
|
||||
val s: Int?
|
||||
val t: String?
|
||||
}
|
||||
|
||||
@Serializable
|
||||
data class Incoming<T : Payload.Incoming>(
|
||||
override val op: Int,
|
||||
override val d: T,
|
||||
val s: Int?,
|
||||
val t: String?,
|
||||
) : Event()
|
||||
override val s: Int?,
|
||||
override val t: String?,
|
||||
) : WithSequence
|
||||
|
||||
@Serializable
|
||||
data class Outgoing<T : Payload.Outgoing>(
|
||||
override val op: Int,
|
||||
override val d: T,
|
||||
) : Event()
|
||||
) : Event
|
||||
|
||||
@Serializable
|
||||
data class Unknown(
|
||||
override val op: Int,
|
||||
override val d: JsonElement?,
|
||||
val s: Int?,
|
||||
val t: String?,
|
||||
) : Event()
|
||||
override val s: Int?,
|
||||
override val t: String?,
|
||||
) : WithSequence
|
||||
}
|
||||
|
||||
@Serializable
|
||||
|
|
@ -45,9 +50,18 @@ sealed interface Payload {
|
|||
value class Heartbeat(val lastSequence: Int?) : Incoming, Outgoing
|
||||
|
||||
// 40
|
||||
@JvmInline
|
||||
@Serializable
|
||||
value class QoSHeartbeat(val lastSequence: Int?) : Outgoing
|
||||
data class QoSHeartbeat(
|
||||
val seq: Int?,
|
||||
val qos: QoSPayload = QoSPayload(),
|
||||
) : Outgoing {
|
||||
@Serializable
|
||||
data class QoSPayload(
|
||||
val ver: Int = 27,
|
||||
val active: Boolean = true,
|
||||
val reasons: List<String> = listOf("foregrounded"),
|
||||
)
|
||||
}
|
||||
|
||||
// 11
|
||||
@JvmInline
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue