diff --git a/server/gtfs_rt/build.gradle.kts b/server/gtfs_rt/build.gradle.kts index 9e54231..934d8bc 100644 --- a/server/gtfs_rt/build.gradle.kts +++ b/server/gtfs_rt/build.gradle.kts @@ -7,6 +7,7 @@ plugins { kotlin { compilerOptions { freeCompilerArgs.add("-opt-in=kotlin.time.ExperimentalTime") + freeCompilerArgs.add("-Xexplicit-backing-fields") } } @@ -25,7 +26,7 @@ dependencies { wire { sourcePath { - srcDir("src/commonMain/proto") + srcDir("src/main/proto") } kotlin {} } diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/GtfsRealtime.kt b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsRealtime.kt similarity index 89% rename from shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/GtfsRealtime.kt rename to server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsRealtime.kt index 172238f..128f141 100644 --- a/shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/GtfsRealtime.kt +++ b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsRealtime.kt @@ -1,4 +1,4 @@ -package moe.lava.banksia.data.gtfsr +package moe.lava.banksia.server.gtfsrt import com.google.transit.realtime.FeedMessage diff --git a/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtArchiver.kt b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtArchiver.kt new file mode 100644 index 0000000..30a9fd3 --- /dev/null +++ b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtArchiver.kt @@ -0,0 +1,109 @@ +package moe.lava.banksia.server.gtfsrt + +import com.google.transit.realtime.FeedMessage +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import kotlinx.datetime.TimeZone +import kotlinx.datetime.toLocalDateTime +import moe.lava.banksia.util.log +import java.io.File +import kotlin.time.Instant + +private const val BASE_DIR = "./data/gtfsr-archive/" + +internal class GtfsrtArchiver { + private var started = false + + suspend fun start(flow: SharedFlow>) { + if (started) { + log("GtfsrtArchiver", "Tried to start when already started") + return + } + started = true + coroutineScope { + launch { compressJob() } + + flow.collect { (type, rawData) -> + val data = try { + FeedMessage.ADAPTER.decode(rawData) + } catch (e: Throwable) { + log("gtfsr $type", "Failed to parse proto: $e") + return@collect + } + val timestamp = data.header_.timestamp + ?: return@collect log("gtfsr $type", "Failed to read proto timestamp") + + val time = Instant.fromEpochSeconds(timestamp).toLocalDateTime(TimeZone.currentSystemDefault()) + + val base = File(BASE_DIR, type) + val previousParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7).toString().padStart(2, '0')}") + val currentParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7 + 1).toString().padStart(2, '0')}") + val target = File(currentParent, "${timestamp}.proto") + + if (previousParent.isDirectory) { + enqueueCompression(previousParent) + } + + if (!target.exists()) { + try { + if (!target.parentFile.isDirectory) { + target.parentFile.mkdirs() + } + target.writeBytes(rawData) + } catch (e: Throwable) { + log("gtfsr $type", "Failed to write ${target}: $e") + } + } + } + } + } + + private val cqueue = mutableSetOf() + private val ignore = mutableSetOf() + private val cmut = Mutex() + private suspend fun enqueueCompression(fd: File) { + cmut.withLock { cqueue.add(fd) } + } + + private suspend fun compressJob() { + while(true) { + while(true) { + val next = cmut.withLock { cqueue.firstOrNull() } + ?: break + if (!next.isDirectory) { + cmut.withLock { cqueue.remove(next) } + continue + } + if (next in ignore) continue + + withContext(Dispatchers.IO) { + val proc = ProcessBuilder( + "tar", "-acf", + "${next.absolutePath}.tar.zst", + next.absolutePath + ).start() + val exitCode = proc.waitFor() + if (exitCode == 0) { + if (next.deleteRecursively()) { + cmut.withLock { cqueue.remove(next) } + } else { + log("CompressJob", "Failed to delete $next") + ignore.add(next) + } + } else { + val msg = proc.errorStream.readAllBytes().decodeToString() + log("CompressJob", "Failed to delete $next (exit code $exitCode") + log("CompressJob", msg) + } + } + } + delay(30000) + } + } +} diff --git a/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtService.kt b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtService.kt new file mode 100644 index 0000000..8b30b2f --- /dev/null +++ b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/GtfsrtService.kt @@ -0,0 +1,87 @@ +package moe.lava.banksia.server.gtfsrt + +import io.ktor.client.HttpClient +import io.ktor.client.request.get +import io.ktor.client.request.header +import io.ktor.client.request.url +import io.ktor.client.statement.bodyAsText +import io.ktor.client.statement.readRawBytes +import io.ktor.http.isSuccess +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import moe.lava.banksia.Constants +import moe.lava.banksia.util.LogScope +import moe.lava.banksia.util.log + +private val types = arrayOf( + "metro/trip-updates", + "metro/vehicle-positions", + "metro/service-alerts", + "tram/trip-updates", + "tram/vehicle-positions", + "tram/service-alerts", + "bus/trip-updates", + "bus/vehicle-positions", + "vline/trip-updates", + "vline/vehicle-positions", +) + +class GtfsrtService( + private val client: HttpClient, +) { + private val archiver = GtfsrtArchiver() + private var started = false + + internal val rawMessages: SharedFlow> + field = MutableSharedFlow>() + + fun start( + scope: CoroutineScope, + enableArchiving: Boolean = false, + ) { + if (started) { + log("GtfsrtService", "Tried to start when already started") + return + } + + if (enableArchiving) { + scope.launch { archiver.start(rawMessages) } + } + + scope.launch { fetch() } + } + + private suspend fun fetch() { + coroutineScope { + types.map { type -> + launch(context = Dispatchers.IO) { + val logger = LogScope("gtfsr $type") + try { + val res = client.get { + url("https://api.opendata.transport.vic.gov.au/opendata/public-transport/gtfs/realtime/v1/${type}") + header("KeyId", Constants.opendataKey) + } + if (!res.status.isSuccess()) { + logger.log("${res.status} | ${res.bodyAsText()}") + } else { + val bytes = res.readRawBytes() + rawMessages.emit(type to bytes) + } + } catch (e: Throwable) { + logger.log("$e") + logger.log(e.stackTraceToString()) + } + } + }.joinAll() + } + + delay(10000) + fetch() + } +} diff --git a/shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/RealtimeVehiclePosition.kt b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/RealtimeVehiclePositions.kt similarity index 94% rename from shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/RealtimeVehiclePosition.kt rename to server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/RealtimeVehiclePositions.kt index 979f1f5..abebe76 100644 --- a/shared/src/commonMain/kotlin/moe/lava/banksia/data/gtfsr/RealtimeVehiclePosition.kt +++ b/server/gtfs_rt/src/main/kotlin/moe/lava/banksia/server/gtfsrt/RealtimeVehiclePositions.kt @@ -1,4 +1,4 @@ -package moe.lava.banksia.data.gtfsr +package moe.lava.banksia.server.gtfsrt import com.google.transit.realtime.FeedMessage import moe.lava.banksia.util.Point diff --git a/shared/src/commonMain/proto/gtfs-realtime.proto b/server/gtfs_rt/src/main/proto/gtfs-realtime.proto similarity index 100% rename from shared/src/commonMain/proto/gtfs-realtime.proto rename to server/gtfs_rt/src/main/proto/gtfs-realtime.proto diff --git a/server/src/main/kotlin/moe/lava/banksia/server/Application.kt b/server/src/main/kotlin/moe/lava/banksia/server/Application.kt index 1bfacaa..b33fb9d 100644 --- a/server/src/main/kotlin/moe/lava/banksia/server/Application.kt +++ b/server/src/main/kotlin/moe/lava/banksia/server/Application.kt @@ -26,7 +26,7 @@ import moe.lava.banksia.room.dao.StopDao import moe.lava.banksia.room.dao.StopTimeDao import moe.lava.banksia.room.dao.VersionMetadataDao import moe.lava.banksia.server.di.ServerModules -import moe.lava.banksia.server.gtfsr.GtfsrService +import moe.lava.banksia.server.gtfsrt.GtfsrtService import moe.lava.banksia.util.serialise import org.koin.dsl.module import org.koin.ktor.ext.inject @@ -49,8 +49,8 @@ fun Application.module() { @Suppress("KotlinConstantConditions") if (!Constants.devMode) { - val gtfsr by inject() - launch { gtfsr.start() } + val gtfsr by inject() + launch { gtfsr.start(this, true) } } routing { diff --git a/server/src/main/kotlin/moe/lava/banksia/server/di/ServerModules.kt b/server/src/main/kotlin/moe/lava/banksia/server/di/ServerModules.kt index b4e3878..d51379e 100644 --- a/server/src/main/kotlin/moe/lava/banksia/server/di/ServerModules.kt +++ b/server/src/main/kotlin/moe/lava/banksia/server/di/ServerModules.kt @@ -3,14 +3,14 @@ package moe.lava.banksia.server.di import io.ktor.client.HttpClient import moe.lava.banksia.server.GtfsImporter import moe.lava.banksia.server.gtfs.GtfsParser -import moe.lava.banksia.server.gtfsr.GtfsrService +import moe.lava.banksia.server.gtfsrt.GtfsrtService import org.koin.core.module.dsl.singleOf import org.koin.dsl.module val ServerModules = module { single { HttpClient() } singleOf(::GtfsParser) - singleOf(::GtfsrService) + singleOf(::GtfsrtService) singleOf(::GtfsImporter) } diff --git a/server/src/main/kotlin/moe/lava/banksia/server/gtfsr/GtfsrService.kt b/server/src/main/kotlin/moe/lava/banksia/server/gtfsr/GtfsrService.kt deleted file mode 100644 index 5a0b1dc..0000000 --- a/server/src/main/kotlin/moe/lava/banksia/server/gtfsr/GtfsrService.kt +++ /dev/null @@ -1,164 +0,0 @@ -package moe.lava.banksia.server.gtfsr - -import com.google.transit.realtime.FeedMessage -import io.ktor.client.HttpClient -import io.ktor.client.request.get -import io.ktor.client.request.header -import io.ktor.client.request.url -import io.ktor.client.statement.bodyAsText -import io.ktor.client.statement.readRawBytes -import io.ktor.http.isSuccess -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext -import moe.lava.banksia.Constants -import moe.lava.banksia.util.LogScope -import moe.lava.banksia.util.log -import java.io.File -import java.time.Instant -import java.time.ZoneId - -private const val BASE_DIR = "./data/gtfsr-archive/" - -class GtfsrService(private val client: HttpClient) { - private var started = false - private val latest = mutableMapOf() - - fun latestFor(type: String) = latest[type] - - private val iFlow = MutableSharedFlow>() - val flow = iFlow.asSharedFlow() - - companion object { - val types = arrayOf( - "metro/trip-updates", - "metro/vehicle-positions", - "metro/service-alerts", - "tram/trip-updates", - "tram/vehicle-positions", - "tram/service-alerts", - "bus/trip-updates", - "bus/vehicle-positions", - "vline/trip-updates", - "vline/vehicle-positions", - ) - } - - suspend fun start() { - if (started) { - log("GtfsrService", "Tried to start when already started") - return - } - started = true - coroutineScope { - launch { compressJob() } - - while (true) { - val results = mutableMapOf() - types.map { type -> - launch(context = Dispatchers.IO) { - val logger = LogScope("gtfsr $type") - try { - val res = client.get { - url("https://api.opendata.transport.vic.gov.au/opendata/public-transport/gtfs/realtime/v1/${type}") - header("KeyId", Constants.opendataKey) - } - if (!res.status.isSuccess()) { - logger.log("${res.status} | ${res.bodyAsText()}") - } else { - results[type] = res.readRawBytes() - } - } catch (e: Throwable) { - logger.log("$e") - logger.log(e.stackTraceToString()) - } - } - }.joinAll() - - results.forEach { (type, data) -> - val dec = try { - FeedMessage.ADAPTER.decode(data) - } catch (e: Throwable) { - log("gtfsr $type", "Failed to parse proto: $e") - return@forEach - } - val timestamp = dec.header_.timestamp - ?: return@forEach log("gtfsr $type", "Failed to read proto timestamp") - - val time = Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault()) - - val base = File(BASE_DIR, type) - val previousParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7).toString().padStart(2, '0')}") - val currentParent = File(base, "${time.year}-${((time.dayOfYear - 1) / 7 + 1).toString().padStart(2, '0')}") - val target = File(currentParent, "${timestamp}.proto") - - if (previousParent.isDirectory) { - enqueueCompression(previousParent) - } - - if (!target.exists()) { - try { - if (!target.parentFile.isDirectory) { - target.parentFile.mkdirs() - } - target.writeBytes(data) - } catch (e: Throwable) { - log("gtfsr $type", "Failed to write ${target}: $e") - } - } - } - delay(10000) - } - } - } - - private val cqueue = mutableSetOf() - private val ignore = mutableSetOf() - private val cmut = Mutex() - private suspend fun enqueueCompression(fd: File) { - cmut.withLock { cqueue.add(fd) } - } - - private suspend fun compressJob() { - while(true) { - while(true) { - val next = cmut.withLock { cqueue.firstOrNull() } - ?: break - if (!next.isDirectory) { - cmut.withLock { cqueue.remove(next) } - continue - } - if (next in ignore) continue - - withContext(Dispatchers.IO) { - val proc = ProcessBuilder( - "tar", "-acf", - "${next.absolutePath}.tar.zst", - next.absolutePath - ).start() - val exitCode = proc.waitFor() - if (exitCode == 0) { - if (next.deleteRecursively()) { - cmut.withLock { cqueue.remove(next) } - } else { - log("CompressJob", "Failed to delete $next") - ignore.add(next) - } - } else { - val msg = proc.errorStream.readAllBytes().decodeToString() - log("CompressJob", "Failed to delete $next (exit code $exitCode") - log("CompressJob", msg) - } - } - } - delay(30000) - } - } -} diff --git a/shared/build.gradle.kts b/shared/build.gradle.kts index 953d790..b4ed8ad 100644 --- a/shared/build.gradle.kts +++ b/shared/build.gradle.kts @@ -6,7 +6,6 @@ plugins { alias(libs.plugins.androidMultiplatformLibrary) alias(libs.plugins.ksp) alias(libs.plugins.room) - alias(libs.plugins.wire) } room { @@ -62,10 +61,3 @@ dependencies { add("kspIosSimulatorArm64", libs.room.compiler) add("kspJvm", libs.room.compiler) } - -wire { - sourcePath { - srcDir("src/commonMain/proto") - } - kotlin {} -}