feat(server): use lazy swappable database
This commit is contained in:
parent
959b022cf9
commit
0524eda5d2
5 changed files with 49 additions and 62 deletions
|
|
@ -135,7 +135,7 @@ class GtfsParser(
|
|||
.forEach { fd ->
|
||||
log.info("parsing stop times for ${fd.parent}...")
|
||||
parseStopTimes(fd, trips) { seq ->
|
||||
seq.chunked(10000)
|
||||
seq.chunked(1000000)
|
||||
.forEach { emit(GtfsData.StopTimeChunk(it)) }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import moe.lava.banksia.core.util.serialise
|
|||
import moe.lava.banksia.server.di.ServerModules
|
||||
import moe.lava.banksia.server.gtfsrt.GtfsrtService
|
||||
import org.koin.dsl.module
|
||||
import org.koin.ktor.ext.inject
|
||||
import org.koin.ktor.ext.get
|
||||
import org.koin.ktor.plugin.Koin
|
||||
import kotlin.time.Clock
|
||||
|
||||
|
|
@ -46,16 +46,14 @@ fun Application.module() {
|
|||
modules(ServerModules)
|
||||
}
|
||||
|
||||
val gtfsr by inject<GtfsrtService>()
|
||||
@Suppress("KotlinConstantConditions")
|
||||
launch { gtfsr.start(this, !Constants.devMode) }
|
||||
launch { get<GtfsrtService>().start(this, !Constants.devMode) }
|
||||
|
||||
routing {
|
||||
if (Constants.devMode) {
|
||||
get("/fixup") {
|
||||
call.respondText("received")
|
||||
val fixer by inject<GtfsDataFixer>()
|
||||
fixer.addParentsToStops()
|
||||
get<GtfsDataFixer>().addParentsToStops()
|
||||
}
|
||||
}
|
||||
get("/update") {
|
||||
|
|
@ -70,16 +68,13 @@ fun Application.module() {
|
|||
?: "https://opendata.transport.vic.gov.au/dataset/3f4e292e-7f8a-4ffe-831f-1953be0fe448/resource/${datasetUuid}/download/gtfs.zip"
|
||||
call.respondText("received")
|
||||
launch(context = Dispatchers.IO) {
|
||||
val fixer by inject<GtfsDataFixer>()
|
||||
val importer by inject<GtfsImporter>()
|
||||
importer.import(datasetUrl)
|
||||
|
||||
fixer.addParentsToStops()
|
||||
get<GtfsImporter>().import(datasetUrl)
|
||||
get<GtfsDataFixer>().addParentsToStops()
|
||||
}
|
||||
}
|
||||
|
||||
get("/metadata/{type?}") {
|
||||
val dao by inject<VersionMetadataDao>()
|
||||
val dao = get<VersionMetadataDao>()
|
||||
val type = call.parameters["type"]
|
||||
if (type == null) {
|
||||
call.respond(dao.getAll().map { it.asModel() })
|
||||
|
|
@ -96,7 +91,7 @@ fun Application.module() {
|
|||
|
||||
get("/routes") {
|
||||
val routes = withContext(context = Dispatchers.IO) {
|
||||
inject<RouteDao>().value.getAll()
|
||||
get<RouteDao>().getAll()
|
||||
}
|
||||
val res = routes.map { it.asModel() }
|
||||
call.respond(res)
|
||||
|
|
@ -104,7 +99,7 @@ fun Application.module() {
|
|||
get("/routes/{route_id}") {
|
||||
val routeId = call.parameters["route_id"]!!
|
||||
val route = withContext(context = Dispatchers.IO) {
|
||||
inject<RouteDao>().value.get(routeId)
|
||||
get<RouteDao>().get(routeId)
|
||||
}
|
||||
if (route != null)
|
||||
call.respond(route.asModel())
|
||||
|
|
@ -113,7 +108,7 @@ fun Application.module() {
|
|||
}
|
||||
get("/stops") {
|
||||
val routes = withContext(context = Dispatchers.IO) {
|
||||
inject<StopDao>().value.getAll()
|
||||
get<StopDao>().getAll()
|
||||
}
|
||||
val res = routes.map { it.asModel() }
|
||||
call.respond(res)
|
||||
|
|
@ -121,7 +116,7 @@ fun Application.module() {
|
|||
get("/stops/{stop_id}") {
|
||||
val stopId = call.parameters["stop_id"]!!
|
||||
val stop = withContext(context = Dispatchers.IO) {
|
||||
inject<StopDao>().value.get(stopId)
|
||||
get<StopDao>().get(stopId)
|
||||
}
|
||||
if (stop != null)
|
||||
call.respond(stop.asModel())
|
||||
|
|
@ -132,7 +127,7 @@ fun Application.module() {
|
|||
val routeId = call.parameters["route_id"]!!
|
||||
val useParent = call.queryParameters["parent"] !in listOf("false", "0")
|
||||
val stops = withContext(Dispatchers.IO) {
|
||||
val routeDao by inject<RouteDao>()
|
||||
val routeDao = get<RouteDao>()
|
||||
if (useParent)
|
||||
routeDao.stopsParent(routeId)
|
||||
else
|
||||
|
|
@ -146,7 +141,7 @@ fun Application.module() {
|
|||
?.let { LocalDate.parse(it, LocalDate.Formats.ISO) }
|
||||
?: Clock.System.todayIn(TimeZone.currentSystemDefault())
|
||||
val times = withContext(context = Dispatchers.IO) {
|
||||
inject<StopTimeDao>().value
|
||||
get<StopTimeDao>()
|
||||
.getForStopDated(
|
||||
stopId,
|
||||
listOf(date.dayOfWeek).serialise(),
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class GtfsDataFixer(
|
|||
name = name,
|
||||
lat = avgLat,
|
||||
lng = avgLng,
|
||||
parent = "",
|
||||
parent = null,
|
||||
hasWheelChairBoarding = stops.all { it.hasWheelChairBoarding },
|
||||
level = "",
|
||||
platformCode = "",
|
||||
|
|
|
|||
|
|
@ -1,7 +1,5 @@
|
|||
package moe.lava.banksia.server
|
||||
|
||||
import androidx.room.immediateTransaction
|
||||
import androidx.room.useWriterConnection
|
||||
import io.ktor.util.logging.Logger
|
||||
import moe.lava.banksia.core.model.Route
|
||||
import moe.lava.banksia.core.model.Service
|
||||
|
|
@ -11,6 +9,7 @@ import moe.lava.banksia.core.model.Stop
|
|||
import moe.lava.banksia.core.model.StopTime
|
||||
import moe.lava.banksia.core.model.Trip
|
||||
import moe.lava.banksia.core.room.Database
|
||||
import moe.lava.banksia.core.room.DatabaseManager
|
||||
import moe.lava.banksia.core.room.entity.asEntity
|
||||
import moe.lava.banksia.server.gtfs.GtfsData
|
||||
import moe.lava.banksia.server.gtfs.GtfsParser
|
||||
|
|
@ -18,74 +17,66 @@ import kotlin.time.Clock
|
|||
|
||||
class GtfsImporter(
|
||||
private val parser: GtfsParser,
|
||||
private val database: Database,
|
||||
private val dbm: DatabaseManager,
|
||||
private val log: Logger,
|
||||
) {
|
||||
suspend fun import(url: String, date: Long = Clock.System.now().epochSeconds) {
|
||||
database.useWriterConnection { transactor ->
|
||||
transactor.immediateTransaction {
|
||||
database.routeDao.deleteAll()
|
||||
database.serviceDao.deleteAll()
|
||||
database.serviceExceptionDao.deleteAll()
|
||||
database.shapeDao.deleteAll()
|
||||
database.stopDao.deleteAll()
|
||||
database.stopTimeDao.deleteAll()
|
||||
database.tripDao.deleteAll()
|
||||
val database = dbm.makeAlt()
|
||||
|
||||
parser.update(url).collect { chunk ->
|
||||
when (chunk) {
|
||||
is GtfsData.RouteChunk -> addRoutes(chunk.routes)
|
||||
is GtfsData.ServiceChunk -> addServices(chunk.services)
|
||||
is GtfsData.ServiceExceptionChunk -> addServiceExceptions(chunk.exceptions)
|
||||
is GtfsData.ShapeChunk -> addShapes(chunk.shapes)
|
||||
is GtfsData.StopChunk -> addStops(chunk.stops)
|
||||
is GtfsData.StopTimeChunk -> addStopTimes(chunk.stopTimes)
|
||||
is GtfsData.TripChunk -> addTrips(chunk.trips)
|
||||
is GtfsData.RouteChunk -> database.addRoutes(chunk.routes)
|
||||
is GtfsData.ServiceChunk -> database.addServices(chunk.services)
|
||||
is GtfsData.ServiceExceptionChunk -> database.addServiceExceptions(chunk.exceptions)
|
||||
is GtfsData.ShapeChunk -> database.addShapes(chunk.shapes)
|
||||
is GtfsData.StopChunk -> database.addStops(chunk.stops)
|
||||
is GtfsData.StopTimeChunk -> database.addStopTimes(chunk.stopTimes)
|
||||
is GtfsData.TripChunk -> database.addTrips(chunk.trips)
|
||||
}
|
||||
}
|
||||
|
||||
updateMetadata(date)
|
||||
}
|
||||
}
|
||||
database.updateMetadata(date)
|
||||
database.close()
|
||||
dbm.swap()
|
||||
}
|
||||
|
||||
private suspend fun updateMetadata(date: Long) {
|
||||
val dao = database.versionMetadataDao
|
||||
private suspend fun Database.updateMetadata(date: Long) {
|
||||
val dao = versionMetadataDao
|
||||
log.info("updating metadata...")
|
||||
dao.update(date, listOf("routes", "stops", "shapes", "trips", "stop_times"))
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addRoutes(routes: List<Route>) {
|
||||
val dao = database.routeDao
|
||||
private suspend fun Database.addRoutes(routes: List<Route>) {
|
||||
val dao = routeDao
|
||||
log.info("inserting routes...")
|
||||
dao.insertOrReplaceAll(*routes.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addServices(services: List<Service>) {
|
||||
val dao = database.serviceDao
|
||||
private suspend fun Database.addServices(services: List<Service>) {
|
||||
val dao = serviceDao
|
||||
log.info("inserting services...")
|
||||
dao.insertOrReplaceAll(*services.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addServiceExceptions(exceptions: List<ServiceException>) {
|
||||
val dao = database.serviceExceptionDao
|
||||
private suspend fun Database.addServiceExceptions(exceptions: List<ServiceException>) {
|
||||
val dao = serviceExceptionDao
|
||||
log.info("inserting exceptions...")
|
||||
dao.insertOrReplaceAll(*exceptions.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addShapes(shapes: List<Shape>) {
|
||||
val dao = database.shapeDao
|
||||
private suspend fun Database.addShapes(shapes: List<Shape>) {
|
||||
val dao = shapeDao
|
||||
log.info("inserting shapes...")
|
||||
dao.insertOrReplaceAll(*shapes.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addStops(stops: List<Stop>) {
|
||||
val dao = database.stopDao
|
||||
private suspend fun Database.addStops(stops: List<Stop>) {
|
||||
val dao = stopDao
|
||||
log.info("inserting stops...")
|
||||
stops
|
||||
.groupBy { it.id }
|
||||
|
|
@ -102,15 +93,15 @@ class GtfsImporter(
|
|||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addStopTimes(stopTimes: List<StopTime>) {
|
||||
val dao = database.stopTimeDao
|
||||
private suspend fun Database.addStopTimes(stopTimes: List<StopTime>) {
|
||||
val dao = stopTimeDao
|
||||
log.info("inserting ${stopTimes.size} stoptimes...")
|
||||
dao.insertOrReplaceAll(*stopTimes.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
}
|
||||
|
||||
private suspend fun addTrips(trips: List<Trip>) {
|
||||
val dao = database.tripDao
|
||||
private suspend fun Database.addTrips(trips: List<Trip>) {
|
||||
val dao = tripDao
|
||||
log.info("inserting ${trips.size} trips...")
|
||||
dao.insertOrReplaceAll(*trips.map { it.asEntity() }.toTypedArray())
|
||||
log.info("done")
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import moe.lava.banksia.server.GtfsDataFixer
|
|||
import moe.lava.banksia.server.GtfsImporter
|
||||
import moe.lava.banksia.server.gtfs.GtfsParser
|
||||
import moe.lava.banksia.server.gtfsrt.GtfsrtService
|
||||
import org.koin.core.module.dsl.factoryOf
|
||||
import org.koin.core.module.dsl.singleOf
|
||||
import org.koin.dsl.module
|
||||
|
||||
|
|
@ -16,6 +17,6 @@ val ServerModules = module {
|
|||
singleOf(::GtfsParser)
|
||||
singleOf(::GtfsrtService)
|
||||
|
||||
singleOf(::GtfsDataFixer)
|
||||
singleOf(::GtfsImporter)
|
||||
factoryOf(::GtfsDataFixer)
|
||||
factoryOf(::GtfsImporter)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue