refactor: optimisation around stoptimes
- moved stoptime related functionality into new core:data:stoptime module
- will feature all the different realtime stoptime sources to be
integrated later
- create proper database schema for future migrations
- deduplicate trips into stoppingpatterns, since many trips share the
exact same stopping pattern
- stoptimes are now linked to stoppingpatterns instead
- stoppingpattern ids are generated from a hash composed of all stoptimes
- stoptimes now use deltas for arrival time to save space
This commit is contained in:
parent
f1770744db
commit
102c028407
39 changed files with 396 additions and 223 deletions
|
|
@ -18,6 +18,7 @@ import kotlinx.serialization.decodeFromString
|
|||
import kotlinx.serialization.modules.EmptySerializersModule
|
||||
import kotlinx.serialization.serializer
|
||||
import moe.lava.banksia.core.Constants
|
||||
import moe.lava.banksia.core.model.FutureTime.Companion.asInt
|
||||
import moe.lava.banksia.core.model.Route
|
||||
import moe.lava.banksia.core.model.RouteType
|
||||
import moe.lava.banksia.core.model.Service
|
||||
|
|
@ -25,6 +26,8 @@ import moe.lava.banksia.core.model.ServiceException
|
|||
import moe.lava.banksia.core.model.Shape
|
||||
import moe.lava.banksia.core.model.Stop
|
||||
import moe.lava.banksia.core.model.StopTime
|
||||
import moe.lava.banksia.core.model.StoppingPattern
|
||||
import moe.lava.banksia.core.model.TimeType
|
||||
import moe.lava.banksia.core.model.Trip
|
||||
import moe.lava.banksia.core.util.Point
|
||||
import moe.lava.banksia.server.gtfs.structures.GtfsRoute
|
||||
|
|
@ -35,6 +38,8 @@ import moe.lava.banksia.server.gtfs.structures.GtfsStop
|
|||
import moe.lava.banksia.server.gtfs.structures.GtfsStopTime
|
||||
import moe.lava.banksia.server.gtfs.structures.GtfsTrip
|
||||
import java.io.File
|
||||
import java.nio.ByteBuffer
|
||||
import java.security.MessageDigest
|
||||
import java.util.zip.ZipFile
|
||||
import kotlin.time.ExperimentalTime
|
||||
|
||||
|
|
@ -46,8 +51,7 @@ sealed class GtfsData {
|
|||
data class ServiceExceptionChunk(val exceptions: List<ServiceException>) : GtfsData()
|
||||
data class ShapeChunk(val shapes: List<Shape>) : GtfsData()
|
||||
data class StopChunk(val stops: List<Stop>) : GtfsData()
|
||||
data class StopTimeChunk(val stopTimes: List<StopTime>) : GtfsData()
|
||||
data class TripChunk(val trips: List<Trip>) : GtfsData()
|
||||
data class TripChunk(val trips: List<Trip.Undated>) : GtfsData()
|
||||
}
|
||||
|
||||
class GtfsParser(
|
||||
|
|
@ -129,7 +133,6 @@ class GtfsParser(
|
|||
.filter { it.name == "trips.txt" }
|
||||
.flatMap { fd ->
|
||||
parseTrips(fd, services)
|
||||
.also { emit(GtfsData.TripChunk(it)) }
|
||||
}
|
||||
.associateBy { it.id }
|
||||
|
||||
|
|
@ -137,13 +140,53 @@ class GtfsParser(
|
|||
.filter { it.name == "stop_times.txt" }
|
||||
.forEach { fd ->
|
||||
log.info("parsing stop times for ${fd.parent}...")
|
||||
parseStopTimes(fd, trips) { seq ->
|
||||
seq.chunked(1000000)
|
||||
.forEach { emit(GtfsData.StopTimeChunk(it)) }
|
||||
parseStopTimes(fd) { seq ->
|
||||
val times = ArrayList<Pair<String, StopTime.Undated>>(1000100)
|
||||
seq.forEach { pair ->
|
||||
val (_, stoptime) = pair
|
||||
if (times.size > 1000000 && stoptime.patternId == 1L) {
|
||||
emit(GtfsData.TripChunk(processStoptimes(trips, times)))
|
||||
times.clear()
|
||||
}
|
||||
|
||||
times.add(pair)
|
||||
}
|
||||
emit(GtfsData.TripChunk(processStoptimes(trips, times)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun hashCalc(headsign: String, stops: List<StopTime.Undated>): Long {
|
||||
val inst = MessageDigest.getInstance("SHA-256")
|
||||
inst.update(headsign.toByteArray())
|
||||
stops.forEach {
|
||||
inst.update(it.stopId.toByteArray())
|
||||
val dint = it.time.departure.asInt()
|
||||
inst.update((dint).toByte())
|
||||
inst.update((dint shr 8).toByte())
|
||||
val aint = it.time.arrival.asInt()
|
||||
inst.update((aint).toByte())
|
||||
inst.update((aint shr 8).toByte())
|
||||
}
|
||||
|
||||
val buf = inst.digest().slice(0..7).toByteArray()
|
||||
buf[0] = 0
|
||||
buf[1] = 0
|
||||
return ByteBuffer.wrap(buf).long
|
||||
}
|
||||
|
||||
private fun processStoptimes(trips: Map<String, Trip.Undated>, times: ArrayList<Pair<String, StopTime.Undated>>) =
|
||||
times.groupBy { it.first }
|
||||
.map { (tripId, pairs) ->
|
||||
val trip = trips[tripId]!!
|
||||
val stoptimes = pairs.map { it.second }
|
||||
val hash = hashCalc(trip.pattern.headsign, stoptimes)
|
||||
trip.copy(pattern = trip.pattern.copy(
|
||||
id = hash,
|
||||
stoptimes = stoptimes.map { it.copy(patternId = hash) }
|
||||
))
|
||||
}
|
||||
|
||||
private fun parseRoutes(fd: File) =
|
||||
fd.parseCsv<GtfsRoute>()
|
||||
.map { with(it) {
|
||||
|
|
@ -180,16 +223,17 @@ class GtfsParser(
|
|||
)
|
||||
} }
|
||||
|
||||
private inline fun parseStopTimes(fd: File, trips: Map<String, Trip>, block: (Sequence<StopTime>) -> Unit) =
|
||||
private inline fun parseStopTimes(fd: File, block: (Sequence<Pair<String, StopTime.Undated>>) -> Unit) =
|
||||
fd.parseCsvSequence<GtfsStopTime> { seq ->
|
||||
seq
|
||||
.map { with(it) {
|
||||
StopTime(
|
||||
tripId = trip_id,
|
||||
it.trip_id to StopTime(
|
||||
patternId = stop_sequence,
|
||||
stopId = stop_id,
|
||||
arrivalTime = GtfsStopTime.parseGtfsTime(arrival_time),
|
||||
departureTime = GtfsStopTime.parseGtfsTime(departure_time),
|
||||
headsign = stop_headsign.ifEmpty { trips[trip_id]!!.tripHeadsign },
|
||||
time = TimeType.Undated(
|
||||
arrival = GtfsStopTime.parseGtfsTime(arrival_time),
|
||||
departure = GtfsStopTime.parseGtfsTime(departure_time),
|
||||
),
|
||||
pickupType = pickup_type,
|
||||
dropOffType = drop_off_type,
|
||||
)
|
||||
|
|
@ -230,15 +274,19 @@ class GtfsParser(
|
|||
private fun parseTrips(fd: File, services: Map<String, Service>) =
|
||||
fd.parseCsv<GtfsTrip>()
|
||||
.map { with(it) {
|
||||
Trip(
|
||||
Trip.Undated(
|
||||
id = trip_id,
|
||||
routeId = route_id,
|
||||
pattern = StoppingPattern(
|
||||
id = 0,
|
||||
routeId = route_id,
|
||||
shapeId = shape_id,
|
||||
headsign = trip_headsign,
|
||||
wheelchairAccessible = wheelchair_accessible == "1",
|
||||
stoptimes = listOf()
|
||||
),
|
||||
service = services["${fd.parentFile.name}_${service_id}"]!!,
|
||||
shapeId = shape_id,
|
||||
tripHeadsign = trip_headsign,
|
||||
directionId = direction_id,
|
||||
directionId = direction_id.toInt(),
|
||||
blockId = block_id.ifEmpty { null },
|
||||
wheelchairAccessible = wheelchair_accessible == "1",
|
||||
)
|
||||
} }
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ internal data class GtfsStopTime(
|
|||
val arrival_time: String,
|
||||
val departure_time: String,
|
||||
val stop_id: String,
|
||||
val stop_sequence: Int,
|
||||
val stop_sequence: Long,
|
||||
val stop_headsign: String,
|
||||
val pickup_type: Int,
|
||||
val drop_off_type: Int,
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ fun Application.module() {
|
|||
)
|
||||
.executeAsList()
|
||||
.map { it.asModel().atDate(date) }
|
||||
.sortedBy { it.departureTime }
|
||||
.sortedBy { it.time.departure }
|
||||
}
|
||||
call.respond(times)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import moe.lava.banksia.core.model.Service
|
|||
import moe.lava.banksia.core.model.ServiceException
|
||||
import moe.lava.banksia.core.model.Shape
|
||||
import moe.lava.banksia.core.model.Stop
|
||||
import moe.lava.banksia.core.model.StopTime
|
||||
import moe.lava.banksia.core.model.Trip
|
||||
import moe.lava.banksia.core.sqld.DatabaseManager
|
||||
import moe.lava.banksia.core.sqld.mappers.asDb
|
||||
|
|
@ -30,7 +29,6 @@ class GtfsImporter(
|
|||
is GtfsData.ServiceExceptionChunk -> database.addServiceExceptions(chunk.exceptions)
|
||||
is GtfsData.ShapeChunk -> database.addShapes(chunk.shapes)
|
||||
is GtfsData.StopChunk -> database.addStops(chunk.stops)
|
||||
is GtfsData.StopTimeChunk -> database.addStopTimes(chunk.stopTimes)
|
||||
is GtfsData.TripChunk -> database.addTrips(chunk.trips)
|
||||
}
|
||||
}
|
||||
|
|
@ -101,21 +99,15 @@ class GtfsImporter(
|
|||
log.info("done")
|
||||
}
|
||||
|
||||
private fun Database.addStopTimes(stopTimes: List<StopTime>) {
|
||||
log.info("inserting ${stopTimes.size} stoptimes...")
|
||||
stopTimeQueries.transaction {
|
||||
stopTimes.forEach {
|
||||
stopTimeQueries.insert(it.asDb())
|
||||
}
|
||||
}
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private fun Database.addTrips(trips: List<Trip>) {
|
||||
private fun Database.addTrips(trips: List<Trip.Undated>) {
|
||||
log.info("inserting ${trips.size} trips...")
|
||||
tripQueries.transaction {
|
||||
trips.forEach {
|
||||
tripQueries.insert(it.asDb())
|
||||
transaction {
|
||||
trips.forEach { trip ->
|
||||
stoppingPatternQueries.insert(trip.pattern.asDb())
|
||||
trip.pattern.stoptimes.forEach { stoptime ->
|
||||
stopTimeQueries.insert(stoptime.asDb())
|
||||
}
|
||||
tripQueries.insert(trip.asDb())
|
||||
}
|
||||
}
|
||||
log.info("done")
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<root level="trace">
|
||||
<root level="debug">
|
||||
<appender-ref ref="FILE"/>
|
||||
<appender-ref ref="STDOUT"/>
|
||||
</root>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue