diff --git a/server/src/main/kotlin/moe/lava/banksia/server/Application.kt b/server/src/main/kotlin/moe/lava/banksia/server/Application.kt index 0c401ae..11c8c2b 100644 --- a/server/src/main/kotlin/moe/lava/banksia/server/Application.kt +++ b/server/src/main/kotlin/moe/lava/banksia/server/Application.kt @@ -50,7 +50,7 @@ fun Application.module() { return@get } - val datasetUuid = call.parameters["uuid"] ?: "e4966d78-dc64-4a1d-a751-2470c9eaf034" + val datasetUuid = call.parameters["uuid"] ?: "fb152201-859f-4882-9206-b768060b50ad" val datasetUrl = call.parameters["url"] ?: "https://opendata.transport.vic.gov.au/dataset/3f4e292e-7f8a-4ffe-831f-1953be0fe448/resource/${datasetUuid}/download/gtfs.zip" call.respondText("received") diff --git a/server/src/main/kotlin/moe/lava/banksia/server/gtfs/GtfsHandler.kt b/server/src/main/kotlin/moe/lava/banksia/server/gtfs/GtfsHandler.kt index 2d15b16..7bc493e 100644 --- a/server/src/main/kotlin/moe/lava/banksia/server/gtfs/GtfsHandler.kt +++ b/server/src/main/kotlin/moe/lava/banksia/server/gtfs/GtfsHandler.kt @@ -11,6 +11,7 @@ import io.ktor.util.logging.Logger import io.ktor.utils.io.copyAndClose import kotlinx.serialization.decodeFromString import kotlinx.serialization.modules.EmptySerializersModule +import kotlinx.serialization.serializer import moe.lava.banksia.Constants import moe.lava.banksia.model.Route import moe.lava.banksia.model.Shape @@ -163,30 +164,40 @@ class GtfsHandler( private suspend fun addStopTimes(files: List) { val dao = db.stopTimeDao - log.info("parsing stop times...") - val stopTimes = files - .filter { it.name == "stop_times.txt" } - .flatMap { fd -> parseStopTimes(fd) } - - log.info("inserting stop times...") dao.deleteAll() - dao.insertOrReplaceAll(*stopTimes.map { it.asEntity() }.toTypedArray()) + log.info("parsing stop times...") + files + .filter { it.name == "stop_times.txt" } + .forEach { fd -> + log.info("parsing stop times for ${fd.parent}...") + parseStopTimes(fd) { seq -> + seq.chunked(1000000) + .forEach { queue -> + log.info("converting stop times (${queue.size}) for ${fd.parent}...") + val conv = queue.map { it.asEntity() }.toTypedArray() + log.info("inserting stop times (${conv.size}) for ${fd.parent}...") + dao.insertOrReplaceAll(*conv) + } + } + } } - private fun parseStopTimes(fd: File) = - fd.parseCsv() - .map { with(it) { - StopTime( - tripId = trip_id, - stopId = stop_id, - arrivalTime = GtfsStopTime.parseGtfsTime(arrival_time), - departureTime = GtfsStopTime.parseGtfsTime(departure_time), - headsign = stop_headsign, - pickupType = pickup_type, - dropOffType = drop_off_type, - ) - } } - + private inline fun parseStopTimes(fd: File, block: (Sequence) -> Unit) = + fd.parseCsvSequence { seq -> + seq + .map { with(it) { + StopTime( + tripId = trip_id, + stopId = stop_id, + arrivalTime = GtfsStopTime.parseGtfsTime(arrival_time), + departureTime = GtfsStopTime.parseGtfsTime(departure_time), + headsign = stop_headsign, + pickupType = pickup_type, + dropOffType = drop_off_type, + ) + } } + .let { block(it) } + } private suspend fun addTrips(files: List) { val dao = db.tripDao @@ -239,4 +250,33 @@ class GtfsHandler( .replace("\uFEFF", "") // remove bom .replace("\r\n", "\n") // crlf -> lf .let { csv.decodeFromString(it) } + + private inline fun File.parseCsvSequence(block: (Sequence) -> Unit) = this + .bufferedReader() + .use { reader -> + val iter = object : CharIterator() { + var next: Char? = null + override fun nextChar(): Char { + if (!hasNext()) { + throw NoSuchElementException() + } + val ret = next!! + next = null + return ret + } + override fun hasNext(): Boolean { + if (next == null) { + do { + next = null + val new = reader.read() + if (new != -1) { + next = new.toChar() + } + } while (next == '\uFEFF' || next == '\r') + } + return next != null + } + } + block(csv.decodeToSequence(iter, csv.serializersModule.serializer())) + } }