Skip to content

Commit

Permalink
feat: parent span for polling operations (#2829)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Sep 26, 2024
1 parent 435c857 commit 94b75cd
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
29 changes: 19 additions & 10 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,19 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) (or develBackoff). Will also wrap the job such that its next
// runtime is capped at 1s.
maybeDevelTask := func(job scheduledtask.Job, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) (backoff.Backoff, scheduledtask.Job) {
maybeDevelTask := func(job scheduledtask.Job, name string, maxNext, minDelay, maxDelay time.Duration, develBackoff ...backoff.Backoff) (backoff.Backoff, scheduledtask.Job) {
if len(develBackoff) > 1 {
panic("too many devel backoffs")
}
chain := job

// Trace controller operations
job = func(ctx context.Context) (time.Duration, error) {
ctx, span := observability.Controller.BeginSpan(ctx, name)
defer span.End()
return chain(ctx)
}

if devel {
chain := job
job = func(ctx context.Context) (time.Duration, error) {
Expand All @@ -311,21 +320,21 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
}

// Parallel tasks.
svc.tasks.Parallel(maybeDevelTask(svc.syncRoutes, time.Second, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.heartbeatController, time.Second, time.Second*3, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.updateControllersList, time.Second, time.Second*5, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.executeAsyncCalls, time.Second, time.Second*5, time.Second*10))
svc.tasks.Parallel(maybeDevelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10))

// This should be a singleton task, but because this is the task that
// actually expires the leases used to run singleton tasks, it must be
// parallel.
svc.tasks.Parallel(maybeDevelTask(svc.expireStaleLeases, time.Second*2, time.Second, time.Second*5))
svc.tasks.Parallel(maybeDevelTask(svc.expireStaleLeases, "expire-stale-leases", time.Second*2, time.Second, time.Second*5))

// Singleton tasks use leases to only run on a single controller.
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, time.Second*2, time.Second*20, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, time.Second*2, time.Second, time.Second*10))
svc.tasks.Singleton(maybeDevelTask(svc.reapCallEvents, time.Minute*5, time.Minute, time.Minute*30))
svc.tasks.Singleton(maybeDevelTask(svc.reapAsyncCalls, time.Second*5, time.Second, time.Second*5))
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, "reap-stale-controllers", time.Second*2, time.Second*20, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, "reap-stale-runners", time.Second*2, time.Second, time.Second*10))
svc.tasks.Singleton(maybeDevelTask(svc.reapCallEvents, "reap-call-events", time.Minute*5, time.Minute, time.Minute*30))
svc.tasks.Singleton(maybeDevelTask(svc.reapAsyncCalls, "reap-async-calls", time.Second*5, time.Second, time.Second*5))
return svc, nil
}

Expand Down
34 changes: 34 additions & 0 deletions backend/controller/observability/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package observability

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Names used when emitting controller polling traces.
const (
// controllerPollingOperation is used both as the tracer name and as the
// span name for controller polling spans.
controllerPollingOperation = "ftl.controller.poll"
// operation is the attribute key under which a span records the name
// passed to BeginSpan.
operation = "operation"
)

// ControllerTracing holds the tracer used to create spans for controller
// polling operations.
type ControllerTracing struct {
// polling is the tracer that BeginSpan starts spans on.
polling trace.Tracer
}

// initControllerTracing builds a ControllerTracing whose polling tracer is
// obtained from the globally registered OpenTelemetry tracer provider, using
// controllerPollingOperation as the tracer name.
func initControllerTracing() *ControllerTracing {
	tracer := otel.GetTracerProvider().Tracer(controllerPollingOperation)
	return &ControllerTracing{polling: tracer}
}

// BeginSpan starts a span named controllerPollingOperation on the polling
// tracer and tags it with name under the operation attribute key. It returns
// the context and span produced by the tracer; callers are expected to end
// the returned span when the traced work completes.
func (m *ControllerTracing) BeginSpan(ctx context.Context, name string) (context.Context, trace.Span) {
	operationAttr := attribute.String(operation, name)
	return m.polling.Start(ctx, controllerPollingOperation, trace.WithAttributes(operationAttr))
}
2 changes: 2 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
Ingress *IngressMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
Controller *ControllerTracing
)

func init() {
Expand All @@ -37,6 +38,7 @@ func init() {
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)
Controller = initControllerTracing()

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
Expand Down

0 comments on commit 94b75cd

Please sign in to comment.