Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: schema sync is broken #2843

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 40 additions & 68 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/modulecontext"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
ftlreflect "github.com/TBD54566975/ftl/internal/reflect"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/sha256"
Expand Down Expand Up @@ -229,9 +228,8 @@ type Service struct {
clients *ttlcache.Cache[string, clients]

// Complete schema synchronised from the database.
schema atomic.Value[*schema.Schema]
schemaState atomic.Value[schemaState]

routes atomic.Value[map[string]Route]
config Config

increaseReplicaFailures map[string]int
Expand Down Expand Up @@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.routes.Store(map[string]Route{})
svc.schema.Store(&schema.Schema{})

svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

Expand All @@ -286,8 +282,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that it its next
// runtime is capped at 1s.
Expand Down Expand Up @@ -330,7 +324,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
}

// Parallel tasks.
parallelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5)
parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5)
parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5)
parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5)
parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10)
Expand Down Expand Up @@ -362,7 +356,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(start, s.schema.Load(), requestKey, routes, w, r, s.timeline, s.callWithRequest)
ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -405,7 +399,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
sroutes := s.routes.Load()
sroutes := s.schemaState.Load().routes
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
return &ftlv1.StatusResponse_Route{
Module: route.Module,
Expand Down Expand Up @@ -634,7 +628,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}
_, err = s.syncRoutes(ctx)
_, err = s.syncRoutesAndSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not sync routes: %w", err)
}
Expand Down Expand Up @@ -704,7 +698,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
}

// It's not actually ready until it is in the routes table
routes := s.routes.Load()
routes := s.schemaState.Load().routes
var missing []string
for _, module := range s.config.WaitFor {
if _, ok := routes[module]; !ok {
Expand Down Expand Up @@ -874,7 +868,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d

var candidates []string

sch := s.schema.Load()
sch := s.schemaState.Load().schema

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
Expand Down Expand Up @@ -941,7 +935,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)
sch := s.schema.Load()
sch := s.schemaState.Load().schema
msg := req.Msg
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
Expand Down Expand Up @@ -1003,7 +997,8 @@ func (s *Service) callWithRequest(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

sch := s.schema.Load()
sstate := s.schemaState.Load()
sch := sstate.schema

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}
Expand All @@ -1024,7 +1019,7 @@ func (s *Service) callWithRequest(
}

module := verbRef.Module
route, ok := s.routes.Load()[module]
route, ok := sstate.routes[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
Expand Down Expand Up @@ -1374,7 +1369,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schema.Load()
sch := s.schemaState.Load().schema

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
Expand Down Expand Up @@ -1552,7 +1547,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
return nil
}

sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
Expand Down Expand Up @@ -1587,7 +1582,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori
}

func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) {
sch := s.schema.Load()
sch := s.schemaState.Load().schema

fsm = &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
Expand Down Expand Up @@ -1766,8 +1761,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs)
}

// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) {
// Periodically sync the routing table and schema from the DB.
// We do this in a single function so the routing table and schema are always consistent
// And they both need the same info from the DB
func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dalmodel.Deployment{}
Expand All @@ -1780,8 +1777,15 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
defer tx.CommitOrRollback(ctx, &err)

old := s.routes.Load()
old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
modulesByName := map[string]*schema.Module{}

builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins)
if err != nil {
return 0, fmt.Errorf("failed to convert builtins to schema: %w", err)
}
for _, v := range deployments {
deploymentLogger := s.getDeploymentLogger(ctx, v.Key)
deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String())
Expand Down Expand Up @@ -1820,54 +1824,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error)
}
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}
}
s.routes.Store(newRoutes)
return time.Second, nil
}

// Synchronises Service.schema from the database.
func (s *Service) syncSchema(ctx context.Context) {
logger := log.FromContext(ctx)
modulesByName := map[string]*schema.Module{}
retry := backoff.Backoff{Max: time.Second * 5}
for {
err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this being used anymore? If not we should remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used for PullSchema

switch response.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
moduleSchema, err := schema.ModuleFromProto(response.Schema)
if err != nil {
return err
}
modulesByName[moduleSchema.Name] = moduleSchema

case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
delete(modulesByName, response.ModuleName)
}

orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schema.Store(ftlreflect.DeepCopy(combined))
return nil
})
if err != nil {
next := retry.Duration()
if ctx.Err() == nil {
// Don't log when the context is done
logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err)
}
select {
case <-time.After(next):
case <-ctx.Done():
return
}
} else {
retry.Reset()
}
}
orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
return time.Second, nil
}

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
Expand Down Expand Up @@ -1936,6 +1903,11 @@ type Route struct {
Endpoint string
}

type schemaState struct {
schema *schema.Schema
routes map[string]Route
}

func (r Route) String() string {
return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint)
}
Loading