Skip to content

Commit

Permalink
fix: write timeline events to the DB in async batches (#2873)
Browse files Browse the repository at this point in the history
This should help reduce timeline writes as a bottleneck.

The batches are in a transaction, whereas ideally we'd use PG's
COPYFROM, but that's quite a bit more effort. We can revisit.

@wesbillman this also changes the interface of the timeline service a
bit.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
alecthomas and github-actions[bot] authored Sep 27, 2024
1 parent cee6c95 commit 9f72cf6
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 83 deletions.
2 changes: 1 addition & 1 deletion backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
return query, nil
}

func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event {
func eventDALToProto(event timeline.Event) *pbconsole.Event {
switch event := event.(type) {
case *timeline.CallEvent:
var requestKey *string
Expand Down
8 changes: 2 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

err = s.timeline.InsertLogEvent(ctx, &timeline.Log{
s.timeline.EnqueueEvent(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand All @@ -487,10 +487,6 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
Message: msg.Message,
Error: optional.Ptr(msg.Error),
})

if err != nil {
return nil, err
}
}
if stream.Err() != nil {
return nil, stream.Err()
Expand Down Expand Up @@ -1080,7 +1076,7 @@ func (s *Service) callWithRequest(
callResponse = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.timeline.InsertCallEvent(ctx, &timeline.Call{
s.timeline.EnqueueEvent(ctx, &timeline.Call{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
ParentRequestKey: parentKey,
Expand Down
5 changes: 1 addition & 4 deletions backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
errorStr = optional.Some(entry.Error.Error())
}

err = d.timeline.InsertLogEvent(ctx, &timeline.Log{
d.timeline.EnqueueEvent(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand All @@ -80,9 +80,6 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
Message: entry.Message,
Error: errorStr,
})
if err != nil {
fmt.Printf("failed to insert log entry: %v :: error: %v\n", entry, err)
}
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func Handle(
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
ingressEvent.Response.Body = io.NopCloser(strings.NewReader(string(rawBody)))
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
timelineService.EnqueueEvent(r.Context(), &ingressEvent)
} else {
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
Expand All @@ -176,7 +176,7 @@ func recordIngressErrorEvent(
) {
ingressEvent.Response.StatusCode = statusCode
ingressEvent.Error = optional.Some(errorMsg)
timelineService.InsertHTTPIngress(ctx, ingressEvent)
timelineService.EnqueueEvent(ctx, ingressEvent)
}

// Copied from the Apache-licensed connect-go source.
Expand Down
5 changes: 4 additions & 1 deletion backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
PubSub *PubSubMetrics
Cron *CronMetrics
Controller *ControllerTracing
Timeline *TimelineMetrics
)

func init() {
Expand All @@ -39,8 +40,10 @@ func init() {
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)
Controller = initControllerTracing()
Timeline, err = initTimelineMetrics()
errs = errors.Join(errs, err)

if err != nil {
if errs != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
}
}
Expand Down
63 changes: 63 additions & 0 deletions backend/controller/observability/timeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package observability

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

const (
timelineMeterName = "ftl.timeline"
)

type TimelineMetrics struct {
inserted metric.Int64Counter
dropped metric.Int64Counter
failed metric.Int64Counter
}

func initTimelineMetrics() (*TimelineMetrics, error) {
result := &TimelineMetrics{
inserted: noop.Int64Counter{},
dropped: noop.Int64Counter{},
failed: noop.Int64Counter{},
}

var err error
meter := otel.Meter(timelineMeterName)

signalName := fmt.Sprintf("%s.inserted", timelineMeterName)
if result.inserted, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times a timeline event was inserted")); err != nil {
return nil, wrapErr(signalName, err)
}

signalName = fmt.Sprintf("%s.dropped", timelineMeterName)
if result.dropped, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times a timeline event was dropped due to the queue being full")); err != nil {
return nil, wrapErr(signalName, err)
}

signalName = fmt.Sprintf("%s.failed", timelineMeterName)
if result.dropped, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times a timeline event failed to be inserted into the database")); err != nil {
return nil, wrapErr(signalName, err)
}

return result, nil
}

func (m *TimelineMetrics) Inserted(ctx context.Context, count int) {
m.inserted.Add(ctx, int64(count))
}

func (m *TimelineMetrics) Dropped(ctx context.Context) {
m.dropped.Add(ctx, 1)
}

func (m *TimelineMetrics) Failed(ctx context.Context, count int) {
m.failed.Add(ctx, int64(count))
}
18 changes: 9 additions & 9 deletions backend/controller/timeline/events_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/alecthomas/types/either"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

Expand Down Expand Up @@ -55,8 +55,9 @@ type Call struct {
Response either.Either[*ftlv1.CallResponse, error]
}

func (s *Service) InsertCallEvent(ctx context.Context, call *Call) {
logger := log.FromContext(ctx)
func (c *Call) inEvent() {}

func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call *Call) error {
callEvent := callToCallEvent(call)

var sourceModule, sourceVerb optional.Option[string]
Expand Down Expand Up @@ -84,18 +85,16 @@ func (s *Service) InsertCallEvent(ctx context.Context, call *Call) {

data, err := json.Marshal(callJSON)
if err != nil {
logger.Errorf(err, "failed to marshal call event")
return
return fmt.Errorf("failed to marshal call event: %w", err)
}

var payload ftlencryption.EncryptedTimelineColumn
err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
if err != nil {
logger.Errorf(err, "failed to encrypt call event")
return
return fmt.Errorf("failed to encrypt call event: %w", err)
}

err = libdal.TranslatePGError(s.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
Expand All @@ -107,8 +106,9 @@ func (s *Service) InsertCallEvent(ctx context.Context, call *Call) {
Payload: payload,
}))
if err != nil {
logger.Errorf(err, "failed to insert call event")
return fmt.Errorf("failed to insert call event: %w", err)
}
return nil
}

func callToCallEvent(call *Call) *CallEvent {
Expand Down
29 changes: 12 additions & 17 deletions backend/controller/timeline/events_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package timeline
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
Expand All @@ -13,7 +14,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

Expand Down Expand Up @@ -60,13 +60,12 @@ type Ingress struct {
Error optional.Option[string]
}

func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) {
logger := log.FromContext(ctx)
func (*Ingress) inEvent() {}

func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *Ingress) error {
requestBody, err := io.ReadAll(ingress.Request.Body)
if err != nil {
logger.Errorf(err, "failed to read request body")
return
return fmt.Errorf("failed to read request body: %w", err)
}
if len(requestBody) == 0 {
requestBody = []byte("{}")
Expand All @@ -76,8 +75,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) {
if ingress.Response.Body != nil {
responseBody, err = io.ReadAll(ingress.Response.Body)
if err != nil {
logger.Errorf(err, "failed to read response body")
return
return fmt.Errorf("failed to read response body: %w", err)
}
}

Expand All @@ -87,17 +85,15 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) {

reqHeaderBytes, err := json.Marshal(ingress.Request.Header)
if err != nil {
logger.Errorf(err, "failed to marshal request header")
return
return fmt.Errorf("failed to marshal request header: %w", err)
}
if len(reqHeaderBytes) == 0 {
reqHeaderBytes = []byte("{}")
}

respHeaderBytes, err := json.Marshal(ingress.Response.Header)
if err != nil {
logger.Errorf(err, "failed to marshal response header")
return
return fmt.Errorf("failed to marshal response header: %w", err)
}
if len(respHeaderBytes) == 0 {
respHeaderBytes = []byte("{}")
Expand All @@ -117,18 +113,16 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) {

data, err := json.Marshal(ingressJSON)
if err != nil {
logger.Errorf(err, "failed to marshal ingress JSON")
return
return fmt.Errorf("failed to marshal ingress JSON: %w", err)
}

var payload ftlencryption.EncryptedTimelineColumn
err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
if err != nil {
logger.Errorf(err, "failed to encrypt ingress payload")
return
return fmt.Errorf("failed to encrypt ingress payload: %w", err)
}

err = libdal.TranslatePGError(s.db.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{
err = libdal.TranslatePGError(querier.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{
DeploymentKey: ingress.DeploymentKey,
RequestKey: optional.Some(ingress.RequestKey.String()),
TimeStamp: ingress.StartTime,
Expand All @@ -138,6 +132,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) {
Payload: payload,
}))
if err != nil {
logger.Errorf(err, "failed to insert ingress event")
return fmt.Errorf("failed to insert ingress event: %w", err)
}
return nil
}
6 changes: 4 additions & 2 deletions backend/controller/timeline/events_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Log struct {
Error optional.Option[string]
}

func (l *Log) inEvent() {}

type LogEvent struct {
ID int64
Log
Expand All @@ -38,7 +40,7 @@ type eventLogJSON struct {
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) InsertLogEvent(ctx context.Context, log *Log) error {
func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *Log) error {
var requestKey optional.Option[string]
if name, ok := log.RequestKey.Get(); ok {
requestKey = optional.Some(name.String())
Expand All @@ -61,7 +63,7 @@ func (s *Service) InsertLogEvent(ctx context.Context, log *Log) error {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}

return libdal.TranslatePGError(s.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{
return libdal.TranslatePGError(querier.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{
DeploymentKey: log.DeploymentKey,
RequestKey: requestKey,
TimeStamp: log.Time,
Expand Down
10 changes: 5 additions & 5 deletions backend/controller/timeline/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func FilterDescending() TimelineFilter {
}
}

func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) {
func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]Event, error) {
if limit < 1 {
return nil, fmt.Errorf("limit must be >= 1, got %d", limit)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel
deploymentQuery += ` WHERE key = ANY($1::TEXT[])`
deploymentArgs = append(deploymentArgs, filter.deployments)
}
rows, err := s.Handle.Connection.QueryContext(ctx, deploymentQuery, deploymentArgs...)
rows, err := s.conn.QueryContext(ctx, deploymentQuery, deploymentArgs...)
if err != nil {
return nil, libdal.TranslatePGError(err)
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel
q += fmt.Sprintf(" LIMIT %d", limit)

// Issue query.
rows, err = s.Handle.Connection.QueryContext(ctx, q, args...)
rows, err = s.conn.QueryContext(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("%s: %w", q, libdal.TranslatePGError(err))
}
Expand All @@ -227,8 +227,8 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel
return events, nil
}

func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) {
var out []TimelineEvent
func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) {
var out []Event
for rows.Next() {
row := eventRow{}
var deploymentID int64
Expand Down
Loading

0 comments on commit 9f72cf6

Please sign in to comment.