refactor(server): split gtfs into its own module

This commit is contained in:
Cilly Leang 2026-03-31 23:12:54 +11:00
parent aad5ae4024
commit 0181497420
Signed by: cilly
GPG key ID: 6500251E087653C9
18 changed files with 241 additions and 132 deletions

View file

@ -26,7 +26,6 @@ import moe.lava.banksia.room.dao.StopDao
import moe.lava.banksia.room.dao.StopTimeDao
import moe.lava.banksia.room.dao.VersionMetadataDao
import moe.lava.banksia.server.di.ServerModules
import moe.lava.banksia.server.gtfs.GtfsHandler
import moe.lava.banksia.server.gtfsr.GtfsrService
import moe.lava.banksia.util.serialise
import org.koin.dsl.module
@ -67,8 +66,8 @@ fun Application.module() {
?: "https://opendata.transport.vic.gov.au/dataset/3f4e292e-7f8a-4ffe-831f-1953be0fe448/resource/${datasetUuid}/download/gtfs.zip"
call.respondText("received")
launch(context = Dispatchers.IO) {
val handler by inject<GtfsHandler>()
handler.update(datasetUrl)
val importer by inject<GtfsImporter>()
importer.import(datasetUrl)
}
}

View file

@ -0,0 +1,95 @@
package moe.lava.banksia.server
import io.ktor.util.logging.Logger
import moe.lava.banksia.model.Route
import moe.lava.banksia.model.Service
import moe.lava.banksia.model.Shape
import moe.lava.banksia.model.Stop
import moe.lava.banksia.model.StopTime
import moe.lava.banksia.model.Trip
import moe.lava.banksia.room.Database
import moe.lava.banksia.room.entity.asEntity
import moe.lava.banksia.server.gtfs.GtfsData
import moe.lava.banksia.server.gtfs.GtfsParser
import kotlin.time.Clock
/**
 * Writes a parsed GTFS dataset into the Room [Database].
 *
 * Chunks are collected from [GtfsParser.update] and each one is handed to the DAO matching its type.
 */
class GtfsImporter(
    private val parser: GtfsParser,
    private val database: Database,
    private val log: Logger,
) {
    /**
     * Replaces the stored GTFS tables with the dataset located at [url].
     *
     * The six tables are emptied first, before the parser's result is collected, so a failure
     * while collecting leaves them empty or only partially refilled. Version metadata is written
     * once collection has finished.
     *
     * @param url passed unchanged to [GtfsParser.update].
     * @param date value stored in the version metadata; defaults to the current epoch seconds.
     */
    suspend fun import(url: String, date: Long = Clock.System.now().epochSeconds) {
        with(database) {
            routeDao.deleteAll()
            serviceDao.deleteAll()
            shapeDao.deleteAll()
            stopDao.deleteAll()
            stopTimeDao.deleteAll()
            tripDao.deleteAll()
        }

        parser.update(url).collect { chunk ->
            when (chunk) {
                is GtfsData.RouteChunk -> addRoutes(chunk.routes)
                is GtfsData.ServiceChunk -> addServices(chunk.services)
                is GtfsData.ShapeChunk -> addShapes(chunk.shapes)
                is GtfsData.StopChunk -> addStops(chunk.stops)
                is GtfsData.StopTimeChunk -> addStopTimes(chunk.stopTimes)
                is GtfsData.TripChunk -> addTrips(chunk.trips)
            }
        }

        updateMetadata(date)
    }

    /**
     * Records [date] in the version metadata for the listed tables.
     *
     * NOTE(review): "services" is not in the list although services are imported above — confirm
     * this is intended.
     */
    private suspend fun updateMetadata(date: Long) {
        val metadataDao = database.versionMetadataDao
        log.info("updating metadata...")
        val tables = listOf("routes", "stops", "shapes", "trips", "stop_times")
        metadataDao.update(date, tables)
    }

    /** Converts [routes] to entities and upserts them. */
    private suspend fun addRoutes(routes: List<Route>) {
        val routeDao = database.routeDao
        log.info("inserting routes...")
        val entities = routes.map { route -> route.asEntity() }.toTypedArray()
        routeDao.insertOrReplaceAll(*entities)
    }

    /** Converts [services] to entities and upserts them. */
    private suspend fun addServices(services: List<Service>) {
        val serviceDao = database.serviceDao
        log.info("inserting services...")
        val entities = services.map { service -> service.asEntity() }.toTypedArray()
        serviceDao.insertOrReplaceAll(*entities)
    }

    /** Converts [shapes] to entities and upserts them. */
    private suspend fun addShapes(shapes: List<Shape>) {
        val shapeDao = database.shapeDao
        log.info("inserting shapes...")
        val entities = shapes.map { shape -> shape.asEntity() }.toTypedArray()
        shapeDao.insertOrReplaceAll(*entities)
    }

    /**
     * Upserts [stops], first warning about ids that occur more than once with differing content.
     *
     * Identical repeats of the same stop are not reported.
     */
    private suspend fun addStops(stops: List<Stop>) {
        val stopDao = database.stopDao
        log.info("inserting stops...")

        for ((stopId, sameId) in stops.groupBy { it.id }) {
            // Only groups where some neighbouring pair differs are worth reporting.
            val conflicting = sameId.size > 1 &&
                sameId.zipWithNext().any { (previous, current) -> current != previous }
            if (conflicting) {
                for (stop in sameId) {
                    log.warn("duplicate $stopId: $stop")
                }
            }
        }

        val entities = stops.map { stop -> stop.asEntity() }.toTypedArray()
        stopDao.insertOrReplaceAll(*entities)
    }

    /** Converts [stopTimes] to entities and upserts them, logging the chunk size. */
    private suspend fun addStopTimes(stopTimes: List<StopTime>) {
        val stopTimeDao = database.stopTimeDao
        log.info("inserting ${stopTimes.size} stoptimes...")
        val entities = stopTimes.map { stopTime -> stopTime.asEntity() }.toTypedArray()
        stopTimeDao.insertOrReplaceAll(*entities)
    }

    /** Converts [trips] to entities and upserts them, logging the chunk size. */
    private suspend fun addTrips(trips: List<Trip>) {
        val tripDao = database.tripDao
        log.info("inserting ${trips.size} trips...")
        val entities = trips.map { trip -> trip.asEntity() }.toTypedArray()
        tripDao.insertOrReplaceAll(*entities)
    }
}

View file

@ -1,13 +1,16 @@
package moe.lava.banksia.server.di
import io.ktor.client.HttpClient
import moe.lava.banksia.server.gtfs.GtfsHandler
import moe.lava.banksia.server.GtfsImporter
import moe.lava.banksia.server.gtfs.GtfsParser
import moe.lava.banksia.server.gtfsr.GtfsrService
import org.koin.core.module.dsl.singleOf
import org.koin.dsl.module
/**
 * Koin module holding the server's definitions: an [HttpClient] built with its default
 * constructor, plus constructor-reference registrations for GtfsHandler, [GtfsParser],
 * [GtfsrService] and [GtfsImporter].
 */
val ServerModules = module {
single { HttpClient() }
singleOf(::GtfsHandler)
singleOf(::GtfsParser)
singleOf(::GtfsrService)
singleOf(::GtfsImporter)
}

View file

@ -1,336 +0,0 @@
package moe.lava.banksia.server.gtfs
import com.lightningkite.kotlinx.serialization.csv.CsvFormat
import com.lightningkite.kotlinx.serialization.csv.StringDeferringConfig
import io.ktor.client.HttpClient
import io.ktor.client.request.prepareRequest
import io.ktor.client.request.url
import io.ktor.client.statement.bodyAsChannel
import io.ktor.util.cio.writeChannel
import io.ktor.util.logging.Logger
import io.ktor.utils.io.copyAndClose
import kotlinx.datetime.DayOfWeek
import kotlinx.datetime.LocalDate
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.modules.EmptySerializersModule
import kotlinx.serialization.serializer
import moe.lava.banksia.Constants
import moe.lava.banksia.model.Route
import moe.lava.banksia.model.Service
import moe.lava.banksia.model.Shape
import moe.lava.banksia.model.Stop
import moe.lava.banksia.model.StopTime
import moe.lava.banksia.model.Trip
import moe.lava.banksia.room.Database
import moe.lava.banksia.room.converter.RouteTypeConverter
import moe.lava.banksia.room.entity.asEntity
import moe.lava.banksia.server.gtfs.structures.GtfsRoute
import moe.lava.banksia.server.gtfs.structures.GtfsService
import moe.lava.banksia.server.gtfs.structures.GtfsShape
import moe.lava.banksia.server.gtfs.structures.GtfsStop
import moe.lava.banksia.server.gtfs.structures.GtfsStopTime
import moe.lava.banksia.server.gtfs.structures.GtfsTrip
import moe.lava.banksia.util.Point
import java.io.File
import java.util.zip.ZipFile
import kotlin.time.Clock
import kotlin.time.ExperimentalTime
/**
 * Downloads a GTFS dataset archive, extracts it and loads its CSV tables into the Room [Database].
 *
 * All work happens in the parent directory of [datasetPath]. When [Constants.devMode] is set the
 * directory and any previously downloaded archive are kept between runs, and only files that sit
 * in a directory named `2` are loaded.
 */
class GtfsHandler(
    private val log: Logger,
    private val client: HttpClient,
    private val db: Database,
) {
    private val csv = CsvFormat(StringDeferringConfig(EmptySerializersModule()))
    private val datasetPath = File("/tmp/banksia", "dataset.zip")

    /**
     * Fetches the archive at [datasetUrl] and replaces the stored routes, stops, shapes, services,
     * trips and stop times with its contents, then records the version metadata.
     *
     * @param datasetUrl location of the outer dataset zip.
     * @param date value stored in the version metadata; the current epoch seconds when `null`.
     * @throws IllegalStateException if an archive entry would be written outside the working
     *   directory, or if a trip/stop time references an unknown service/trip id.
     */
    @OptIn(ExperimentalTime::class)
    suspend fun update(datasetUrl: String, date: Long? = null) {
        val parentDir = datasetPath.parentFile
        @Suppress("SimplifyBooleanWithConstants", "KotlinConstantConditions")
        if (parentDir.exists() && !Constants.devMode)
            parentDir.deleteRecursively()
        parentDir.mkdirs()

        log.info("fetching..")
        client.prepareRequest {
            url(datasetUrl)
        }.execute { resp ->
            // The request is always issued; the body is only stored when no archive is present yet.
            if (!datasetPath.exists())
                resp.bodyAsChannel().copyAndClose(datasetPath.writeChannel())
            log.info("fetched!")
        }

        log.info("extracting...")
        @Suppress("KotlinConstantConditions")
        val files = if (Constants.devMode) {
            // Reuse already extracted .txt files when there are any; listFiles yields null on I/O errors.
            datasetPath.parentFile
                .listFiles { it.isDirectory }
                .orEmpty()
                .flatMap { d -> d.listFiles { f -> f.extension == "txt" }.orEmpty().toList() }
                .ifEmpty { extractAll(datasetPath) }
                .filter { it.parentFile.name == "2" }
        } else {
            extractAll(datasetPath)
        }

        addRoutes(files)
        addStops(files)
        addShapes(files)
        val services = addServices(files)
        val trips = addTrips(files, services.associateBy { it.id })
        addStopTimes(files, trips.associateBy { it.id })
        updateMetadata(date ?: Clock.System.now().epochSeconds)

        @Suppress("KotlinConstantConditions")
        if (!Constants.devMode) {
            parentDir.deleteRecursively()
        }
        log.info("done!")
    }

    /** Records [date] in the version metadata for the listed tables. */
    private suspend fun updateMetadata(date: Long) {
        val dao = db.versionMetadataDao
        log.info("updating metadata...")
        dao.update(date, listOf("routes", "stops", "shapes", "trips", "stop_times"))
    }

    /** Parses every `routes.txt` in [files] and replaces the stored routes with the result. */
    private suspend fun addRoutes(files: List<File>) {
        val dao = db.routeDao
        log.info("parsing routes...")
        val routes = files
            .filter { it.name == "routes.txt" }
            .flatMap { fd -> parseRoutes(fd) }
        log.info("inserting routes...")
        dao.deleteAll()
        dao.insertAll(*routes.map { it.asEntity() }.toTypedArray())
    }

    /** Maps the rows of [fd] to [Route]s; the route type comes from the numeric name of the file's directory. */
    private fun parseRoutes(fd: File) =
        fd.parseCsv<GtfsRoute>()
            .map { with(it) {
                Route(
                    id = route_id,
                    type = RouteTypeConverter.from(fd.parentFile.name.toInt()),
                    number = route_short_name,
                    name = route_long_name,
                )
            } }

    /** Parses every `shapes.txt` in [files] and replaces the stored shapes with the result. */
    private suspend fun addShapes(files: List<File>) {
        val dao = db.shapeDao
        log.info("parsing shapes...")
        val shapes = files
            .filter { it.name == "shapes.txt" }
            .flatMap { fd -> parseShapes(fd) }
        log.info("inserting shapes...")
        dao.deleteAll()
        dao.insertAll(*shapes.map { it.asEntity() }.toTypedArray())
    }

    /** Groups the rows of [fd] by shape id and orders each group's points by their sequence number. */
    private fun parseShapes(fd: File) =
        fd.parseCsv<GtfsShape>()
            .groupBy { it.shape_id }
            .map { (id, group) ->
                val points = group
                    .sortedBy { it.shape_pt_sequence }
                    .map { Point(it.shape_pt_lat, it.shape_pt_lon) }
                Shape(id, points)
            }

    /**
     * Parses every `stops.txt` in [files] and replaces the stored stops with the result.
     *
     * Ids that appear more than once with differing content are logged before the upsert.
     */
    private suspend fun addStops(files: List<File>) {
        val dao = db.stopDao
        log.info("parsing stops...")
        val stops = files
            .filter { it.name == "stops.txt" }
            .flatMap { fd -> parseStops(fd) }
        log.info("inserting stops...")
        dao.deleteAll()
        stops
            .groupBy { it.id }
            .forEach { (id, gstops) ->
                if (gstops.size > 1) {
                    if (gstops.withIndex().any { (i, stop) -> i != 0 && stop != gstops[i - 1] }) {
                        gstops.forEach {
                            log.info("duplicate $id: $it")
                        }
                    }
                }
            }
        dao.insertOrReplaceAll(*stops.map { it.asEntity() }.toTypedArray())
    }

    /** Maps the rows of [fd] to [Stop]s; wheelchair boarding is true only for the value `"1"`. */
    private fun parseStops(fd: File) =
        fd.parseCsv<GtfsStop>()
            .map { with(it) {
                Stop(
                    id = stop_id,
                    name = stop_name,
                    pos = Point(stop_lat, stop_lon),
                    parent = parent_station,
                    hasWheelChairBoarding = wheelchair_boarding == "1",
                    level = level_id,
                    platformCode = platform_code,
                )
            } }

    /**
     * Clears the stored stop times, then streams every `stop_times.txt` in [files] into the
     * database in batches of 1,000,000 rows.
     *
     * @param trips trips keyed by id, used to fill in a missing stop headsign.
     */
    private suspend fun addStopTimes(files: List<File>, trips: Map<String, Trip>) {
        val dao = db.stopTimeDao
        dao.deleteAll()
        log.info("parsing stop times...")
        files
            .filter { it.name == "stop_times.txt" }
            .forEach { fd ->
                log.info("parsing stop times for ${fd.parent}...")
                parseStopTimes(fd, trips) { seq ->
                    seq.chunked(1000000)
                        .forEach { queue ->
                            log.info("converting stop times (${queue.size}) for ${fd.parent}...")
                            val conv = queue.map { it.asEntity() }.toTypedArray()
                            log.info("inserting stop times (${conv.size}) for ${fd.parent}...")
                            dao.insertOrReplaceAll(*conv)
                        }
                }
            }
    }

    /**
     * Lazily maps the rows of [fd] to [StopTime]s and passes the sequence to [block].
     *
     * An empty stop headsign falls back to the headsign of the referenced trip; a row whose trip
     * id is not in [trips] fails with an [IllegalStateException] naming that id.
     */
    private inline fun parseStopTimes(fd: File, trips: Map<String, Trip>, block: (Sequence<StopTime>) -> Unit) =
        fd.parseCsvSequence<GtfsStopTime> { seq ->
            seq
                .map { with(it) {
                    StopTime(
                        tripId = trip_id,
                        stopId = stop_id,
                        arrivalTime = GtfsStopTime.parseGtfsTime(arrival_time),
                        departureTime = GtfsStopTime.parseGtfsTime(departure_time),
                        headsign = stop_headsign.ifEmpty {
                            checkNotNull(trips[trip_id]) {
                                "stop time references unknown trip '$trip_id'"
                            }.tripHeadsign
                        },
                        pickupType = pickup_type,
                        dropOffType = drop_off_type,
                    )
                } }
                .let { block(it) }
        }

    /**
     * Parses every `calendar.txt` in [files], replaces the stored services and returns them.
     */
    private suspend fun addServices(files: List<File>): List<Service> {
        val dao = db.serviceDao
        log.info("parsing services...")
        val services = files
            .filter { it.name == "calendar.txt" }
            .flatMap { fd -> parseServices(fd) }
        log.info("inserting services...")
        dao.deleteAll()
        dao.insertOrReplaceAll(*services.map { it.asEntity() }.toTypedArray())
        return services
    }

    /** Maps the rows of [fd] to [Service]s; a weekday is included when its column equals `1`. */
    private fun parseServices(fd: File) =
        fd.parseCsv<GtfsService>()
            .map { with(it) {
                val days = buildList {
                    if (monday == 1) add(DayOfWeek.MONDAY)
                    if (tuesday == 1) add(DayOfWeek.TUESDAY)
                    if (wednesday == 1) add(DayOfWeek.WEDNESDAY)
                    if (thursday == 1) add(DayOfWeek.THURSDAY)
                    if (friday == 1) add(DayOfWeek.FRIDAY)
                    if (saturday == 1) add(DayOfWeek.SATURDAY)
                    if (sunday == 1) add(DayOfWeek.SUNDAY)
                }
                Service(
                    id = service_id,
                    days = days,
                    start = LocalDate.parse(start_date, LocalDate.Formats.ISO_BASIC),
                    end = LocalDate.parse(end_date, LocalDate.Formats.ISO_BASIC),
                )
            } }

    /**
     * Parses every `trips.txt` in [files], replaces the stored trips and returns them.
     *
     * @param services services keyed by id, used to resolve each trip's service.
     */
    private suspend fun addTrips(files: List<File>, services: Map<String, Service>): List<Trip> {
        val dao = db.tripDao
        log.info("parsing trips...")
        val trips = files
            .filter { it.name == "trips.txt" }
            .flatMap { fd -> parseTrips(fd, services) }
        log.info("inserting trips...")
        dao.deleteAll()
        dao.insertOrReplaceAll(*trips.map { it.asEntity() }.toTypedArray())
        return trips
    }

    /**
     * Maps the rows of [fd] to [Trip]s; an empty shape id becomes `null`.
     *
     * A row whose service id is not in [services] fails with an [IllegalStateException] naming
     * both the trip and the service id.
     */
    private fun parseTrips(fd: File, services: Map<String, Service>) =
        fd.parseCsv<GtfsTrip>()
            .map { with(it) {
                Trip(
                    id = trip_id,
                    routeId = route_id,
                    service = checkNotNull(services[service_id]) {
                        "trip '$trip_id' references unknown service '$service_id'"
                    },
                    shapeId = shape_id.ifEmpty { null },
                    tripHeadsign = trip_headsign,
                    directionId = direction_id,
                    blockId = block_id,
                    wheelchairAccessible = wheelchair_accessible,
                )
            } }

    /**
     * Unpacks the zip [fd] next to itself and returns the files that were written.
     *
     * Directory entries are skipped (parent directories are created on demand), and an entry
     * whose name resolves outside the extraction directory is rejected, because the archive comes
     * from a remote URL and its entry names cannot be trusted.
     *
     * @throws IllegalStateException if an entry would be written outside the extraction directory.
     */
    private fun extract(fd: File): List<File> {
        val root = fd.parentFile.canonicalFile.toPath()
        val outputs = mutableListOf<File>()
        ZipFile(fd).use { zip ->
            for (entry in zip.entries()) {
                if (!entry.isDirectory) {
                    val out = File(fd.parentFile, entry.name)
                    // Canonicalising resolves ".." segments so the containment check cannot be bypassed.
                    check(out.canonicalFile.toPath().startsWith(root)) {
                        "zip entry escapes extraction directory: ${entry.name}"
                    }
                    out.parentFile.mkdirs()
                    zip.getInputStream(entry).use { input ->
                        out.outputStream().use { output ->
                            input.copyTo(output)
                        }
                    }
                    outputs.add(out)
                }
            }
        }
        return outputs
    }

    /** Unpacks [fd], then unpacks every file it contained as a zip in turn. */
    private fun extractAll(fd: File) = extract(fd).flatMap(::extract)

    /** Reads the whole file, drops BOM characters, normalises CRLF to LF and decodes it as CSV rows of [T]. */
    private inline fun <reified T> File.parseCsv(): List<T> = this
        .readText()
        .replace("\uFEFF", "") // remove bom
        .replace("\r\n", "\n") // crlf -> lf
        .let { csv.decodeFromString(it) }

    /**
     * Decodes the file as a lazy sequence of CSV rows of [T] and hands it to [block].
     *
     * The sequence is only valid inside [block]: the underlying reader is closed when it returns.
     */
    private inline fun <reified T> File.parseCsvSequence(block: (Sequence<T>) -> Unit) = this
        .bufferedReader()
        .use { reader ->
            // Character source with one character of lookahead that drops every BOM and '\r' it meets.
            val iter = object : CharIterator() {
                var next: Char? = null

                override fun nextChar(): Char {
                    if (!hasNext()) {
                        throw NoSuchElementException()
                    }
                    val ret = next!!
                    next = null
                    return ret
                }

                override fun hasNext(): Boolean {
                    if (next == null) {
                        do {
                            next = null
                            val new = reader.read()
                            if (new != -1) {
                                next = new.toChar()
                            }
                        } while (next == '\uFEFF' || next == '\r')
                    }
                    return next != null
                }
            }
            block(csv.decodeToSequence(iter, csv.serializersModule.serializer()))
        }
}

View file

@ -1,15 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
/**
 * Serializable row model with one property per column of a GTFS `routes.txt` file.
 *
 * Every column is kept as a raw [String]. Property names are snake_case (hence the suppression),
 * presumably so they match the CSV header names — confirm against the CSV decoder configuration.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsRoute(
val route_id: String,
val agency_id: String,
val route_short_name: String,
val route_long_name: String,
val route_type: String,
val route_color: String,
val route_text_color: String,
)

View file

@ -1,18 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
/**
 * Serializable row model with one property per column of a GTFS `calendar.txt` file.
 *
 * The weekday columns are integers and the start/end dates are kept as raw strings.
 * Property names are snake_case (hence the suppression), presumably so they match the CSV
 * header names — confirm against the CSV decoder configuration.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsService(
val service_id: String,
val monday: Int,
val tuesday: Int,
val wednesday: Int,
val thursday: Int,
val friday: Int,
val saturday: Int,
val sunday: Int,
val start_date: String,
val end_date: String,
)

View file

@ -1,13 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
/**
 * Serializable row model with one property per column of a GTFS `shapes.txt` file; each row is
 * a single point of a shape.
 *
 * Coordinates are decoded as [Double], the sequence number as [Int], and the travelled distance
 * is kept as a raw [String]. Property names are snake_case (hence the suppression), presumably
 * so they match the CSV header names — confirm against the CSV decoder configuration.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsShape(
val shape_id: String,
val shape_pt_lat: Double,
val shape_pt_lon: Double,
val shape_pt_sequence: Int,
val shape_dist_traveled: String,
)

View file

@ -1,17 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
/**
 * Serializable row model with one property per column of a GTFS `stops.txt` file.
 *
 * Coordinates are decoded as [Double]; every other column is kept as a raw [String].
 * Property names are snake_case (hence the suppression), presumably so they match the CSV
 * header names — confirm against the CSV decoder configuration.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsStop(
val stop_id: String,
val stop_name: String,
val stop_lat: Double,
val stop_lon: Double,
val location_type: String,
val parent_station: String,
val wheelchair_boarding: String,
val level_id: String,
val platform_code: String,
)

View file

@ -1,25 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
import moe.lava.banksia.model.FutureTime
/**
 * Serializable row model with one property per column of a GTFS `stop_times.txt` file.
 *
 * Arrival and departure times are kept as raw strings; [parseGtfsTime] turns them into
 * [FutureTime] values.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsStopTime(
    val trip_id: String,
    val arrival_time: String,
    val departure_time: String,
    val stop_id: String,
    val stop_sequence: Int,
    val stop_headsign: String,
    val pickup_type: Int,
    val drop_off_type: Int,
    val shape_dist_traveled: String,
) {
    companion object {
        /**
         * Converts a colon-separated `hour:minute:second` string into a [FutureTime].
         *
         * @throws NumberFormatException if a component is not an integer.
         * @throws IndexOutOfBoundsException if fewer than three components are present.
         */
        fun parseGtfsTime(time: String): FutureTime {
            val fields = time.split(":").map(String::toInt)
            return FutureTime.from(fields[0], fields[1], fields[2])
        }
    }
}

View file

@ -1,16 +0,0 @@
package moe.lava.banksia.server.gtfs.structures
import kotlinx.serialization.Serializable
/**
 * Serializable row model with one property per column of a GTFS `trips.txt` file.
 *
 * Every column is kept as a raw [String]. Property names are snake_case (hence the suppression),
 * presumably so they match the CSV header names — confirm against the CSV decoder configuration.
 */
@Suppress("PropertyName")
@Serializable
data class GtfsTrip(
val route_id: String,
val service_id: String,
val trip_id: String,
val shape_id: String,
val trip_headsign: String,
val direction_id: String,
val block_id: String,
val wheelchair_accessible: String,
)