From 8c0bff3bc40af36347536fc1fbe23a0f5c2fbe98 Mon Sep 17 00:00:00 2001 From: LavaDesu Date: Sat, 2 Aug 2025 01:35:59 +1000 Subject: [PATCH] fix: handle more network errors, and large refactors --- .../moe/lava/banksia/ui/BanksiaViewModel.kt | 26 +- .../moe/lava/banksia/Logging.android.kt | 5 + .../kotlin/moe/lava/banksia/Logging.kt | 2 + .../moe/lava/banksia/api/ptv/PtvService.kt | 300 +++++++++--------- .../kotlin/moe/lava/banksia/util/CacheMap.kt | 48 +++ .../kotlin/moe/lava/banksia/util/LoopFlow.kt | 65 ++++ .../kotlin/moe/lava/banksia/Logging.ios.kt | 4 + .../kotlin/moe/lava/banksia/Logging.jvm.kt | 5 + 8 files changed, 286 insertions(+), 169 deletions(-) create mode 100644 shared/src/commonMain/kotlin/moe/lava/banksia/util/CacheMap.kt create mode 100644 shared/src/commonMain/kotlin/moe/lava/banksia/util/LoopFlow.kt diff --git a/composeApp/src/commonMain/kotlin/moe/lava/banksia/ui/BanksiaViewModel.kt b/composeApp/src/commonMain/kotlin/moe/lava/banksia/ui/BanksiaViewModel.kt index 3fe327c..fa1a8bb 100644 --- a/composeApp/src/commonMain/kotlin/moe/lava/banksia/ui/BanksiaViewModel.kt +++ b/composeApp/src/commonMain/kotlin/moe/lava/banksia/ui/BanksiaViewModel.kt @@ -12,7 +12,6 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.update -import kotlinx.coroutines.flow.updateAndGet import kotlinx.coroutines.launch import kotlinx.datetime.Clock import kotlinx.datetime.Instant @@ -31,6 +30,7 @@ import moe.lava.banksia.ui.state.MapState import moe.lava.banksia.ui.state.SearchState import moe.lava.banksia.util.BoxedValue import moe.lava.banksia.util.BoxedValue.Companion.box +import moe.lava.banksia.util.LoopFlow.Companion.waitUntilSubscribed sealed class BanksiaEvent { data object DismissState : BanksiaEvent() @@ -72,7 +72,7 @@ class BanksiaViewModel : ViewModel() { private val iSearchState = MutableStateFlow(SearchState()) val searchState = iSearchState.asStateFlow() - private val ptvService = PtvService() + private val ptvService = PtvService(viewModelScope) private var locationTrackerJob: Job? = null private var lastKnownLocation: Point? = null @@ -84,9 +84,9 @@ class BanksiaViewModel : ViewModel() { viewModelScope.launch { when (event) { is BanksiaEvent.DismissState -> dismissState() - is BanksiaEvent.SelectRoute -> switchRoute(event.id) - is BanksiaEvent.SelectRun -> switchRun(event.ref) - is BanksiaEvent.SelectStop -> switchStop(event.typeAndId) + is BanksiaEvent.SelectRoute -> state = InternalState(route = event.id) + is BanksiaEvent.SelectRun -> state = state.copy(run = event.ref) + is BanksiaEvent.SelectStop -> state = state.copy(stop = event.typeAndId) is BanksiaEvent.SearchUpdate -> searchUpdate(event.text) } } @@ -167,13 +167,14 @@ class BanksiaViewModel : ViewModel() { return } - var lastState = iInfoState.value + val lastState = state.run var routeName: String? = null ptvService.runFlow(ref, firstWithCache = true) - .takeWhile { lastState == iInfoState.value } + .waitUntilSubscribed(iInfoState) + .takeWhile { lastState == state.run } .onEach { run -> if (routeName == null) { - lastState = iInfoState.updateAndGet { + iInfoState.update { InfoPanelState.Run( direction = run.destinationName, type = run.routeType, @@ -182,7 +183,7 @@ class BanksiaViewModel : ViewModel() { routeName = ptvService.route(run.routeId).routeName } - lastState = iInfoState.updateAndGet { + iInfoState.update { InfoPanelState.Run( direction = run.destinationName, type = run.routeType, @@ -220,7 +221,7 @@ class BanksiaViewModel : ViewModel() { val timetable = HashMap, Pair>>() res.departures.forEach { dep -> val key = Pair(dep.directionId, dep.routeId) - val direction = ptvService.cache.direction(dep.directionId, dep.routeId) ?: return@forEach + val direction = ptvService.direction(dep.directionId, dep.routeId) val route = res.routes[dep.routeId.toString()] val prefix = route?.let { if (it.routeNumber == "") "" else "${it.routeNumber} - " } ?: "" val element = timetable.getOrPut(key) { Pair(prefix + direction.directionName, mutableListOf()) }.second @@ -282,12 +283,11 @@ class BanksiaViewModel : ViewModel() { newCameraPosition?.let { iCameraChangeEmitter.emit(it.box()) } } - var runsRouteKey: Int? = null private fun buildRuns(route: PtvRoute) { - runsRouteKey = route.routeId ptvService .runsFlow(route.routeId) - .takeWhile { route.routeId == runsRouteKey } + .waitUntilSubscribed(iInfoState) + .takeWhile { state.route == route.routeId } .onEach { runs -> val markers = runs .filter { it.vehiclePosition != null } diff --git a/shared/src/androidMain/kotlin/moe/lava/banksia/Logging.android.kt b/shared/src/androidMain/kotlin/moe/lava/banksia/Logging.android.kt index eb781ec..dd6b2d9 100644 --- a/shared/src/androidMain/kotlin/moe/lava/banksia/Logging.android.kt +++ b/shared/src/androidMain/kotlin/moe/lava/banksia/Logging.android.kt @@ -5,3 +5,8 @@ import android.util.Log actual fun log(tag: String, msg: String) { Log.i(tag, msg) } + +actual fun error(tag: String, msg: String, throwable: Throwable?) { + Log.e(tag, msg) + throwable?.let { Log.e(tag, it.stackTraceToString()) } +} diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/Logging.kt b/shared/src/commonMain/kotlin/moe/lava/banksia/Logging.kt index 08e28c9..16f42d6 100644 --- a/shared/src/commonMain/kotlin/moe/lava/banksia/Logging.kt +++ b/shared/src/commonMain/kotlin/moe/lava/banksia/Logging.kt @@ -1,3 +1,5 @@ package moe.lava.banksia expect fun log(tag: String, msg: String) +fun error(tag: String, throwable: Throwable) = error(tag, "", throwable) +expect fun error(tag: String, msg: String, throwable: Throwable? = null) diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/api/ptv/PtvService.kt b/shared/src/commonMain/kotlin/moe/lava/banksia/api/ptv/PtvService.kt index 467feae..e3771d8 100644 --- a/shared/src/commonMain/kotlin/moe/lava/banksia/api/ptv/PtvService.kt +++ b/shared/src/commonMain/kotlin/moe/lava/banksia/api/ptv/PtvService.kt @@ -6,13 +6,15 @@ import io.ktor.client.plugins.HttpSend import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.defaultRequest import io.ktor.client.plugins.plugin +import io.ktor.client.request.HttpRequestBuilder import io.ktor.client.request.get import io.ktor.client.request.parameter +import io.ktor.client.request.url +import io.ktor.client.statement.HttpResponse import io.ktor.http.appendPathSegments import io.ktor.serialization.kotlinx.json.json +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import moe.lava.banksia.Constants @@ -22,7 +24,11 @@ import moe.lava.banksia.api.ptv.structures.PtvRoute import moe.lava.banksia.api.ptv.structures.PtvRouteType import moe.lava.banksia.api.ptv.structures.PtvRun import moe.lava.banksia.api.ptv.structures.PtvStop +import moe.lava.banksia.error import moe.lava.banksia.log +import moe.lava.banksia.util.CacheMap +import moe.lava.banksia.util.LoopFlow.Companion.initWith +import moe.lava.banksia.util.loopFlow import okio.ByteString.Companion.encodeUtf8 import kotlin.random.Random @@ -47,52 +53,22 @@ object Responses { data class PtvDirectionsResponse(val directions: List) } -class PtvService { +suspend inline fun MutableMap.getOrPutSuspend(key: K, defaultValue: suspend () -> V): V { + if (!containsKey(key)) + this[key] = defaultValue() + return this[key]!! +} + +class PtvService(coroutineScope: CoroutineScope) { class PtvCache( - private val service: PtvService, - private val directions: HashMap, PtvDirection> = HashMap(), - private val routes: HashMap = HashMap(), - private val runs: HashMap = HashMap(), - private val stops: HashMap = HashMap(), - ) { - suspend fun direction(directionID: Int, routeID: Int): PtvDirection? { - val ret = directions[Pair(directionID, routeID)] - if (ret == null) { - val res = service.directionsByRoute(routeID) - for (dir in res) - directions[Pair(dir.directionId, dir.routeId)] = dir - } + coroutineScope: CoroutineScope, + val directions: CacheMap, PtvDirection> = CacheMap(coroutineScope), + val routes: CacheMap = CacheMap(coroutineScope), + val runs: CacheMap = CacheMap(coroutineScope), + val stops: CacheMap = CacheMap(coroutineScope), + ) - return ret ?: directions[Pair(directionID, routeID)] - } - - fun setRoutes(routes: Iterable) { - routes.forEach { - this.routes[it.routeId] = it - } - } - - fun getRoute(routeId: Int) = routes[routeId] - fun getRoutes() = routes.values.toList() - - fun addStops(stops: Iterable) { - stops.forEach { - this.stops[it.stopId] = it - } - } - - fun getStop(stopId: Int) = stops[stopId] - - fun addRuns(runs: Iterable) { - runs.forEach { - this.runs[it.runRef] = it - } - } - - fun getRun(runRef: String) = runs[runRef] - } - - val cache = PtvCache(this) + val cache = PtvCache(coroutineScope) private val client = HttpClient() { install(ContentNegotiation) { @@ -105,7 +81,7 @@ class PtvService { } } - constructor() { + init { client.plugin(HttpSend).intercept { req -> req.parameter("devid", Constants.devid) @OptIn(ExperimentalStdlibApi::class) @@ -118,137 +94,149 @@ class PtvService { } } + suspend fun HttpClient.safeGet( + urlString: String? = null, + retries: Int = 1, + block: (HttpRequestBuilder.() -> Unit)? = null + ): HttpResponse = + runCatching { + get { + urlString?.let { url(it) } + block?.invoke(this) + } + }.getOrElse { e -> + error("PtvService", "Fetch error occurred (attempt $retries / 3), retrying in 5000ms...", e) + if (retries >= 3) + throw e + delay(5000) + safeGet(urlString, retries + 1, block) + } + suspend fun route(id: Int, includeGeopath: Boolean = false): PtvRoute { - val cached = cache.getRoute(id) + val cached = cache.routes[id] // TODO: im braindead so clean this up later if (cached != null && (!includeGeopath || (includeGeopath && cached.geopath.isNotEmpty()))) return cached - val response: Responses.PtvRouteResponse = client.get("routes") { - url { - appendPathSegments(id.toString()) - parameters.append("include_geopath", if (includeGeopath) "true" else "false") + return client + .safeGet("routes") { + url { + appendPathSegments(id.toString()) + parameters.append("include_geopath", if (includeGeopath) "true" else "false") + } } - }.body() - cache.setRoutes(listOf(response.route)) - return response.route + .body() + .route + .also { cache.routes[it.routeId] = it } } suspend fun routes(): List { - val cached = cache.getRoutes() - if (cached.isNotEmpty()) - return cached - - val response: Responses.PtvRoutesResponse = client.get("routes").body() - cache.setRoutes(response.routes) - return response.routes + val cached = cache.routes + if (cached.isEmpty()) { + client + .safeGet("routes") + .body() + .routes + .forEach { route -> + cached[route.routeId] = route + } + } + return cached.values.toList() } - fun runFlow(ref: String, firstWithCache: Boolean = false, intervalMillis: Long = 5000): Flow = flow { - val cached = cache.getRun(ref) - if (firstWithCache && cached != null) - emit(cached) + fun runFlow(ref: String, firstWithCache: Boolean = false) = + loopFlow { + client + .safeGet { + url { + appendPathSegments("runs", ref) + } + } + .body() + .runs + .also { it.forEach { run -> cache.runs[run.runRef] = run } } + .let { emit(it[0]) } + }.initWith { + cache.runs[ref]?.let { + if (firstWithCache) + emit(it) + } + } - while (true) { - val response: Responses.PtvRunsResponse = client.get { + fun runsFlow(routeId: Int) = + loopFlow { + client + .safeGet { + url { + appendPathSegments("runs", "route", routeId.toString()) + parameter("expand", "VehiclePosition") + } + } + .body() + .runs + .also { it.forEach { run -> cache.runs[run.runRef] = run } } + .let { emit(it) } + } + + suspend fun stopsByRoute(routeId: Int, routeType: PtvRouteType): List = + client + .safeGet("stops") { url { appendPathSegments( - "runs", - ref, + "route", routeId.toString(), + "route_type", routeType.ordinal.toString(), ) } - }.body() - cache.addRuns(response.runs) - emit(response.runs[0]) - delay(intervalMillis) - } - } + } + .body() + .stops + .also { it.forEach { stop -> cache.stops[stop.stopId] = stop } } - fun runsFlow(routeId: Int, intervalMillis: Long = 5000): Flow> = flow { - while (true) { - val response: Responses.PtvRunsResponse = client.get { + suspend fun stop(routeType: PtvRouteType, stopId: Int): PtvStop = + cache.stops.getOrPutSuspend(stopId) { + client + .safeGet { + url { + appendPathSegments( + "stops", stopId.toString(), + "route_type", routeType.ordinal.toString(), + ) + } + } + .body() + .stop + } + + suspend fun directionsByRoute(routeId: Int): List = + client + .safeGet("directions") { url { - appendPathSegments( - "runs", - "route", - routeId.toString(), - ) - parameter("expand", "VehiclePosition") + appendPathSegments("route", routeId.toString()) } - }.body() - cache.addRuns(response.runs) - emit(response.runs) - delay(intervalMillis) + } + .body() + .directions + + suspend fun direction(directionId: Int, routeId: Int): PtvDirection { + if (!cache.directions.containsKey(directionId to routeId)) { + val directions = directionsByRoute(routeId) + for (direction in directions) + cache.directions[direction.directionId to direction.routeId] = direction } - } - suspend fun stopsByRoute(routeId: Int, routeType: PtvRouteType): List { - val response: Responses.PtvStopsResponse = client.get("stops") { - url { - appendPathSegments( - "route", - routeId.toString(), - "route_type", - routeType.ordinal.toString() - ) - } - }.body() - val stops = response.stops - cache.addStops(stops) - return stops - } - - suspend fun stop(routeType: PtvRouteType, stopId: Int): PtvStop { - val cached = cache.getStop(stopId) - if (cached != null) - return cached - - val response: Responses.PtvStopResponse = client.get() { - url { - appendPathSegments( - "stops", - stopId.toString(), - "route_type", - routeType.ordinal.toString(), - ) - } - }.body() - val stop = response.stop - cache.addStops(listOf(stop)) - return stop - } - - suspend fun directionsByRoute(routeId: Int): List { - val response: Responses.PtvDirectionsResponse = client.get("directions") { - url { - appendPathSegments("route", routeId.toString()) - } - }.body() - return response.directions - } - - suspend fun direction(id: Int, routeType: PtvRouteType?): List { - val response: Responses.PtvDirectionsResponse = client.get("directions") { - url { - appendPathSegments(id.toString()) - if (routeType != null) - appendPathSegments("route_type", routeType.ordinal.toString()) - } - }.body() - return response.directions + return cache.directions[directionId to routeId]!! } suspend fun departures(routeType: PtvRouteType, stopId: Int): Responses.PtvDeparturesResponse = - client.get("departures") { - url { - appendPathSegments( - "route_type", - routeType.ordinal.toString(), - "stop", - stopId.toString() - ) - parameter("expand", "Route") - parameter("expand", "Direction") - } - }.body() + client + .safeGet ("departures") { + url { + appendPathSegments( + "route_type", routeType.ordinal.toString(), + "stop", stopId.toString(), + ) + parameter("expand", "Route") + parameter("expand", "Direction") + } + }.body() } diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/util/CacheMap.kt b/shared/src/commonMain/kotlin/moe/lava/banksia/util/CacheMap.kt new file mode 100644 index 0000000..4605505 --- /dev/null +++ b/shared/src/commonMain/kotlin/moe/lava/banksia/util/CacheMap.kt @@ -0,0 +1,48 @@ +package moe.lava.banksia.util + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import moe.lava.banksia.error + +class CacheMap( + coroutineScope: CoroutineScope, + val expiryMinutes: Int = 5, + private val innerMap: MutableMap = mutableMapOf() +) : MutableMap by innerMap { + val keyExpiries = mutableMapOf() + var counter = 0 + + init { + coroutineScope.launch { + while (true) { + delay(60000) + counter += 1 + keyExpiries + .filterValues { expiry -> expiry >= counter } + .keys + .forEach { key -> + innerMap.remove(key) + keyExpiries.remove(key) + } + } + } + } + + override fun put(key: K, value: V): V? { + keyExpiries.put(key, counter + expiryMinutes + 1) + return innerMap.put(key, value) + } + + override fun putAll(from: Map) { + keyExpiries.putAll(from.map { it.key to (counter + expiryMinutes + 1) }) + innerMap.putAll(from) + } + + override val entries: MutableSet> + get() { + error("CacheMap", ".entries accessed, cloning..", IllegalStateException()) + return this.entries.toMutableSet() + } + +} diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/util/LoopFlow.kt b/shared/src/commonMain/kotlin/moe/lava/banksia/util/LoopFlow.kt new file mode 100644 index 0000000..95c71b0 --- /dev/null +++ b/shared/src/commonMain/kotlin/moe/lava/banksia/util/LoopFlow.kt @@ -0,0 +1,65 @@ +package moe.lava.banksia.util + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.AbstractFlow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.first +import moe.lava.banksia.log +import kotlin.experimental.ExperimentalTypeInference + +@OptIn(ExperimentalCoroutinesApi::class) +class LoopFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() { + private var delayMs: Long = 5000 + private var init: (suspend FlowCollector.() -> Unit)? = null + private var waiter: (suspend () -> Unit)? = null + + override suspend fun collectSafely(collector: FlowCollector) { + init?.invoke(collector) + while (true) { + waiter?.invoke() + collector.block() + delay(delayMs) + } + } + + companion object { + fun Flow.delayFor(delay: Long) = apply { + @Suppress("UnusedFlow") + if (this is LoopFlow) + this.delayMs = delay; + else + throw IllegalStateException() + } + + fun Flow.initWith(block: suspend FlowCollector.() -> Unit) = apply { + @Suppress("UnusedFlow") + if (this is LoopFlow) + this.init = block; + else + throw IllegalStateException() + } + + fun Flow.waitFor(waiter: suspend () -> Unit) = apply { + @Suppress("UnusedFlow") + if (this is LoopFlow) + this.waiter = waiter; + else + throw IllegalStateException() + } + + fun Flow.waitUntilSubscribed(other: MutableStateFlow<*>) = waitFor { + val blocked = other.subscriptionCount.value == 0 + if (blocked) + log("LoopFlow", "blocking flow") + other.subscriptionCount.first { it > 0 } + if (blocked) + log("LoopFlow", "unblocking flow") + } + } +} + +@OptIn(ExperimentalTypeInference::class) +fun loopFlow(@BuilderInference block: suspend FlowCollector.() -> Unit) = LoopFlow(block) diff --git a/shared/src/iosMain/kotlin/moe/lava/banksia/Logging.ios.kt b/shared/src/iosMain/kotlin/moe/lava/banksia/Logging.ios.kt index 5053352..b16e01c 100644 --- a/shared/src/iosMain/kotlin/moe/lava/banksia/Logging.ios.kt +++ b/shared/src/iosMain/kotlin/moe/lava/banksia/Logging.ios.kt @@ -3,3 +3,7 @@ package moe.lava.banksia actual fun log(tag: String, msg: String) { TODO("Not yet implemented") } + +actual fun error(tag: String, msg: String, throwable: Throwable?) { + TODO("Not yet implemented") +} diff --git a/shared/src/jvmMain/kotlin/moe/lava/banksia/Logging.jvm.kt b/shared/src/jvmMain/kotlin/moe/lava/banksia/Logging.jvm.kt index fd59d3e..df35a04 100644 --- a/shared/src/jvmMain/kotlin/moe/lava/banksia/Logging.jvm.kt +++ b/shared/src/jvmMain/kotlin/moe/lava/banksia/Logging.jvm.kt @@ -3,3 +3,8 @@ package moe.lava.banksia actual fun log(tag: String, msg: String) { println("[$tag] $msg") } + +actual fun error(tag: String, msg: String, throwable: Throwable?) { + println("[$tag] $msg") + throwable?.let { println(it.stackTraceToString()) } +}