refactor: split up core into multiple modules

This commit is contained in:
Cilly Leang 2026-02-05 01:05:02 +11:00
parent 2725342c3f
commit 0d84411f14
Signed by: cilly
GPG key ID: 6500251E087653C9
38 changed files with 344 additions and 149 deletions

View file

@ -0,0 +1,65 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
plugins {
alias(libs.plugins.androidLibrary)
alias(libs.plugins.kotlinMultiplatform)
alias(libs.plugins.kotlinSerialization)
}
kotlin {
jvm()
androidTarget {
compilerOptions {
jvmTarget.set(JvmTarget.JVM_11)
}
}
sourceSets {
commonMain.dependencies {
implementation(project(":api:shared"))
implementation(libs.kermit)
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.content.negotiation)
implementation(libs.ktor.client.websockets)
implementation(libs.ktor.serialization.kotlinx.json)
}
commonTest.dependencies {
implementation(libs.kotlin.test)
}
jvmMain.dependencies {
implementation(libs.ktor.client.okhttp)
}
androidMain.dependencies {
implementation(libs.ktor.client.okhttp)
}
}
}
dependencies {
coreLibraryDesugaring(libs.desugar)
}
android {
namespace = "moe.lava.neon.api.gateway"
compileSdk = libs.versions.android.compileSdk.get().toInt()
defaultConfig {
minSdk = libs.versions.android.minSdk.get().toInt()
}
packaging {
resources {
excludes += "/META-INF/{AL2.0,LGPL2.1}"
}
}
buildTypes {
getByName("release") {
isMinifyEnabled = false
}
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
isCoreLibraryDesugaringEnabled = true
}
}

View file

@ -0,0 +1,29 @@
package moe.lava.neon.api.gateway
@Suppress("unused")
object Capability {
const val LAZY_USER_NOTES = 1 shl 0
const val NO_AFFINE_USER_IDS = 1 shl 1
const val VERSIONED_READ_STATES = 1 shl 2
const val VERSIONED_USER_GUILD_SETTINGS = 1 shl 3
const val DEDUPE_USER_OBJECTS = 1 shl 4
const val PRIORITIZED_READY_PAYLOAD = 1 shl 5
const val MULTIPLE_GUILD_EXPERIMENT_POPULATIONS = 1 shl 6
const val NON_CHANNEL_READ_STATES = 1 shl 7
const val AUTH_TOKEN_REFRESH = 1 shl 8
const val USER_SETTINGS_PROTO = 1 shl 9
const val CLIENT_STATE_V2 = 1 shl 10
const val PASSIVE_GUILD_UPDATE = 1 shl 11 // off in rn 311.20
const val AUTO_CALL_CONNECT = 1 shl 12
const val DEBOUNCE_MESSAGE_REACTIONS = 1 shl 13
const val PASSIVE_GUILD_UPDATE_V2 = 1 shl 14
const val UNKNOWN_15 = 1 shl 15 // off in rn 311.20
const val AUTO_LOBBY_CONNECT = 1 shl 16 // off in rn 311.20
const val UNKNOWN_17 = 1 shl 17
const val UNKNOWN_18 = 1 shl 18 // off in rn 311.20
const val UNKNOWN_19 = 1 shl 19
const val UNKNOWN_20 = 1 shl 20
inline fun from(builder: Capability.() -> List<Int>): Int =
builder().reduce { a, b -> a + b }
}

View file

@ -0,0 +1,20 @@
package moe.lava.neon.api.gateway
import io.ktor.websocket.CloseReason
sealed interface GatewayCloseReason {
sealed interface ClientInitiated : GatewayCloseReason
sealed class ShouldReconnect(val resume: Boolean) : GatewayCloseReason
sealed class KeepDisconnected : GatewayCloseReason
data object MissedHeartbeat : ShouldReconnect(resume = true), ClientInitiated
data class SkippedSequence(val next: Int) : ShouldReconnect(resume = true), ClientInitiated
data class InvalidSession(val resumable: Boolean) : ShouldReconnect(resume = resumable), ClientInitiated
// TODO: handle non-resumable cases properly
data class ServerClosed(val closeCode: CloseReason) : ShouldReconnect(resume = true)
data object ServerReconnect : ShouldReconnect(resume = true), ClientInitiated
data object ClientPaused : KeepDisconnected(), ClientInitiated
data object Unknown : ShouldReconnect(resume = true)
}

View file

@ -0,0 +1,76 @@
package moe.lava.neon.api.gateway
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.serialization.ExperimentalSerializationApi
import moe.lava.neon.api.gateway.handlers.Handler
import kotlin.math.pow
import kotlin.reflect.KClass
import kotlin.time.Duration.Companion.seconds
typealias EventHandlers = Map<KClass<out Event.Dispatch>, MutableList<Handler<in Event.Dispatch>>>
class GatewayHandler {
private val eventHandlers: EventHandlers = mutableMapOf()
private val logger = Logger.withTag("neon.core.api.gateway/handler")
private val scope = CoroutineScope(Dispatchers.IO)
private var session: GatewaySession? = null
private var resumeProps: ResumeProperties? = null
private var retryAttempts: Int = 0
@OptIn(ExperimentalSerializationApi::class)
suspend fun connect(token: String) {
if (session != null) {
logger.w(Throwable()) { "Attempted to connect, but client already connected, ignoring..." }
return
}
session = GatewaySession.start(
token = token,
eventHandlers = eventHandlers,
resumeProps = resumeProps,
onSuccess = {
logger.d { "Successful session start" }
retryAttempts = 0
},
onDestroy = { reason, resumeProps ->
session = null
if (reason is GatewayCloseReason.KeepDisconnected) {
this.resumeProps = resumeProps
}
if (reason is GatewayCloseReason.ShouldReconnect) {
if (reason.resume) {
this.resumeProps = resumeProps
} else {
this.resumeProps = null
}
scope.launch {
var res: Result<Unit>
do {
val dur = 2.0.pow(retryAttempts).seconds
logger.d { "Reconnecting in ${dur.inWholeMilliseconds}ms" }
delay(dur)
retryAttempts += 1
res = runCatching { connect(token) }
res.exceptionOrNull()?.let {
logger.e(it) { "Reconnect failed" }
}
} while(res.isFailure)
}
}
}
)
}
suspend fun disconnect() {
val session = session
?: throw IllegalStateException("Tried disconnecting with no session")
session.close(GatewayCloseReason.ClientPaused)
}
}

View file

@ -0,0 +1,207 @@
package moe.lava.neon.api.gateway
import co.touchlab.kermit.Logger
import io.ktor.client.HttpClient
import io.ktor.client.plugins.cookies.HttpCookies
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.plugins.websocket.webSocketSession
import io.ktor.client.request.parameter
import io.ktor.http.userAgent
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import io.ktor.websocket.send
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import moe.lava.neon.api.ApiConstants
import moe.lava.neon.api.ApiConstants.json
import kotlin.random.Random
import kotlin.time.Duration.Companion.milliseconds
private val logger = Logger.withTag("neon.core.api.gateway/session")
internal class GatewaySession private constructor(
private var ws: DefaultClientWebSocketSession,
private val token: String,
private val handlers: EventHandlers,
private val scope: CoroutineScope,
private var resumeProps: ResumeProperties?,
private val onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit,
private val onSuccess: () -> Unit,
) {
private var lastSeq: Int? = resumeProps?.lastSequence
private var missedHeartbeats = 0
private var closeReason: GatewayCloseReason? = null
companion object {
suspend fun start(
token: String,
eventHandlers: EventHandlers,
client: HttpClient = HttpClient {
install(HttpCookies)
install(WebSockets)
},
scope: CoroutineScope = CoroutineScope(Dispatchers.IO),
resumeProps: ResumeProperties? = null,
onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit,
onSuccess: () -> Unit,
): GatewaySession {
val ws = client.webSocketSession(
resumeProps?.resumeGatewayUrl ?: "wss://gateway.discord.gg/"
) {
userAgent(ApiConstants.gatewayUserAgent)
url {
parameter("encoding", "json")
parameter("v", "9")
// parameter("compress", "zstd-stream")
}
}
return GatewaySession(ws, token, eventHandlers, scope, resumeProps, onDestroy, onSuccess)
}
}
init {
ws.incoming.consumeAsFlow()
.onCompletion { onClose(it) }
.onEach { frame ->
if (frame !is Frame.Text)
// if (frame !is Frame.Text && frame !is Frame.Binary)
return@onEach
logger.d { "Received payload ${frame.readText()}" }
val raw = json.decodeFromString<Payload.Unknown>(frame.readText())
val seq = this.lastSeq ?: 0
if (seq + 1 == raw.s) {
this.lastSeq = raw.s
} else if (raw.s != null) {
close(GatewayCloseReason.SkippedSequence(raw.s))
return@onEach
}
when (val payload = raw.asIncoming()) {
is Payload.Incoming<*> -> scope.launch { handlePayload(payload) }
is Payload.Unknown -> scope.launch { handleUnknownPayload(payload) }
}
}
.launchIn(scope)
}
private suspend fun handlePayload(payload: Payload.Incoming<*>) {
logger.d { payload.toString() }
val event = payload.d
when (event) {
is Event.Heartbeat -> handleHeartbeat()
is Event.Reconnect -> close(GatewayCloseReason.ServerReconnect)
is Event.InvalidSession -> close(GatewayCloseReason.InvalidSession(event.resumable))
is Event.Hello -> handleHello(event)
is Event.HeartbeatAck -> { missedHeartbeats -= 1 }
is Event.Ready -> {
resumeProps = ResumeProperties(
sessionId = event.sessionId,
resumeGatewayUrl = event.resumeGatewayUrl,
lastSequence = 0,
)
onSuccess()
}
is Event.Resumed -> onSuccess()
}
if (event is Event.Dispatch) {
val eventHandlers = handlers[event::class] ?: return
eventHandlers.forEach { it.handle(event) }
}
}
private suspend fun handleUnknownPayload(payload: Payload.Unknown) {
logger.w { "Unknown payload $payload" }
}
private suspend fun handleHeartbeat() {
logger.w { "Received heartbeat from server, possible connection issue" }
Event.QoSHeartbeat(lastSeq).pack().send()
missedHeartbeats += 1
}
private suspend fun handleHello(e: Event.Hello) {
val resumeProps = resumeProps
if (resumeProps != null) {
Event.Resume(
token = token,
sessionId = resumeProps.sessionId,
seq = resumeProps.lastSequence
).pack().send()
} else {
Event.Identify(token = token).pack().send()
}
val interval = e.heartbeatInterval.milliseconds
scope.launch {
delay(interval * Random.nextDouble())
while (true) {
if (missedHeartbeats >= 1) {
close(GatewayCloseReason.MissedHeartbeat)
break
}
Event.QoSHeartbeat(lastSeq).pack().send()
missedHeartbeats += 1
delay(interval)
}
}
}
fun close(reason: GatewayCloseReason.ClientInitiated?) {
val msg = when (reason) {
is GatewayCloseReason.MissedHeartbeat ->
"heartbeat missed"
is GatewayCloseReason.SkippedSequence ->
"payloads skipped one sequence (expected: $lastSeq, actual: ${reason.next})"
is GatewayCloseReason.InvalidSession ->
"invalid session (resumable: $reason)"
is GatewayCloseReason.ClientPaused ->
"client requested pause"
is GatewayCloseReason.ServerReconnect ->
"server requested reconnect"
null ->
"no reason"
}
closeReason = reason
logger.e { "Client-initiated close, cause: $msg" }
ws.cancel()
}
@OptIn(ExperimentalCoroutinesApi::class)
private fun onClose(error: Throwable? = null) {
logger.d(error) { "Websocket connection closed, cleaning up..." }
if (scope.isActive) scope.cancel()
if (resumeProps == null) {
logger.w { "No resume props stored" }
}
onDestroy(
closeReason
?: runCatching { ws.closeReason.getCompleted() }
.getOrNull()
?.let { GatewayCloseReason.ServerClosed(it) }
?: GatewayCloseReason.Unknown,
resumeProps?.copy(lastSequence = lastSeq ?: 0)
)
}
private suspend inline fun <reified T : Event.Outgoing> Payload.Outgoing<T>.send() {
logger.d { "Sending payload $this" }
logger.d { "Raw: ${json.encodeToString(this)}" }
ws.send(json.encodeToString(this))
}
}

View file

@ -0,0 +1,118 @@
package moe.lava.neon.api.gateway
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import moe.lava.neon.api.ApiConstants
import moe.lava.neon.api.objects.User
sealed interface Payload {
val op: Int
val d: Any?
sealed interface WithSequence : Payload {
val s: Int?
val t: String?
}
@Serializable
data class Incoming<T : Event.Incoming>(
override val op: Int,
override val d: T,
override val s: Int?,
override val t: String?,
) : WithSequence
@Serializable
data class Outgoing<T : Event.Outgoing>(
override val op: Int,
override val d: T,
) : Payload
@Serializable
data class Unknown(
override val op: Int,
override val d: JsonElement?,
override val s: Int?,
override val t: String?,
) : WithSequence
}
@Serializable
sealed interface Event {
sealed interface Incoming : Event
sealed interface Outgoing : Event
sealed class Dispatch : Incoming
// 1
@JvmInline
@Serializable
value class Heartbeat(val lastSequence: Int?) : Incoming, Outgoing
// 2
@Serializable
data class Identify(
val token: String,
val properties: ApiConstants.GatewayProperties = ApiConstants.GatewayProperties(),
val capabilities: Int = Capability.from { listOf(
LAZY_USER_NOTES,
NO_AFFINE_USER_IDS,
DEDUPE_USER_OBJECTS,
USER_SETTINGS_PROTO,
DEBOUNCE_MESSAGE_REACTIONS
) },
// TODO: Client state v2
// val clientState: ClientState,
) : Outgoing
// 6
@Serializable
data class Resume(
val token: String,
val sessionId: String,
val seq: Int,
) : Outgoing
// 7
@Serializable
data object Reconnect : Incoming
// 9
@JvmInline
@Serializable
value class InvalidSession(val resumable: Boolean) : Incoming
// 10
@Serializable
data class Hello(val heartbeatInterval: Int) : Incoming
// 11
@Serializable
data object HeartbeatAck : Incoming
// 40
@Serializable
data class QoSHeartbeat(
val seq: Int?,
val qos: QoSPayload = QoSPayload(),
) : Outgoing {
@Serializable
data class QoSPayload(
val ver: Int = 27,
val active: Boolean = true,
val reasons: List<String> = listOf("foregrounded"),
)
}
@Serializable
data class Ready(
val v: Int,
val user: User,
// val guilds: List<UnavailableGuild>,
val sessionId: String,
val resumeGatewayUrl: String,
// val application: Application,
) : Dispatch()
@Serializable
data object Resumed : Dispatch()
}

View file

@ -0,0 +1,7 @@
package moe.lava.neon.api.gateway
internal data class ResumeProperties(
val sessionId: String,
val resumeGatewayUrl: String,
val lastSequence: Int,
)

View file

@ -0,0 +1,47 @@
package moe.lava.neon.api.gateway
import kotlinx.serialization.json.JsonNull
import kotlinx.serialization.json.decodeFromJsonElement
import moe.lava.neon.api.ApiConstants.json
internal fun <T : Event.Outgoing> T.pack(): Payload.Outgoing<T> {
val opcode: Int = when (this) {
is Event.Heartbeat -> 1
is Event.Identify -> 2
is Event.Resume -> 6
is Event.QoSHeartbeat -> 40
}
return Payload.Outgoing(op = opcode, d = this)
}
internal fun Payload.Unknown.asIncoming() : Payload.WithSequence {
return when (op) {
0 -> when (t) {
"READY" -> decode<Event.Ready>()
"RESUMED" -> decode<Event.Resumed>()
else -> this
}
1 -> decode<Event.Heartbeat>()
7 -> decodeObject(Event.Reconnect)
9 -> decode<Event.InvalidSession>()
10 -> decode<Event.Hello>()
11 -> decodeObject(Event.HeartbeatAck)
else -> this
}
}
private inline fun <reified T : Event.Incoming> Payload.Unknown.decode(): Payload.Incoming<T> =
Payload.Incoming(
op = op,
d = json.decodeFromJsonElement<T>(d ?: JsonNull),
s = s,
t = t,
)
private inline fun <reified T : Event.Incoming> Payload.Unknown.decodeObject(obj: T): Payload.Incoming<T> =
Payload.Incoming(
op = op,
d = obj,
s = s,
t = t,
)

View file

@ -0,0 +1,7 @@
package moe.lava.neon.api.gateway.handlers
import moe.lava.neon.api.gateway.Event
sealed interface Handler<T: Event.Dispatch> {
suspend fun handle(event: T)
}