refactor(server): move gtfsrt to separate module

This commit is contained in:
Cilly Leang 2026-04-01 16:32:01 +11:00
parent 0181497420
commit ed9d294afc
Signed by: cilly
GPG key ID: 6500251E087653C9
10 changed files with 205 additions and 180 deletions

View file

@ -7,6 +7,7 @@ plugins {
kotlin { kotlin {
compilerOptions { compilerOptions {
freeCompilerArgs.add("-opt-in=kotlin.time.ExperimentalTime") freeCompilerArgs.add("-opt-in=kotlin.time.ExperimentalTime")
freeCompilerArgs.add("-Xexplicit-backing-fields")
} }
} }
@ -25,7 +26,7 @@ dependencies {
wire { wire {
sourcePath { sourcePath {
srcDir("src/commonMain/proto") srcDir("src/main/proto")
} }
kotlin {} kotlin {}
} }

View file

@ -1,4 +1,4 @@
package moe.lava.banksia.data.gtfsr package moe.lava.banksia.server.gtfsrt
import com.google.transit.realtime.FeedMessage import com.google.transit.realtime.FeedMessage

View file

@ -0,0 +1,109 @@
package moe.lava.banksia.server.gtfsrt
import com.google.transit.realtime.FeedMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.datetime.TimeZone
import kotlinx.datetime.toLocalDateTime
import moe.lava.banksia.util.log
import java.io.File
import kotlin.time.Instant
private const val BASE_DIR = "./data/gtfsr-archive/"
internal class GtfsrtArchiver {
private var started = false
suspend fun start(flow: SharedFlow<Pair<String, ByteArray>>) {
if (started) {
log("GtfsrtArchiver", "Tried to start when already started")
return
}
started = true
coroutineScope {
launch { compressJob() }
flow.collect { (type, rawData) ->
val data = try {
FeedMessage.ADAPTER.decode(rawData)
} catch (e: Throwable) {
log("gtfsr $type", "Failed to parse proto: $e")
return@collect
}
val timestamp = data.header_.timestamp
?: return@collect log("gtfsr $type", "Failed to read proto timestamp")
val time = Instant.fromEpochSeconds(timestamp).toLocalDateTime(TimeZone.currentSystemDefault())
val base = File(BASE_DIR, type)
val previousParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7).toString().padStart(2, '0')}")
val currentParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7 + 1).toString().padStart(2, '0')}")
val target = File(currentParent, "${timestamp}.proto")
if (previousParent.isDirectory) {
enqueueCompression(previousParent)
}
if (!target.exists()) {
try {
if (!target.parentFile.isDirectory) {
target.parentFile.mkdirs()
}
target.writeBytes(rawData)
} catch (e: Throwable) {
log("gtfsr $type", "Failed to write ${target}: $e")
}
}
}
}
}
private val cqueue = mutableSetOf<File>()
private val ignore = mutableSetOf<File>()
private val cmut = Mutex()
private suspend fun enqueueCompression(fd: File) {
cmut.withLock { cqueue.add(fd) }
}
private suspend fun compressJob() {
while(true) {
while(true) {
val next = cmut.withLock { cqueue.firstOrNull() }
?: break
if (!next.isDirectory) {
cmut.withLock { cqueue.remove(next) }
continue
}
if (next in ignore) continue
withContext(Dispatchers.IO) {
val proc = ProcessBuilder(
"tar", "-acf",
"${next.absolutePath}.tar.zst",
next.absolutePath
).start()
val exitCode = proc.waitFor()
if (exitCode == 0) {
if (next.deleteRecursively()) {
cmut.withLock { cqueue.remove(next) }
} else {
log("CompressJob", "Failed to delete $next")
ignore.add(next)
}
} else {
val msg = proc.errorStream.readAllBytes().decodeToString()
log("CompressJob", "Failed to delete $next (exit code $exitCode")
log("CompressJob", msg)
}
}
}
delay(30000)
}
}
}

View file

@ -0,0 +1,87 @@
package moe.lava.banksia.server.gtfsrt
import io.ktor.client.HttpClient
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.url
import io.ktor.client.statement.bodyAsText
import io.ktor.client.statement.readRawBytes
import io.ktor.http.isSuccess
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import moe.lava.banksia.Constants
import moe.lava.banksia.util.LogScope
import moe.lava.banksia.util.log
private val types = arrayOf(
"metro/trip-updates",
"metro/vehicle-positions",
"metro/service-alerts",
"tram/trip-updates",
"tram/vehicle-positions",
"tram/service-alerts",
"bus/trip-updates",
"bus/vehicle-positions",
"vline/trip-updates",
"vline/vehicle-positions",
)
class GtfsrtService(
private val client: HttpClient,
) {
private val archiver = GtfsrtArchiver()
private var started = false
internal val rawMessages: SharedFlow<Pair<String, ByteArray>>
field = MutableSharedFlow<Pair<String, ByteArray>>()
fun start(
scope: CoroutineScope,
enableArchiving: Boolean = false,
) {
if (started) {
log("GtfsrtService", "Tried to start when already started")
return
}
if (enableArchiving) {
scope.launch { archiver.start(rawMessages) }
}
scope.launch { fetch() }
}
private suspend fun fetch() {
coroutineScope {
types.map { type ->
launch(context = Dispatchers.IO) {
val logger = LogScope("gtfsr $type")
try {
val res = client.get {
url("https://api.opendata.transport.vic.gov.au/opendata/public-transport/gtfs/realtime/v1/${type}")
header("KeyId", Constants.opendataKey)
}
if (!res.status.isSuccess()) {
logger.log("${res.status} | ${res.bodyAsText()}")
} else {
val bytes = res.readRawBytes()
rawMessages.emit(type to bytes)
}
} catch (e: Throwable) {
logger.log("$e")
logger.log(e.stackTraceToString())
}
}
}.joinAll()
}
delay(10000)
fetch()
}
}

View file

@ -1,4 +1,4 @@
package moe.lava.banksia.data.gtfsr package moe.lava.banksia.server.gtfsrt
import com.google.transit.realtime.FeedMessage import com.google.transit.realtime.FeedMessage
import moe.lava.banksia.util.Point import moe.lava.banksia.util.Point

View file

@ -26,7 +26,7 @@ import moe.lava.banksia.room.dao.StopDao
import moe.lava.banksia.room.dao.StopTimeDao import moe.lava.banksia.room.dao.StopTimeDao
import moe.lava.banksia.room.dao.VersionMetadataDao import moe.lava.banksia.room.dao.VersionMetadataDao
import moe.lava.banksia.server.di.ServerModules import moe.lava.banksia.server.di.ServerModules
import moe.lava.banksia.server.gtfsr.GtfsrService import moe.lava.banksia.server.gtfsrt.GtfsrtService
import moe.lava.banksia.util.serialise import moe.lava.banksia.util.serialise
import org.koin.dsl.module import org.koin.dsl.module
import org.koin.ktor.ext.inject import org.koin.ktor.ext.inject
@ -49,8 +49,8 @@ fun Application.module() {
@Suppress("KotlinConstantConditions") @Suppress("KotlinConstantConditions")
if (!Constants.devMode) { if (!Constants.devMode) {
val gtfsr by inject<GtfsrService>() val gtfsr by inject<GtfsrtService>()
launch { gtfsr.start() } launch { gtfsr.start(this, true) }
} }
routing { routing {

View file

@ -3,14 +3,14 @@ package moe.lava.banksia.server.di
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import moe.lava.banksia.server.GtfsImporter import moe.lava.banksia.server.GtfsImporter
import moe.lava.banksia.server.gtfs.GtfsParser import moe.lava.banksia.server.gtfs.GtfsParser
import moe.lava.banksia.server.gtfsr.GtfsrService import moe.lava.banksia.server.gtfsrt.GtfsrtService
import org.koin.core.module.dsl.singleOf import org.koin.core.module.dsl.singleOf
import org.koin.dsl.module import org.koin.dsl.module
val ServerModules = module { val ServerModules = module {
single { HttpClient() } single { HttpClient() }
singleOf(::GtfsParser) singleOf(::GtfsParser)
singleOf(::GtfsrService) singleOf(::GtfsrtService)
singleOf(::GtfsImporter) singleOf(::GtfsImporter)
} }

View file

@ -1,164 +0,0 @@
package moe.lava.banksia.server.gtfsr
import com.google.transit.realtime.FeedMessage
import io.ktor.client.HttpClient
import io.ktor.client.request.get
import io.ktor.client.request.header
import io.ktor.client.request.url
import io.ktor.client.statement.bodyAsText
import io.ktor.client.statement.readRawBytes
import io.ktor.http.isSuccess
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import moe.lava.banksia.Constants
import moe.lava.banksia.util.LogScope
import moe.lava.banksia.util.log
import java.io.File
import java.time.Instant
import java.time.ZoneId
private const val BASE_DIR = "./data/gtfsr-archive/"
class GtfsrService(private val client: HttpClient) {
private var started = false
private val latest = mutableMapOf<String, FeedMessage>()
fun latestFor(type: String) = latest[type]
private val iFlow = MutableSharedFlow<Pair<String, FeedMessage>>()
val flow = iFlow.asSharedFlow()
companion object {
val types = arrayOf(
"metro/trip-updates",
"metro/vehicle-positions",
"metro/service-alerts",
"tram/trip-updates",
"tram/vehicle-positions",
"tram/service-alerts",
"bus/trip-updates",
"bus/vehicle-positions",
"vline/trip-updates",
"vline/vehicle-positions",
)
}
suspend fun start() {
if (started) {
log("GtfsrService", "Tried to start when already started")
return
}
started = true
coroutineScope {
launch { compressJob() }
while (true) {
val results = mutableMapOf<String, ByteArray>()
types.map { type ->
launch(context = Dispatchers.IO) {
val logger = LogScope("gtfsr $type")
try {
val res = client.get {
url("https://api.opendata.transport.vic.gov.au/opendata/public-transport/gtfs/realtime/v1/${type}")
header("KeyId", Constants.opendataKey)
}
if (!res.status.isSuccess()) {
logger.log("${res.status} | ${res.bodyAsText()}")
} else {
results[type] = res.readRawBytes()
}
} catch (e: Throwable) {
logger.log("$e")
logger.log(e.stackTraceToString())
}
}
}.joinAll()
results.forEach { (type, data) ->
val dec = try {
FeedMessage.ADAPTER.decode(data)
} catch (e: Throwable) {
log("gtfsr $type", "Failed to parse proto: $e")
return@forEach
}
val timestamp = dec.header_.timestamp
?: return@forEach log("gtfsr $type", "Failed to read proto timestamp")
val time = Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault())
val base = File(BASE_DIR, type)
val previousParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7).toString().padStart(2, '0')}")
val currentParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7 + 1).toString().padStart(2, '0')}")
val target = File(currentParent, "${timestamp}.proto")
if (previousParent.isDirectory) {
enqueueCompression(previousParent)
}
if (!target.exists()) {
try {
if (!target.parentFile.isDirectory) {
target.parentFile.mkdirs()
}
target.writeBytes(data)
} catch (e: Throwable) {
log("gtfsr $type", "Failed to write ${target}: $e")
}
}
}
delay(10000)
}
}
}
private val cqueue = mutableSetOf<File>()
private val ignore = mutableSetOf<File>()
private val cmut = Mutex()
private suspend fun enqueueCompression(fd: File) {
cmut.withLock { cqueue.add(fd) }
}
private suspend fun compressJob() {
while(true) {
while(true) {
val next = cmut.withLock { cqueue.firstOrNull() }
?: break
if (!next.isDirectory) {
cmut.withLock { cqueue.remove(next) }
continue
}
if (next in ignore) continue
withContext(Dispatchers.IO) {
val proc = ProcessBuilder(
"tar", "-acf",
"${next.absolutePath}.tar.zst",
next.absolutePath
).start()
val exitCode = proc.waitFor()
if (exitCode == 0) {
if (next.deleteRecursively()) {
cmut.withLock { cqueue.remove(next) }
} else {
log("CompressJob", "Failed to delete $next")
ignore.add(next)
}
} else {
val msg = proc.errorStream.readAllBytes().decodeToString()
log("CompressJob", "Failed to delete $next (exit code $exitCode")
log("CompressJob", msg)
}
}
}
delay(30000)
}
}
}

View file

@ -6,7 +6,6 @@ plugins {
alias(libs.plugins.androidMultiplatformLibrary) alias(libs.plugins.androidMultiplatformLibrary)
alias(libs.plugins.ksp) alias(libs.plugins.ksp)
alias(libs.plugins.room) alias(libs.plugins.room)
alias(libs.plugins.wire)
} }
room { room {
@ -62,10 +61,3 @@ dependencies {
add("kspIosSimulatorArm64", libs.room.compiler) add("kspIosSimulatorArm64", libs.room.compiler)
add("kspJvm", libs.room.compiler) add("kspJvm", libs.room.compiler)
} }
wire {
sourcePath {
srcDir("src/commonMain/proto")
}
kotlin {}
}