refactor(api/gateway): use flows for dispatch handling

This commit is contained in:
Cilly Leang 2026-02-22 18:10:55 +11:00
parent 053b24a614
commit fcdd237809
Signed by: cilly
GPG key ID: 6500251E087653C9
2 changed files with 10 additions and 9 deletions

View file

@ -4,6 +4,8 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.ExperimentalSerializationApi
import moe.lava.neon.api.gateway.handlers.Handler import moe.lava.neon.api.gateway.handlers.Handler
@ -11,10 +13,10 @@ import kotlin.math.pow
import kotlin.reflect.KClass import kotlin.reflect.KClass
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
typealias EventHandlers = Map<KClass<out Event.Dispatch>, MutableList<Handler<in Event.Dispatch>>>
class GatewayHandler { class GatewayHandler {
private val eventHandlers: EventHandlers = mutableMapOf() private val mEvents = MutableSharedFlow<Event.Dispatch>()
val events = mEvents.asSharedFlow()
private val logger = Logger.withTag("neon.core.api.gateway/handler") private val logger = Logger.withTag("neon.core.api.gateway/handler")
private val scope = CoroutineScope(Dispatchers.IO) private val scope = CoroutineScope(Dispatchers.IO)
private var session: GatewaySession? = null private var session: GatewaySession? = null
@ -31,12 +33,12 @@ class GatewayHandler {
session = GatewaySession.start( session = GatewaySession.start(
token = token, token = token,
eventHandlers = eventHandlers,
resumeProps = resumeProps, resumeProps = resumeProps,
onSuccess = { onSuccess = {
logger.d { "Successful session start" } logger.d { "Successful session start" }
retryAttempts = 0 retryAttempts = 0
}, },
onDispatch = { scope.launch { mEvents.emit(it) } },
onDestroy = { reason, resumeProps -> onDestroy = { reason, resumeProps ->
session = null session = null

View file

@ -32,9 +32,9 @@ private val logger = Logger.withTag("neon.core.api.gateway/session")
internal class GatewaySession private constructor( internal class GatewaySession private constructor(
private var ws: DefaultClientWebSocketSession, private var ws: DefaultClientWebSocketSession,
private val token: String, private val token: String,
private val handlers: EventHandlers,
private val scope: CoroutineScope, private val scope: CoroutineScope,
private var resumeProps: ResumeProperties?, private var resumeProps: ResumeProperties?,
private val onDispatch: (Event.Dispatch) -> Unit,
private val onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit, private val onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit,
private val onSuccess: () -> Unit, private val onSuccess: () -> Unit,
) { ) {
@ -45,13 +45,13 @@ internal class GatewaySession private constructor(
companion object { companion object {
suspend fun start( suspend fun start(
token: String, token: String,
eventHandlers: EventHandlers,
client: HttpClient = HttpClient { client: HttpClient = HttpClient {
install(HttpCookies) install(HttpCookies)
install(WebSockets) install(WebSockets)
}, },
scope: CoroutineScope = CoroutineScope(Dispatchers.IO), scope: CoroutineScope = CoroutineScope(Dispatchers.IO),
resumeProps: ResumeProperties? = null, resumeProps: ResumeProperties? = null,
onDispatch: (Event.Dispatch) -> Unit,
onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit, onDestroy: (GatewayCloseReason, ResumeProperties?) -> Unit,
onSuccess: () -> Unit, onSuccess: () -> Unit,
): GatewaySession { ): GatewaySession {
@ -66,7 +66,7 @@ internal class GatewaySession private constructor(
} }
} }
return GatewaySession(ws, token, eventHandlers, scope, resumeProps, onDestroy, onSuccess) return GatewaySession(ws, token, scope, resumeProps, onDispatch, onDestroy, onSuccess)
} }
} }
@ -118,8 +118,7 @@ internal class GatewaySession private constructor(
is Event.Resumed -> onSuccess() is Event.Resumed -> onSuccess()
} }
if (event is Event.Dispatch) { if (event is Event.Dispatch) {
val eventHandlers = handlers[event::class] ?: return onDispatch(event)
eventHandlers.forEach { it.handle(event) }
} }
} }