fix(server/gtfs): update default gtfs version, and use buffered read

Stop times files are now huge and reading them whole will OOM, so read them
through a buffered reader and process them as sequences to save memory.
This commit is contained in:
Cilly Leang 2025-12-13 20:32:55 +11:00
parent 9acfb52de9
commit 302bda4f17
Signed by: cilly
GPG key ID: 6500251E087653C9
2 changed files with 62 additions and 22 deletions

View file

@ -50,7 +50,7 @@ fun Application.module() {
return@get
}
val datasetUuid = call.parameters["uuid"] ?: "e4966d78-dc64-4a1d-a751-2470c9eaf034"
val datasetUuid = call.parameters["uuid"] ?: "fb152201-859f-4882-9206-b768060b50ad"
val datasetUrl = call.parameters["url"]
?: "https://opendata.transport.vic.gov.au/dataset/3f4e292e-7f8a-4ffe-831f-1953be0fe448/resource/${datasetUuid}/download/gtfs.zip"
call.respondText("received")

View file

@ -11,6 +11,7 @@ import io.ktor.util.logging.Logger
import io.ktor.utils.io.copyAndClose
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.modules.EmptySerializersModule
import kotlinx.serialization.serializer
import moe.lava.banksia.Constants
import moe.lava.banksia.model.Route
import moe.lava.banksia.model.Shape
@ -163,30 +164,40 @@ class GtfsHandler(
/**
 * Replaces the stored stop times with the rows of every `stop_times.txt` in [files].
 *
 * Each matching file is handed to the streaming [parseStopTimes] overload and its rows
 * are converted and inserted in fixed-size batches, instead of first collecting every
 * row of every file into a single list.
 *
 * @param files candidate files; only those named exactly `stop_times.txt` are read.
 */
private suspend fun addStopTimes(files: List<File>) {
    val dao = db.stopTimeDao
    // Upper bound on the number of rows converted and inserted per DAO call.
    val batchSize = 1_000_000

    log.info("parsing stop times...")
    // Clear the table once, up front: clearing per file would wipe rows that an
    // earlier stop_times.txt in the same run has already inserted.
    dao.deleteAll()
    files
        .filter { it.name == "stop_times.txt" }
        .forEach { fd ->
            log.info("parsing stop times for ${fd.parent}...")
            parseStopTimes(fd) { stopTimes ->
                stopTimes.chunked(batchSize).forEach { batch ->
                    log.info("converting stop times (${batch.size}) for ${fd.parent}...")
                    val entities = batch.map { it.asEntity() }.toTypedArray()
                    log.info("inserting stop times (${entities.size}) for ${fd.parent}...")
                    dao.insertOrReplaceAll(*entities)
                }
            }
        }
}
/**
 * Parses [fd] as GTFS stop-time CSV via `parseCsv` and converts every [GtfsStopTime]
 * row into a [StopTime].
 *
 * Arrival and departure times are converted with [GtfsStopTime.parseGtfsTime]; the
 * remaining fields are copied across unchanged.
 */
private fun parseStopTimes(fd: File) =
    fd.parseCsv<GtfsStopTime>().map { row ->
        StopTime(
            tripId = row.trip_id,
            stopId = row.stop_id,
            arrivalTime = GtfsStopTime.parseGtfsTime(row.arrival_time),
            departureTime = GtfsStopTime.parseGtfsTime(row.departure_time),
            headsign = row.stop_headsign,
            pickupType = row.pickup_type,
            dropOffType = row.drop_off_type,
        )
    }
/**
 * Streams [fd] as GTFS stop-time CSV and passes the rows to [block] as a sequence of
 * [StopTime].
 *
 * The sequence is backed by the reader that `parseCsvSequence` opens and closes around
 * the call, so it must be fully consumed inside [block] and must not be kept afterwards.
 *
 * @param fd the `stop_times.txt` file to read.
 * @param block consumer of the mapped rows.
 */
private inline fun parseStopTimes(fd: File, block: (Sequence<StopTime>) -> Unit) =
    fd.parseCsvSequence<GtfsStopTime> { rows ->
        val stopTimes = rows.map { row ->
            StopTime(
                tripId = row.trip_id,
                stopId = row.stop_id,
                arrivalTime = GtfsStopTime.parseGtfsTime(row.arrival_time),
                departureTime = GtfsStopTime.parseGtfsTime(row.departure_time),
                headsign = row.stop_headsign,
                pickupType = row.pickup_type,
                dropOffType = row.drop_off_type,
            )
        }
        block(stopTimes)
    }
private suspend fun addTrips(files: List<File>) {
val dao = db.tripDao
@ -239,4 +250,33 @@ class GtfsHandler(
.replace("\uFEFF", "") // remove bom
.replace("\r\n", "\n") // crlf -> lf
.let { csv.decodeFromString(it) }
/**
 * Streaming counterpart of `parseCsv`: decodes this file as CSV and passes the rows to
 * [block] as a sequence, reading through a buffered reader instead of loading the whole
 * file into a string.
 *
 * The characters handed to the decoder are normalised the same way `parseCsv` does it:
 * every U+FEFF (byte-order mark) is dropped and every CRLF pair becomes a single LF.
 * A carriage return that is not followed by a line feed is passed through unchanged.
 *
 * The reader is closed as soon as [block] returns, so the sequence must be consumed
 * inside [block].
 *
 * @param block consumer of the decoded rows.
 */
private inline fun <reified T> File.parseCsvSequence(block: (Sequence<T>) -> Unit) = this
    .bufferedReader()
    .use { reader ->
        val chars = object : CharIterator() {
            // Reader.read() reports end of input as -1.
            private val eof = -1
            // Marks a slot that currently holds no code unit.
            private val empty = -2

            // Code unit that the next nextChar() call returns, or `empty`.
            private var pending = empty
            // Code unit (or `eof`) already read while looking past a '\r', or `empty`.
            private var lookahead = empty

            /** Returns the next code unit that is not a BOM, or `eof`. */
            private fun readSkippingBom(): Int {
                if (lookahead != empty) {
                    val buffered = lookahead
                    lookahead = empty
                    return buffered
                }
                var code = reader.read()
                while (code == '\uFEFF'.code) {
                    code = reader.read()
                }
                return code
            }

            override fun nextChar(): Char {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                val ret = pending.toChar()
                pending = empty
                return ret
            }

            override fun hasNext(): Boolean {
                if (pending != empty) {
                    return true
                }
                val code = readSkippingBom()
                if (code == eof) {
                    return false
                }
                pending = if (code == '\r'.code) {
                    val following = readSkippingBom()
                    if (following == '\n'.code) {
                        // CRLF collapses to the LF alone.
                        following
                    } else {
                        // Lone CR: emit it and keep what followed for the next read.
                        lookahead = following
                        code
                    }
                } else {
                    code
                }
                return true
            }
        }
        block(csv.decodeToSequence(chars, csv.serializersModule.serializer()))
    }
}