Skip to content

Commit

Permalink
fix: schema sync is broken
Browse files Browse the repository at this point in the history
Because the schema sync is keyed on module name rather than on deployment,
removing an old deployment also removes the schema of any newer version of the module.

This PR builds the schema in the same function as the route table,
so the two always stay in sync.
  • Loading branch information
stuartwdouglas committed Sep 26, 2024
1 parent 9b44784 commit 239c4e6
Showing 1 changed file with 34 additions and 68 deletions.
102 changes: 34 additions & 68 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/modulecontext"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
ftlreflect "github.com/TBD54566975/ftl/internal/reflect"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/sha256"
Expand Down Expand Up @@ -229,9 +228,8 @@ type Service struct {
clients *ttlcache.Cache[string, clients]

// Complete schema synchronised from the database.
schema atomic.Value[*schema.Schema]
schemaState atomic.Value[schemaState]

routes atomic.Value[map[string]Route]
config Config

increaseReplicaFailures map[string]int
Expand Down Expand Up @@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.routes.Store(map[string]Route{})
svc.schema.Store(&schema.Schema{})

svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

Expand All @@ -286,8 +282,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that its next
// runtime is capped at 1s.
Expand Down Expand Up @@ -320,7 +314,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
}

// Parallel tasks.
svc.tasks.Parallel(maybeDevelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10))
Expand Down Expand Up @@ -352,7 +346,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(start, s.schema.Load(), requestKey, routes, w, r, s.timeline, s.callWithRequest)
ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -395,7 +389,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
sroutes := s.routes.Load()
sroutes := s.schemaState.Load().routes
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
return &ftlv1.StatusResponse_Route{
Module: route.Module,
Expand Down Expand Up @@ -624,7 +618,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}
_, err = s.syncRoutes(ctx)
_, err = s.syncRoutesAndSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not sync routes: %w", err)
}
Expand Down Expand Up @@ -694,7 +688,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
}

// It's not actually ready until it is in the routes table
routes := s.routes.Load()
routes := s.schemaState.Load().routes
var missing []string
for _, module := range s.config.WaitFor {
if _, ok := routes[module]; !ok {
Expand Down Expand Up @@ -864,7 +858,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d

var candidates []string

sch := s.schema.Load()
sch := s.schemaState.Load().schema

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
Expand Down Expand Up @@ -931,7 +925,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)
sch := s.schema.Load()
sch := s.schemaState.Load().schema
msg := req.Msg
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
Expand Down Expand Up @@ -993,7 +987,8 @@ func (s *Service) callWithRequest(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

sch := s.schema.Load()
sstate := s.schemaState.Load()
sch := sstate.schema

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}
Expand All @@ -1014,7 +1009,7 @@ func (s *Service) callWithRequest(
}

module := verbRef.Module
route, ok := s.routes.Load()[module]
route, ok := sstate.routes[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
Expand Down Expand Up @@ -1364,7 +1359,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schema.Load()
sch := s.schemaState.Load().schema

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
Expand Down Expand Up @@ -1542,7 +1537,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
return nil
}

sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
Expand Down Expand Up @@ -1577,7 +1572,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
}

func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) {
sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm = &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
Expand Down Expand Up @@ -1756,8 +1751,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs)
}

// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
// Periodically sync the routing table and schema from the DB.
// We do this in a single function so the routing table and schema are always consistent,
// and because they both need the same info from the DB.
func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dalmodel.Deployment{}
Expand All @@ -1770,8 +1767,9 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
defer tx.CommitOrRollback(ctx, &err)

old := s.routes.Load()
old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
modulesByName := map[string]*schema.Module{}
for _, v := range deployments {
deploymentLogger := s.getDeploymentLogger(ctx, v.Key)
deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String())
Expand Down Expand Up @@ -1810,54 +1808,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}
}
s.routes.Store(newRoutes)
return time.Second, nil
}

// Synchronises Service.schema from the database.
func (s *Service) syncSchema(ctx context.Context) {
logger := log.FromContext(ctx)
modulesByName := map[string]*schema.Module{}
retry := backoff.Backoff{Max: time.Second * 5}
for {
err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
switch response.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
moduleSchema, err := schema.ModuleFromProto(response.Schema)
if err != nil {
return err
}
modulesByName[moduleSchema.Name] = moduleSchema

case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
delete(modulesByName, response.ModuleName)
}

orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schema.Store(ftlreflect.DeepCopy(combined))
return nil
})
if err != nil {
next := retry.Duration()
if ctx.Err() == nil {
// Don't log when the context is done
logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err)
}
select {
case <-time.After(next):
case <-ctx.Done():
return
}
} else {
retry.Reset()
}
}
orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
return time.Second, nil
}

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
Expand Down Expand Up @@ -1926,6 +1887,11 @@ type Route struct {
Endpoint string
}

// schemaState bundles the combined schema with the route table so that both
// can be held in a single atomic value on Service and replaced together,
// rather than being stored (and potentially observed) independently.
type schemaState struct {
// schema is the combined schema assembled from the modules of the
// deployments processed during a sync, ordered by module name.
schema *schema.Schema
// routes is the route table, keyed by module name.
routes map[string]Route
}

// String renders the route in the form "<deployment> -> <endpoint>".
func (r Route) String() string {
	deployment, endpoint := r.Deployment, r.Endpoint
	return fmt.Sprintf("%s -> %s", deployment, endpoint)
}

0 comments on commit 239c4e6

Please sign in to comment.