diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 2fb1ee714..6c0dc85a7 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -64,7 +64,6 @@ import ( "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/modulecontext" internalobservability "github.com/TBD54566975/ftl/internal/observability" - ftlreflect "github.com/TBD54566975/ftl/internal/reflect" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/rpc/headers" "github.com/TBD54566975/ftl/internal/sha256" @@ -229,9 +228,8 @@ type Service struct { clients *ttlcache.Cache[string, clients] // Complete schema synchronised from the database. - schema atomic.Value[*schema.Schema] + schemaState atomic.Value[schemaState] - routes atomic.Value[map[string]Route] config Config increaseReplicaFailures map[string]int @@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca increaseReplicaFailures: map[string]int{}, runnerScaling: runnerScaling, } - svc.routes.Store(map[string]Route{}) - svc.schema.Store(&schema.Schema{}) - + svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}}) cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn) svc.cronJobs = cronSvc @@ -286,8 +282,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc) - go svc.syncSchema(ctx) - // Use min, max backoff if we are running in production, otherwise use // (1s, 1s) (or develBackoff). Will also wrap the job such that it its next // runtime is capped at 1s. @@ -330,7 +324,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca } // Parallel tasks. - parallelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5) + parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5) parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5) parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5) parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10) @@ -362,7 +356,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path)) - ingress.Handle(start, s.schema.Load(), requestKey, routes, w, r, s.timeline, s.callWithRequest) + ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest) } func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) { @@ -405,7 +399,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR if err != nil { return nil, fmt.Errorf("could not get status: %w", err) } - sroutes := s.routes.Load() + sroutes := s.schemaState.Load().routes routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) { return &ftlv1.StatusResponse_Route{ Module: route.Module, @@ -634,7 +628,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre }() deferredDeregistration = true } - _, err = s.syncRoutes(ctx) + _, err = s.syncRoutesAndSchema(ctx) if err != nil { return nil, fmt.Errorf("could not sync routes: %w", err) } @@ -704,7 +698,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque } // It's not actually ready until it is in the routes table - routes := s.routes.Load() + routes := s.schemaState.Load().routes var missing []string for _, module := range s.config.WaitFor { if _, ok := routes[module]; !ok { @@ -874,7 +868,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d var candidates []string - sch := s.schema.Load() + sch := s.schemaState.Load().schema updateCandidates := func(ref *schema.Ref) (brk bool, err error) { verb := &schema.Verb{} @@ -941,7 +935,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err)) } defer tx.CommitOrRollback(ctx, &err) - sch := s.schema.Load() + sch := s.schemaState.Load().schema msg := req.Msg fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg) if err != nil { @@ -1003,7 +997,8 @@ func (s *Service) callWithRequest( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required")) } - sch := s.schema.Load() + sstate := s.schemaState.Load() + sch := sstate.schema verbRef := schema.RefFromProto(req.Msg.Verb) verb := &schema.Verb{} @@ -1024,7 +1019,7 @@ func (s *Service) callWithRequest( } module := verbRef.Module - route, ok := s.routes.Load()[module] + route, ok := sstate.routes[module] if !ok { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module)) @@ -1374,7 +1369,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * } logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb) - sch := s.schema.Load() + sch := s.schemaState.Load().schema verb := &schema.Verb{} if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil { @@ -1552,7 +1547,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori return nil } - sch := s.schema.Load() + sch := s.schemaState.Load().schema fsm := &schema.FSM{} err = sch.ResolveToType(origin.FSM.ToRef(), fsm) @@ -1587,7 +1582,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori } func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) { - sch := s.schema.Load() + sch := s.schemaState.Load().schema fsm = &schema.FSM{} if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil { @@ -1766,8 +1761,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs) } -// Periodically sync the routing table from the DB. -func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) { +// Periodically sync the routing table and schema from the DB. +// We do this in a single function so the routing table and schema are always consistent +// And they both need the same info from the DB +func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) { deployments, err := s.dal.GetActiveDeployments(ctx) if errors.Is(err, libdal.ErrNotFound) { deployments = []dalmodel.Deployment{} @@ -1780,8 +1777,15 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) } defer tx.CommitOrRollback(ctx, &err) - old := s.routes.Load() + old := s.schemaState.Load().routes newRoutes := map[string]Route{} + modulesByName := map[string]*schema.Module{} + + builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert + modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins) + if err != nil { + return 0, fmt.Errorf("failed to convert builtins to schema: %w", err) + } for _, v := range deployments { deploymentLogger := s.getDeploymentLogger(ctx, v.Key) deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String()) @@ -1820,54 +1824,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) } } newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint} + modulesByName[v.Module] = v.Schema } } - s.routes.Store(newRoutes) - return time.Second, nil -} - -// Synchronises Service.schema from the database. -func (s *Service) syncSchema(ctx context.Context) { - logger := log.FromContext(ctx) - modulesByName := map[string]*schema.Module{} - retry := backoff.Backoff{Max: time.Second * 5} - for { - err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error { - switch response.ChangeType { - case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: - moduleSchema, err := schema.ModuleFromProto(response.Schema) - if err != nil { - return err - } - modulesByName[moduleSchema.Name] = moduleSchema - - case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: - delete(modulesByName, response.ModuleName) - } - orderedModules := maps.Values(modulesByName) - sort.SliceStable(orderedModules, func(i, j int) bool { - return orderedModules[i].Name < orderedModules[j].Name - }) - combined := &schema.Schema{Modules: orderedModules} - s.schema.Store(ftlreflect.DeepCopy(combined)) - return nil - }) - if err != nil { - next := retry.Duration() - if ctx.Err() == nil { - // Don't log when the context is done - logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err) - } - select { - case <-time.After(next): - case <-ctx.Done(): - return - } - } else { - retry.Reset() - } - } + orderedModules := maps.Values(modulesByName) + sort.SliceStable(orderedModules, func(i, j int) bool { + return orderedModules[i].Name < orderedModules[j].Name + }) + combined := &schema.Schema{Modules: orderedModules} + s.schemaState.Store(schemaState{schema: combined, routes: newRoutes}) + return time.Second, nil } func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { @@ -1936,6 +1903,11 @@ type Route struct { Endpoint string } +type schemaState struct { + schema *schema.Schema + routes map[string]Route +} + func (r Route) String() string { return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint) }