Skip to content

Commit

Permalink
fix: schema sync is broken (#2843)
Browse files Browse the repository at this point in the history
Because the sync works on module name rather than on deployment, when a
deployment is removed it also removes any newer version of the module.

This PR builds the schema in the same function as the route
table, so they stay in sync.

At present this is not an atomic operation, however, so there is a slight
possibility that a request sees a new schema with an old route table.
Even so, this is still far better than the current situation.
  • Loading branch information
stuartwdouglas authored Sep 26, 2024
1 parent 6b1db3b commit b980c86
Showing 1 changed file with 40 additions and 68 deletions.
108 changes: 40 additions & 68 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/modulecontext"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
ftlreflect "github.com/TBD54566975/ftl/internal/reflect"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/sha256"
Expand Down Expand Up @@ -229,9 +228,8 @@ type Service struct {
clients *ttlcache.Cache[string, clients]

// Complete schema synchronised from the database.
schema atomic.Value[*schema.Schema]
schemaState atomic.Value[schemaState]

routes atomic.Value[map[string]Route]
config Config

increaseReplicaFailures map[string]int
Expand Down Expand Up @@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.routes.Store(map[string]Route{})
svc.schema.Store(&schema.Schema{})

svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

Expand All @@ -286,8 +282,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that its next
// runtime is capped at 1s.
Expand Down Expand Up @@ -330,7 +324,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
}

// Parallel tasks.
parallelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5)
parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5)
parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5)
parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5)
parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10)
Expand Down Expand Up @@ -362,7 +356,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(start, s.schema.Load(), requestKey, routes, w, r, s.timeline, s.callWithRequest)
ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -405,7 +399,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
sroutes := s.routes.Load()
sroutes := s.schemaState.Load().routes
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
return &ftlv1.StatusResponse_Route{
Module: route.Module,
Expand Down Expand Up @@ -634,7 +628,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}
_, err = s.syncRoutes(ctx)
_, err = s.syncRoutesAndSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not sync routes: %w", err)
}
Expand Down Expand Up @@ -704,7 +698,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
}

// It's not actually ready until it is in the routes table
routes := s.routes.Load()
routes := s.schemaState.Load().routes
var missing []string
for _, module := range s.config.WaitFor {
if _, ok := routes[module]; !ok {
Expand Down Expand Up @@ -874,7 +868,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d

var candidates []string

sch := s.schema.Load()
sch := s.schemaState.Load().schema

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
Expand Down Expand Up @@ -941,7 +935,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)
sch := s.schema.Load()
sch := s.schemaState.Load().schema
msg := req.Msg
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
Expand Down Expand Up @@ -1003,7 +997,8 @@ func (s *Service) callWithRequest(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

sch := s.schema.Load()
sstate := s.schemaState.Load()
sch := sstate.schema

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}
Expand All @@ -1024,7 +1019,7 @@ func (s *Service) callWithRequest(
}

module := verbRef.Module
route, ok := s.routes.Load()[module]
route, ok := sstate.routes[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
Expand Down Expand Up @@ -1374,7 +1369,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schema.Load()
sch := s.schemaState.Load().schema

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
Expand Down Expand Up @@ -1552,7 +1547,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
return nil
}

sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
Expand Down Expand Up @@ -1587,7 +1582,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
}

func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) {
sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm = &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
Expand Down Expand Up @@ -1766,8 +1761,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs)
}

// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
// Periodically sync the routing table and schema from the DB.
// We do this in a single function so the routing table and schema are always consistent,
// and because they both need the same info from the DB.
func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dalmodel.Deployment{}
Expand All @@ -1780,8 +1777,15 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
defer tx.CommitOrRollback(ctx, &err)

old := s.routes.Load()
old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
modulesByName := map[string]*schema.Module{}

builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins)
if err != nil {
return 0, fmt.Errorf("failed to convert builtins to schema: %w", err)
}
for _, v := range deployments {
deploymentLogger := s.getDeploymentLogger(ctx, v.Key)
deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String())
Expand Down Expand Up @@ -1820,54 +1824,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}
}
s.routes.Store(newRoutes)
return time.Second, nil
}

// Synchronises Service.schema from the database.
func (s *Service) syncSchema(ctx context.Context) {
logger := log.FromContext(ctx)
modulesByName := map[string]*schema.Module{}
retry := backoff.Backoff{Max: time.Second * 5}
for {
err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
switch response.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
moduleSchema, err := schema.ModuleFromProto(response.Schema)
if err != nil {
return err
}
modulesByName[moduleSchema.Name] = moduleSchema

case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
delete(modulesByName, response.ModuleName)
}

orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schema.Store(ftlreflect.DeepCopy(combined))
return nil
})
if err != nil {
next := retry.Duration()
if ctx.Err() == nil {
// Don't log when the context is done
logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err)
}
select {
case <-time.After(next):
case <-ctx.Done():
return
}
} else {
retry.Reset()
}
}
orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
return time.Second, nil
}

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
Expand Down Expand Up @@ -1936,6 +1903,11 @@ type Route struct {
Endpoint string
}

// schemaState bundles a schema and a route table into one value so that the
// pair can be stored and loaded together.
type schemaState struct {
// schema is the combined schema assembled from the known modules.
schema *schema.Schema
// routes is the route table, keyed by module name.
routes map[string]Route
}

// String renders the route as "<deployment> -> <endpoint>".
func (r Route) String() string {
	const layout = "%s -> %s"
	return fmt.Sprintf(layout, r.Deployment, r.Endpoint)
}

0 comments on commit b980c86

Please sign in to comment.