fix(server/gtfs): chunk stop times into smaller blocks

This commit is contained in:
Cilly Leang 2026-04-01 17:23:59 +11:00
parent 91d4fe25a6
commit c9aeeb99c1
Signed by: cilly
GPG key ID: 6500251E087653C9
2 changed files with 9 additions and 2 deletions

View file

@ -128,7 +128,7 @@ class GtfsParser(
.forEach { fd -> .forEach { fd ->
log.info("parsing stop times for ${fd.parent}...") log.info("parsing stop times for ${fd.parent}...")
parseStopTimes(fd, trips) { seq -> parseStopTimes(fd, trips) { seq ->
seq.chunked(1000000) seq.chunked(10000)
.forEach { emit(GtfsData.StopTimeChunk(it)) } .forEach { emit(GtfsData.StopTimeChunk(it)) }
} }
} }

View file

@ -36,7 +36,7 @@ class GtfsImporter(
is GtfsData.ServiceChunk -> addServices(chunk.services) is GtfsData.ServiceChunk -> addServices(chunk.services)
is GtfsData.ShapeChunk -> addShapes(chunk.shapes) is GtfsData.ShapeChunk -> addShapes(chunk.shapes)
is GtfsData.StopChunk -> addStops(chunk.stops) is GtfsData.StopChunk -> addStops(chunk.stops)
is GtfsData.StopTimeChunk -> addStopTimes(chunk.stopTimes) is GtfsData.StopTimeChunk -> addStopTimes(chunk.stopTimes)
is GtfsData.TripChunk -> addTrips(chunk.trips) is GtfsData.TripChunk -> addTrips(chunk.trips)
} }
} }
@ -50,24 +50,28 @@ class GtfsImporter(
val dao = database.versionMetadataDao val dao = database.versionMetadataDao
log.info("updating metadata...") log.info("updating metadata...")
dao.update(date, listOf("routes", "stops", "shapes", "trips", "stop_times")) dao.update(date, listOf("routes", "stops", "shapes", "trips", "stop_times"))
log.info("done")
} }
private suspend fun addRoutes(routes: List<Route>) { private suspend fun addRoutes(routes: List<Route>) {
val dao = database.routeDao val dao = database.routeDao
log.info("inserting routes...") log.info("inserting routes...")
dao.insertOrReplaceAll(*routes.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*routes.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
private suspend fun addServices(services: List<Service>) { private suspend fun addServices(services: List<Service>) {
val dao = database.serviceDao val dao = database.serviceDao
log.info("inserting services...") log.info("inserting services...")
dao.insertOrReplaceAll(*services.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*services.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
private suspend fun addShapes(shapes: List<Shape>) { private suspend fun addShapes(shapes: List<Shape>) {
val dao = database.shapeDao val dao = database.shapeDao
log.info("inserting shapes...") log.info("inserting shapes...")
dao.insertOrReplaceAll(*shapes.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*shapes.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
private suspend fun addStops(stops: List<Stop>) { private suspend fun addStops(stops: List<Stop>) {
@ -85,17 +89,20 @@ class GtfsImporter(
} }
} }
dao.insertOrReplaceAll(*stops.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*stops.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
private suspend fun addStopTimes(stopTimes: List<StopTime>) { private suspend fun addStopTimes(stopTimes: List<StopTime>) {
val dao = database.stopTimeDao val dao = database.stopTimeDao
log.info("inserting ${stopTimes.size} stoptimes...") log.info("inserting ${stopTimes.size} stoptimes...")
dao.insertOrReplaceAll(*stopTimes.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*stopTimes.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
private suspend fun addTrips(trips: List<Trip>) { private suspend fun addTrips(trips: List<Trip>) {
val dao = database.tripDao val dao = database.tripDao
log.info("inserting ${trips.size} trips...") log.info("inserting ${trips.size} trips...")
dao.insertOrReplaceAll(*trips.map { it.asEntity() }.toTypedArray()) dao.insertOrReplaceAll(*trips.map { it.asEntity() }.toTypedArray())
log.info("done")
} }
} }