diff --git a/model/config.go b/model/config.go index 204a5cf3..af40fa51 100644 --- a/model/config.go +++ b/model/config.go @@ -10,19 +10,21 @@ import ( ) type Config struct { - Finder Finder - RTFinder RTFinder - GbfsFinder GbfsFinder - Checker Checker - Actions Actions - JobQueue jobs.JobQueue - Clock clock.Clock - Secrets []dmfr.Secret - ValidateLargeFiles bool - DisableImage bool - RestPrefix string - Storage string - RTStorage string + Finder Finder + RTFinder RTFinder + GbfsFinder GbfsFinder + Checker Checker + Actions Actions + JobQueue jobs.JobQueue + Clock clock.Clock + Secrets []dmfr.Secret + ValidateLargeFiles bool + DisableImage bool + RestPrefix string + Storage string + RTStorage string + LoaderBatchSize int + LoaderStopTimeBatchSize int } var finderCtxKey = &contextKey{"finderConfig"} diff --git a/server/gql/loaders.go b/server/gql/loaders.go index b8ab4748..11f0714c 100644 --- a/server/gql/loaders.go +++ b/server/gql/loaders.go @@ -88,70 +88,76 @@ type Loaders struct { } // NewLoaders instantiates data loaders for the middleware -func NewLoaders(dbf model.Finder) *Loaders { +func NewLoaders(dbf model.Finder, batchSize int, stopTimeBatchSize int) *Loaders { + if batchSize == 0 { + batchSize = maxBatch + } + if stopTimeBatchSize == 0 { + stopTimeBatchSize = maxBatch + } loaders := &Loaders{ - AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByFeedVersionID), - AgenciesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByID), - AgenciesByOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgenciesByOnestopID), - AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.AgencyPlacesByAgencyID), - CalendarDatesByServiceID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarDatesByServiceID), - CalendarsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CalendarsByID), - CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusGeographiesByEntityID), - CensusTableByID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusTableByID), - CensusFieldsByTableID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusFieldsByTableID), - CensusValuesByGeographyID: withWaitAndCapacity(waitTime, maxBatch, dbf.CensusValuesByGeographyID), - FeedFetchesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedFetchesByFeedID), - FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedInfosByFeedVersionID), - FeedsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByID), - FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedsByOperatorOnestopID), - FeedStatesByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedStatesByFeedID), - FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionFileInfosByFeedVersionID), - FeedVersionGeometryByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGeometryByID), - FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionGtfsImportByFeedVersionID), FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceWindowByFeedVersionID), - FeedVersionsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByFeedID), - FeedVersionsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionsByID), - FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceLevelsByFeedVersionID), - FrequenciesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.FrequenciesByTripID), - LevelsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByID), - LevelsByParentStationID: withWaitAndCapacity(waitTime, maxBatch, dbf.LevelsByParentStationID), - OperatorsByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByAgencyID), - OperatorsByCOIF: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByCOIF), - OperatorsByFeedID: withWaitAndCapacity(waitTime, maxBatch, dbf.OperatorsByFeedID), - PathwaysByFromStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByFromStopID), - PathwaysByID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByID), - PathwaysByToStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.PathwaysByToStopID), - RouteAttributesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteAttributesByRouteID), - RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteGeometriesByRouteID), - RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteHeadwaysByRouteID), - RoutesByAgencyID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByAgencyID), - RoutesByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByFeedVersionID), - RoutesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.RoutesByID), - RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopPatternsByRouteID), - RouteStopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByRouteID), - RouteStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.RouteStopsByStopID), - SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsByRouteID), - SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentPatternsBySegmentID), - SegmentsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByID), - SegmentsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByRouteID), - SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.SegmentsByFeedVersionID), - ShapesByID: withWaitAndCapacity(waitTime, maxBatch, dbf.ShapesByID), - StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopExternalReferencesByStopID), - StopObservationsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopObservationsByStopID), - StopPlacesByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopPlacesByStopID), - StopsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByFeedVersionID), - StopsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByID), - StopsByLevelID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByLevelID), - StopsByParentStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByParentStopID), - StopsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopsByRouteID), - StopTimesByStopID: withWaitAndCapacity(waitTime, 1, dbf.StopTimesByStopID), - StopTimesByTripID: withWaitAndCapacity(waitTime, maxBatch, dbf.StopTimesByTripID), - TargetStopsByStopID: withWaitAndCapacity(waitTime, maxBatch, dbf.TargetStopsByStopID), - TripsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByFeedVersionID), - TripsByID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByID), - TripsByRouteID: withWaitAndCapacity(waitTime, maxBatch, dbf.TripsByRouteID), - ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID), - ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportErrorGroupsByValidationReportID), - ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.ValidationReportsByFeedVersionID), + AgenciesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByFeedVersionID), + AgenciesByID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByID), + AgenciesByOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.AgenciesByOnestopID), + AgencyPlacesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.AgencyPlacesByAgencyID), + CalendarDatesByServiceID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarDatesByServiceID), + CalendarsByID: withWaitAndCapacity(waitTime, batchSize, dbf.CalendarsByID), + CensusGeographiesByEntityID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusGeographiesByEntityID), + CensusTableByID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusTableByID), + CensusFieldsByTableID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusFieldsByTableID), + CensusValuesByGeographyID: withWaitAndCapacity(waitTime, batchSize, dbf.CensusValuesByGeographyID), + FeedFetchesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedFetchesByFeedID), + FeedInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedInfosByFeedVersionID), + FeedsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByID), + FeedsByOperatorOnestopID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedsByOperatorOnestopID), + FeedStatesByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedStatesByFeedID), + FeedVersionFileInfosByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionFileInfosByFeedVersionID), + FeedVersionGeometryByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGeometryByID), + FeedVersionGtfsImportByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionGtfsImportByFeedVersionID), FeedVersionServiceWindowByFeedVersionID: withWaitAndCapacity(waitTime, maxBatch, dbf.FeedVersionServiceWindowByFeedVersionID), + FeedVersionsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByFeedID), + FeedVersionsByID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionsByID), + FeedVersionServiceLevelsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.FeedVersionServiceLevelsByFeedVersionID), + FrequenciesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.FrequenciesByTripID), + LevelsByID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByID), + LevelsByParentStationID: withWaitAndCapacity(waitTime, batchSize, dbf.LevelsByParentStationID), + OperatorsByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByAgencyID), + OperatorsByCOIF: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByCOIF), + OperatorsByFeedID: withWaitAndCapacity(waitTime, batchSize, dbf.OperatorsByFeedID), + PathwaysByFromStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByFromStopID), + PathwaysByID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByID), + PathwaysByToStopID: withWaitAndCapacity(waitTime, batchSize, dbf.PathwaysByToStopID), + RouteAttributesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteAttributesByRouteID), + RouteGeometriesByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteGeometriesByRouteID), + RouteHeadwaysByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteHeadwaysByRouteID), + RoutesByAgencyID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByAgencyID), + RoutesByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByFeedVersionID), + RoutesByID: withWaitAndCapacity(waitTime, batchSize, dbf.RoutesByID), + RouteStopPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopPatternsByRouteID), + RouteStopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByRouteID), + RouteStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.RouteStopsByStopID), + SegmentPatternsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsByRouteID), + SegmentPatternsBySegmentID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentPatternsBySegmentID), + SegmentsByID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByID), + SegmentsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByRouteID), + SegmentsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.SegmentsByFeedVersionID), + ShapesByID: withWaitAndCapacity(waitTime, batchSize, dbf.ShapesByID), + StopExternalReferencesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopExternalReferencesByStopID), + StopObservationsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopObservationsByStopID), + StopPlacesByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopPlacesByStopID), + StopsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByFeedVersionID), + StopsByID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByID), + StopsByLevelID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByLevelID), + StopsByParentStopID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByParentStopID), + StopsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.StopsByRouteID), + StopTimesByStopID: withWaitAndCapacity(waitTime, stopTimeBatchSize, dbf.StopTimesByStopID), + StopTimesByTripID: withWaitAndCapacity(waitTime, batchSize, dbf.StopTimesByTripID), + TargetStopsByStopID: withWaitAndCapacity(waitTime, batchSize, dbf.TargetStopsByStopID), + TripsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByFeedVersionID), + TripsByID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByID), + TripsByRouteID: withWaitAndCapacity(waitTime, batchSize, dbf.TripsByRouteID), + ValidationReportErrorExemplarsByValidationReportErrorGroupID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorExemplarsByValidationReportErrorGroupID), + ValidationReportErrorGroupsByValidationReportID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportErrorGroupsByValidationReportID), + ValidationReportsByFeedVersionID: withWaitAndCapacity(waitTime, batchSize, dbf.ValidationReportsByFeedVersionID), } return loaders } @@ -162,7 +168,7 @@ func loaderMiddleware(next http.Handler) http.Handler { // Is this OK to use as a long term cache? ctx := r.Context() cfg := model.ForContext(ctx) - loaders := NewLoaders(cfg.Finder) + loaders := NewLoaders(cfg.Finder, cfg.LoaderBatchSize, cfg.LoaderStopTimeBatchSize) nextCtx := context.WithValue(ctx, loadersKey, loaders) r = r.WithContext(nextCtx) next.ServeHTTP(w, r)