diff --git a/go.mod b/go.mod index 05eff486e..1e4adddb0 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.1 github.com/rs/zerolog v1.33.0 + github.com/sourcegraph/conc v0.3.0 github.com/spf13/cobra v1.8.1 github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.8.1 @@ -312,7 +313,6 @@ require ( github.com/sivchari/containedctx v1.0.3 // indirect github.com/sivchari/tenv v1.10.0 // indirect github.com/sonatard/noctx v0.0.2 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect github.com/sourcegraph/go-diff v0.7.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.7.0 // indirect diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 2d33618ba..0f9e3639e 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -112,6 +112,11 @@ type Config struct { } } + Preview struct { + // PipelineArchV2 enables the new pipeline architecture. + PipelineArchV2 bool + } + dev struct { cpuprofile string memprofile string diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index f55a598af..a3bf84b4b 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -161,6 +161,8 @@ func Flags(cfg *Config) *flag.FlagSet { flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent") flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string") + flags.BoolVar(&cfg.Preview.PipelineArchV2, "preview.pipeline-arch-v2", cfg.Preview.PipelineArchV2, "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)") + // NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help' showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags") flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file") diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index f1fcc7de6..74df8dcb2 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -46,6 +46,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/metrics/measure" "github.com/conduitio/conduit/pkg/foundation/metrics/prometheus" "github.com/conduitio/conduit/pkg/lifecycle" + lifecycle_v2 "github.com/conduitio/conduit/pkg/lifecycle-poc" "github.com/conduitio/conduit/pkg/orchestrator" "github.com/conduitio/conduit/pkg/pipeline" conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector" @@ -77,7 +78,7 @@ import ( ) const ( - exitTimeout = 10 * time.Second + exitTimeout = 30 * time.Second ) // Runtime sets up all services for serving and monitoring a Conduit instance. @@ -95,7 +96,7 @@ type Runtime struct { pipelineService *pipeline.Service connectorService *connector.Service processorService *processor.Service - lifecycleService *lifecycle.Service + lifecycleService lifecycleService connectorPluginService *conn_plugin.PluginService processorPluginService *proc_plugin.PluginService @@ -107,6 +108,14 @@ type Runtime struct { logger log.CtxLogger } +// lifecycleService is an interface that we use temporarily to allow for +// both the old and new lifecycle services to be used interchangeably. 
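+// The concrete type is chosen in createServices: lifecycle_v2.Service when the
+// preview.pipeline-arch-v2 flag is enabled, lifecycle.Service otherwise.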
+type lifecycleService interface { + Start(ctx context.Context, pipelineID string) error + Stop(ctx context.Context, pipelineID string, force bool) error + Init(ctx context.Context) error +} + // NewRuntime sets up a Runtime instance and primes it for start. func NewRuntime(cfg Config) (*Runtime, error) { if err := cfg.Validate(); err != nil { @@ -203,21 +212,28 @@ func createServices(r *Runtime) error { tokenService, ) - // Error recovery configuration - errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, - MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, - BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, - MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, - MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow, - } - plService := pipeline.NewService(r.logger, r.DB) connService := connector.NewService(r.logger, r.DB, r.connectorPersister) procService := processor.NewService(r.logger, r.DB, procPluginService) - lifecycleService := lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService) - provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path) + var lifecycleService lifecycleService + if r.Config.Preview.PipelineArchV2 { + r.logger.Info(context.Background()).Msg("using lifecycle service v2") + lifecycleService = lifecycle_v2.NewService(r.logger, connService, procService, connPluginService, plService) + } else { + // Error recovery configuration + errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ + MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, + MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, + BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, + MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, + MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow, + } + + lifecycleService = lifecycle.NewService(r.logger, errRecoveryCfg, connService, procService, connPluginService, plService) + } + + provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path) orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService) r.Orchestrator = orc @@ -415,6 +431,15 @@ func (r *Runtime) initProfiling(ctx context.Context) (deferred func(), err error } func (r *Runtime) registerCleanup(t *tomb.Tomb) { + if r.Config.Preview.PipelineArchV2 { + r.registerCleanupV2(t) + } else { + r.registerCleanupV1(t) + } +} + +func (r *Runtime) registerCleanupV1(t *tomb.Tomb) { + ls := r.lifecycleService.(*lifecycle.Service) t.Go(func() error { <-t.Dying() // start cleanup with a fresh context @@ -423,12 +448,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) { // t.Err() can be nil, when we had a call: t.Kill(nil) // t.Err() will be context.Canceled, if the tomb's context was canceled if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) { - r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown) + ls.StopAll(ctx, pipeline.ErrGracefulShutdown) } else { // tomb died due to a real error - r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err())) + ls.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err())) } - err := r.lifecycleService.Wait(exitTimeout) + err := ls.Wait(exitTimeout) t.Go(func() error { 
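+			// Let the connector persister finish its pending work before the DB
+			// it writes to is closed.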
r.connectorPersister.Wait() return r.DB.Close() @@ -437,6 +462,62 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) { }) } +func (r *Runtime) registerCleanupV2(t *tomb.Tomb) { + ls := r.lifecycleService.(*lifecycle_v2.Service) + t.Go(func() error { + <-t.Dying() + // start cleanup with a fresh context + ctx := context.Background() + + err := ls.StopAll(ctx, false) + if err != nil { + r.logger.Err(ctx, err).Msg("some pipelines stopped with an error") + } + + // Wait for the pipelines to stop + const ( + count = 6 + interval = exitTimeout / count + ) + + pipelinesStopped := make(chan struct{}) + go func() { + for i := count; i > 0; i-- { + if i == 1 { + // on last try, stop forcefully + _ = ls.StopAll(ctx, true) + } + + r.logger.Info(ctx).Msgf("waiting for pipelines to stop running (time left: %s)", time.Duration(i)*interval) + select { + case <-time.After(interval): + case <-pipelinesStopped: + return + } + } + }() + + err = ls.Wait(exitTimeout) + switch { + case err != nil && err != context.DeadlineExceeded: + r.logger.Warn(ctx).Err(err).Msg("some pipelines stopped with an error") + case err == context.DeadlineExceeded: + r.logger.Warn(ctx).Msg("some pipelines did not stop in time") + default: + r.logger.Info(ctx).Msg("all pipelines stopped gracefully") + } + + pipelinesStopped <- struct{}{} + + t.Go(func() error { + r.connectorPersister.Wait() + return r.DB.Close() + }) + + return nil + }) +} + func (r *Runtime) newHTTPMetricsHandler() http.Handler { return promhttp.Handler() } @@ -770,13 +851,25 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error { } if r.Config.Pipelines.ExitOnDegraded { - r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) { - r.logger.Warn(ctx). - Err(e.Error). - Str(log.PipelineIDField, e.ID). - Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled") - t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error)) - }) + if r.Config.Preview.PipelineArchV2 { + ls := r.lifecycleService.(*lifecycle_v2.Service) + ls.OnFailure(func(e lifecycle_v2.FailureEvent) { + r.logger.Warn(ctx). + Err(e.Error). + Str(log.PipelineIDField, e.ID). + Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled") + t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error)) + }) + } else { + ls := r.lifecycleService.(*lifecycle.Service) + ls.OnFailure(func(e lifecycle.FailureEvent) { + r.logger.Warn(ctx). + Err(e.Error). + Str(log.PipelineIDField, e.ID). 
+ Msg("Conduit will shut down due to a pipeline failure and 'exit-on-degraded' enabled") + t.Kill(cerrors.Errorf("shut down due to 'exit-on-degraded' error: %w", e.Error)) + }) + } } err = r.pipelineService.Init(ctx) if err != nil { diff --git a/pkg/connector/source.go b/pkg/connector/source.go index 3ce1820d1..1b84161dd 100644 --- a/pkg/connector/source.go +++ b/pkg/connector/source.go @@ -16,7 +16,9 @@ package connector import ( "context" + "strconv" "sync" + "time" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-protocol/pconnector" @@ -153,6 +155,7 @@ func (s *Source) Teardown(ctx context.Context) error { return plugin.ErrPluginNotRunning } + s.Instance.logger.Debug(ctx).Msg("closing stream") // close stream if s.stopStream != nil { s.stopStream() @@ -192,8 +195,9 @@ func (s *Source) Read(ctx context.Context) ([]opencdc.Record, error) { return nil, err } + now := strconv.FormatInt(time.Now().UnixNano(), 10) for _, r := range resp.Records { - s.sanitizeRecord(&r) + s.sanitizeRecord(&r, now) } s.Instance.inspector.Send(ctx, resp.Records) @@ -375,7 +379,7 @@ func (s *Source) triggerLifecycleEvent(ctx context.Context, oldConfig, newConfig } } -func (s *Source) sanitizeRecord(r *opencdc.Record) { +func (s *Source) sanitizeRecord(r *opencdc.Record, now string) { if r.Key == nil { r.Key = opencdc.RawData{} } @@ -385,12 +389,19 @@ func (s *Source) sanitizeRecord(r *opencdc.Record) { if r.Payload.After == nil { r.Payload.After = opencdc.RawData{} } - if r.Metadata == nil { - r.Metadata = opencdc.Metadata{} + r.Metadata = opencdc.Metadata{ + opencdc.MetadataReadAt: now, + opencdc.MetadataConduitSourceConnectorID: s.Instance.ID, + } + } else { + if r.Metadata[opencdc.MetadataReadAt] == "" { + r.Metadata[opencdc.MetadataReadAt] = now + } + if r.Metadata[opencdc.MetadataConduitSourceConnectorID] == "" { + r.Metadata[opencdc.MetadataConduitSourceConnectorID] = s.Instance.ID + } } - // source connector ID is added to all records - r.Metadata.SetConduitSourceConnectorID(s.Instance.ID) } func (*Source) isEqual(cfg1, cfg2 map[string]string) bool { diff --git a/pkg/foundation/metrics/metrics.go b/pkg/foundation/metrics/metrics.go index 60cb0971c..1d9871b29 100644 --- a/pkg/foundation/metrics/metrics.go +++ b/pkg/foundation/metrics/metrics.go @@ -405,14 +405,18 @@ func (mt *labeledHistogram) WithValues(vs ...string) Histogram { // RecordBytesHistogram wraps a histrogram metric and allows to observe record // sizes in bytes. type RecordBytesHistogram struct { - h Histogram + H Histogram } func NewRecordBytesHistogram(h Histogram) RecordBytesHistogram { - return RecordBytesHistogram{h} + return RecordBytesHistogram{H: h} } func (m RecordBytesHistogram) Observe(r opencdc.Record) { + m.H.Observe(m.SizeOf(r)) +} + +func (m RecordBytesHistogram) SizeOf(r opencdc.Record) float64 { // TODO for now we call method Bytes() on key and payload to get the // bytes representation. In case of a structured payload or key it // is marshaled into JSON, which might not be the correct way to @@ -429,5 +433,5 @@ func (m RecordBytesHistogram) Observe(r opencdc.Record) { if r.Payload.After != nil { bytes += len(r.Payload.After.Bytes()) } - m.h.Observe(float64(bytes)) + return float64(bytes) } diff --git a/pkg/lifecycle-poc/funnel/batch.go b/pkg/lifecycle-poc/funnel/batch.go new file mode 100644 index 000000000..1b3e4ef1f --- /dev/null +++ b/pkg/lifecycle-poc/funnel/batch.go @@ -0,0 +1,178 @@ +// Copyright © 2024 Meroxa, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate stringer -type=RecordFlag -linecomment
+
+package funnel
+
+import (
+	"fmt"
+	"slices"
+
+	"github.com/conduitio/conduit-commons/opencdc"
+)
+
+// Batch represents a batch of records that are processed together. It keeps
+// track of the status of each record in the batch, and provides methods to
+// update the status of records.
+type Batch struct {
+	records        []opencdc.Record
+	recordStatuses []RecordStatus
+	positions      []opencdc.Position
+
+	// filterCount is updated any time a record is marked as filtered, to make it
+	// easier to construct the set of active records.
+	filterCount int
+
+	// If a batch is tainted it means that parts need to be either nacked or
+	// retried. Such a batch needs to be split into multiple batches, each
+	// containing only records with the same status (filtered counts as acked).
+	tainted bool
+}
+
+func NewBatch(records []opencdc.Record) *Batch {
+	// Store positions separately, as we need the original positions when acking
+	// records in the source, in case a processor tries to change the position.
+	positions := make([]opencdc.Position, len(records))
+	for i, r := range records {
+		positions[i] = r.Position
+	}
+
+	return &Batch{
+		records:        records,
+		recordStatuses: make([]RecordStatus, len(records)),
+		positions:      positions,
+		filterCount:    0,
+		tainted:        false,
+	}
+}
+
+// Nack marks the record at index i as nacked. If multiple errors are provided,
+// they are assigned to the records starting at index i.
+func (b *Batch) Nack(i int, errs ...error) {
+	b.setFlagWithErr(RecordFlagNack, i, errs)
+	b.tainted = true
+}
+
+// Retry marks the record at index i to be retried. If a second index is
+// provided, all records between i and j are marked to be retried. If multiple
+// indices are provided, the method panics.
+func (b *Batch) Retry(i int, j ...int) {
+	b.setFlagNoErr(RecordFlagRetry, i, j...)
+	b.tainted = true
+}
+
+// Filter marks the record at index i as filtered out. If a second index is
+// provided, all records between i and j are marked as filtered. If multiple
+// indices are provided, the method panics.
+func (b *Batch) Filter(i int, j ...int) {
+	b.setFlagNoErr(RecordFlagFilter, i, j...)
+	end := i + 1
+	if len(j) == 1 {
+		end = j[0]
+	}
+	b.filterCount += end - i
+}
+
+// SetRecords replaces the records in the batch starting at index i with the
+// provided records.
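+// Records in recs that would extend past the end of the batch are silently
+// dropped (copy semantics).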
+func (b *Batch) SetRecords(i int, recs []opencdc.Record) { + copy(b.records[i:], recs) +} + +func (b *Batch) setFlagNoErr(f RecordFlag, i int, j ...int) { + switch len(j) { + case 0: + b.recordStatuses[i].Flag = f + b.recordStatuses[i].Error = nil + case 1: + if i >= j[0] { + panic(fmt.Sprintf("invalid range (%d >= %d)", i, j[0])) + } + for k := i; k < j[0]; k++ { + b.recordStatuses[k].Flag = f + b.recordStatuses[k].Error = nil + } + default: + panic(fmt.Sprintf("too many arguments (%d)", len(j))) + } +} + +func (b *Batch) setFlagWithErr(f RecordFlag, i int, errs []error) { + for k, err := range errs { + b.recordStatuses[i+k].Flag = f + b.recordStatuses[i+k].Error = err + } +} + +func (b *Batch) clone() *Batch { + records := make([]opencdc.Record, len(b.records)) + for i, r := range b.records { + records[i] = r.Clone() + } + + return &Batch{ + records: records, + recordStatuses: slices.Clone(b.recordStatuses), + positions: b.positions, + tainted: b.tainted, + filterCount: b.filterCount, + } +} + +func (b *Batch) sub(from, to int) *Batch { + filterCount := 0 + for _, status := range b.recordStatuses[from:to] { + if status.Flag == RecordFlagFilter { + filterCount++ + } + } + return &Batch{ + records: b.records[from:to], + recordStatuses: b.recordStatuses[from:to], + positions: b.positions[from:to], + filterCount: filterCount, + tainted: false, + } +} + +// ActiveRecords returns the records that are not filtered. +func (b *Batch) ActiveRecords() []opencdc.Record { + if b.filterCount == 0 { + return b.records + } + active := make([]opencdc.Record, 0, len(b.records)-b.filterCount) + for i, r := range b.records { + if b.recordStatuses[i].Flag != RecordFlagFilter { + active = append(active, r) + } + } + return active +} + +// RecordStatus holds the status of a record in a batch. The flag indicates the +// status of the record, and the error is set if the record was nacked. +type RecordStatus struct { + Flag RecordFlag + Error error +} + +type RecordFlag int + +const ( + RecordFlagAck RecordFlag = iota // ack + RecordFlagNack // nack + RecordFlagRetry // retry + RecordFlagFilter // filter +) diff --git a/pkg/lifecycle-poc/funnel/destination.go b/pkg/lifecycle-poc/funnel/destination.go new file mode 100644 index 000000000..4c0932ff9 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/destination.go @@ -0,0 +1,145 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate mockgen -typed -destination=mock/destination.go -package=mock -mock_names=Destination=Destination . 
Destination + +package funnel + +import ( + "bytes" + "context" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics" +) + +type DestinationTask struct { + id string + destination Destination + logger log.CtxLogger + + timer metrics.Timer + histogram metrics.RecordBytesHistogram +} + +type Destination interface { + ID() string + Open(context.Context) error + Write(context.Context, []opencdc.Record) error + Ack(context.Context) ([]connector.DestinationAck, error) + Teardown(context.Context) error + // TODO figure out if we want to handle these errors. This returns errors + // coming from the persister, which persists the connector asynchronously. + // Are we even interested in these errors in the pipeline? Sounds like + // something we could surface and handle globally in the runtime instead. + Errors() <-chan error +} + +func NewDestinationTask( + id string, + destination Destination, + logger log.CtxLogger, + timer metrics.Timer, + histogram metrics.Histogram, +) *DestinationTask { + logger = logger.WithComponent("task:destination") + logger.Logger = logger.With().Str(log.ConnectorIDField, id).Logger() + return &DestinationTask{ + id: id, + destination: destination, + logger: logger, + timer: timer, + histogram: metrics.NewRecordBytesHistogram(histogram), + } +} + +func (t *DestinationTask) ID() string { + return t.id +} + +func (t *DestinationTask) Open(ctx context.Context) error { + t.logger.Debug(ctx).Msg("opening destination") + err := t.destination.Open(ctx) + if err != nil { + return cerrors.Errorf("failed to open destination connector: %w", err) + } + t.logger.Debug(ctx).Msg("destination open") + return nil +} + +func (t *DestinationTask) Close(ctx context.Context) error { + return t.destination.Teardown(ctx) +} + +func (t *DestinationTask) Do(ctx context.Context, batch *Batch) error { + records := batch.ActiveRecords() + positions := make([]opencdc.Position, len(records)) + for i, rec := range records { + positions[i] = rec.Position + } + + start := time.Now() + err := t.destination.Write(ctx, records) + if err != nil { + return cerrors.Errorf("failed to write %d records to destination: %w", len(positions), err) + } + + acks := make([]connector.DestinationAck, 0, len(positions)) + for range len(positions) { + acksResp, err := t.destination.Ack(ctx) + if err != nil { + return cerrors.Errorf("failed to receive acks for %d records from destination: %w", len(positions), err) + } + t.observeMetrics(records[len(acks):len(acks)+len(acksResp)], start) + acks = append(acks, acksResp...) + if len(acks) >= len(positions) { + break + } + } + if len(acks) != len(positions) { + return cerrors.Errorf("received %d acks, but expected %d", len(acks), len(positions)) + } + + for i, ack := range acks { + if !bytes.Equal(positions[i], ack.Position) { + // TODO is this a fatal error? Looks like a bug in the connector + return cerrors.Errorf("received unexpected ack, expected position %q but got %q", positions[i], ack.Position) + } + if ack.Error != nil { + batch.Nack(i, ack.Error) + } + } + + return nil +} + +func (t *DestinationTask) observeMetrics(records []opencdc.Record, start time.Time) { + // Precalculate sizes so that we don't need to hold a reference to records + // and observations can happen in a goroutine. 
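+	// The elapsed time since the start of the write is split evenly across the
+	// observed records when updating the timer, so the per-record latency is an
+	// approximation.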
+ sizes := make([]float64, len(records)) + for i, rec := range records { + sizes[i] = t.histogram.SizeOf(rec) + } + tookPerRecord := time.Since(start) / time.Duration(len(sizes)) + go func() { + for i := range len(sizes) { + t.timer.Update(tookPerRecord) + t.histogram.H.Observe(sizes[i]) + } + }() +} diff --git a/pkg/lifecycle-poc/funnel/dlq.go b/pkg/lifecycle-poc/funnel/dlq.go new file mode 100644 index 000000000..31d101783 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/dlq.go @@ -0,0 +1,244 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package funnel + +import ( + "context" + "sync" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics" +) + +type DLQ struct { + task *DestinationTask + + windowSize int + windowNackThreshold int + + // window keeps track of the last N acks and nacks + window *dlqWindow + m sync.Mutex +} + +func NewDLQ( + id string, + destination Destination, + logger log.CtxLogger, + timer metrics.Timer, + histogram metrics.Histogram, + + windowSize int, + windowNackThreshold int, +) *DLQ { + return &DLQ{ + task: NewDestinationTask(id, destination, logger, timer, histogram), + windowSize: windowSize, + windowNackThreshold: windowNackThreshold, + + window: newDLQWindow(windowSize, windowNackThreshold), + } +} + +func (d *DLQ) ID() string { + return d.task.id +} + +func (d *DLQ) Open(ctx context.Context) error { + return d.task.Open(ctx) +} + +func (d *DLQ) Close(ctx context.Context) error { + return d.task.Close(ctx) +} + +func (d *DLQ) Ack(_ context.Context, batch *Batch) { + if len(batch.records) == 0 { + return + } + + d.m.Lock() + defer d.m.Unlock() + + d.window.Ack(len(batch.records)) +} + +func (d *DLQ) Nack(ctx context.Context, batch *Batch, taskID string) (int, error) { + if len(batch.records) == 0 { + return 0, nil + } + + d.m.Lock() + defer d.m.Unlock() + + nacked := d.window.Nack(len(batch.records)) + + if nacked > 0 { + b := batch + if nacked < len(batch.records) { + b = batch.sub(0, nacked) + } + + // The window successfully accepted nacks (at least some of them), send + // them to the DLQ. + successCount, err := d.sendToDLQ(ctx, b, taskID) + if err != nil { + // The DLQ write failed, we need to stop the pipeline, as recovering + // could lead to an endless loop of restarts. + return successCount, cerrors.FatalError(err) + } + } + if nacked < len(batch.records) { + // Not all records were nacked, we need to return an error. + if d.windowNackThreshold > 0 { + // If the threshold is greater than 0 the DLQ is enabled and we + // need to respect the threshold by stopping the pipeline with a + // fatal error. 
+			return nacked, cerrors.FatalError(
+				cerrors.Errorf(
+					"DLQ nack threshold exceeded (%d/%d), original error: %w",
+					d.windowNackThreshold, d.windowSize, batch.recordStatuses[nacked].Error,
+				),
+			)
+		}
+		// DLQ is disabled, we don't need to wrap the error message
+		return nacked, batch.recordStatuses[nacked].Error
+	}
+
+	return nacked, nil
+}
+
+func (d *DLQ) sendToDLQ(ctx context.Context, batch *Batch, taskID string) (int, error) {
+	// Create a new batch with the DLQ records and write it to the destination.
+	dlqRecords := make([]opencdc.Record, len(batch.records))
+	for i, req := range batch.records {
+		dlqRecords[i] = d.dlqRecord(req, batch.recordStatuses[i], taskID)
+	}
+	dlqBatch := NewBatch(dlqRecords)
+
+	err := d.task.Do(ctx, dlqBatch)
+	if err != nil {
+		return 0, cerrors.Errorf("failed to write %d records to the DLQ: %w", len(dlqRecords), err)
+	}
+
+	ackCount := 0
+	for ackCount < len(dlqRecords) && dlqBatch.recordStatuses[ackCount].Flag == RecordFlagAck {
+		ackCount++
+	}
+	if ackCount < len(dlqRecords) {
+		// Not all records were acked, we need to return an error.
+		return ackCount, cerrors.Errorf("failed to write record %d to the DLQ: %w", ackCount, dlqBatch.recordStatuses[ackCount].Error)
+	}
+
+	return ackCount, nil
+}
+
+func (d *DLQ) dlqRecord(r opencdc.Record, status RecordStatus, taskID string) opencdc.Record {
+	out := opencdc.Record{
+		Position:  r.Position,
+		Operation: opencdc.OperationCreate,
+		Metadata:  opencdc.Metadata{},
+		Key:       nil,
+		Payload: opencdc.Change{
+			Before: nil,
+			After:  opencdc.StructuredData(r.Map()), // failed record is stored here
+		},
+	}
+
+	connID, _ := r.Metadata.GetConduitSourceConnectorID()
+
+	out.Metadata.SetCreatedAt(time.Now())
+	out.Metadata.SetConduitSourceConnectorID(connID)
+	out.Metadata.SetConduitDLQNackError(status.Error.Error())
+	out.Metadata.SetConduitDLQNackNodeID(taskID) // TODO rename to DLQNackTaskID
+	return out
+}
+
+// dlqWindow is responsible for tracking the last N nacks/acks and enforcing a
+// threshold of nacks that should not be exceeded.
+type dlqWindow struct {
+	// window acts as a ring buffer for storing acks/nacks (true = nack).
+	// When initialized it contains only acks.
+	window []bool
+	// cursor is the index pointing to the last message in the window.
+	cursor int
+	// nackThreshold represents the number of tolerated nacks; if the threshold
+	// is exceeded the window is frozen and rejects all further nacks.
+	nackThreshold int
+
+	ackCount  int
+	nackCount int
+}
+
+func newDLQWindow(size, threshold int) *dlqWindow {
+	if size > 0 && threshold == 0 {
+		// optimization - if threshold is 0 the window size does not matter,
+		// setting it to 1 ensures we don't use more memory than needed
+		size = 1
+	}
+	return &dlqWindow{
+		window:        make([]bool, size),
+		cursor:        0,
+		nackThreshold: threshold,
+
+		ackCount:  size,
+		nackCount: 0,
+	}
+}
+
+// Ack stores count acks in the window.
+func (w *dlqWindow) Ack(count int) {
+	_ = w.store(count, false)
+}
+
+// Nack stores count nacks in the window and returns the number of nacks the
+// window accepted. Once the nack threshold is exceeded the window is frozen
+// and all further calls to Nack return 0.
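+// For example, with a window of size 5 and a nack threshold of 2, at most 2 of
+// the last 5 records may be nacked; the nack that pushes the count above the
+// threshold is rejected and the window stays frozen from then on.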
+func (w *dlqWindow) Nack(count int) int { + return w.store(count, true) +} + +func (w *dlqWindow) store(count int, nacked bool) int { + if len(w.window) == 0 || w.nackThreshold < w.nackCount { + return 0 // window disabled or threshold already reached + } + + for i := range count { + // move cursor before updating the window + w.cursor = (w.cursor + 1) % len(w.window) + if w.window[w.cursor] == nacked { + continue // the old message has the same status, nothing changes + } + + w.window[w.cursor] = nacked + switch nacked { + case false: + w.nackCount-- + w.ackCount++ + case true: + w.nackCount++ + w.ackCount-- + if w.nackThreshold < w.nackCount { + return i + } + } + } + + return count +} diff --git a/pkg/lifecycle-poc/funnel/funnel_test.go b/pkg/lifecycle-poc/funnel/funnel_test.go new file mode 100644 index 000000000..f48183c89 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/funnel_test.go @@ -0,0 +1,330 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package funnel + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + "github.com/conduitio/conduit-commons/csync" + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/ctxutil" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics/noop" + funnelmock "github.com/conduitio/conduit/pkg/lifecycle-poc/funnel/mock" + "github.com/rs/zerolog" + "go.uber.org/mock/gomock" +) + +func Example_simpleStream() { + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + + logger := newLogger() + ctrl := gomockCtrl(logger) + + batchCount := 10 + batchSize := 1 + + dlq := NewDLQ( + "dlq", + noopDLQDestination(ctrl), + logger, + noop.Timer{}, + noop.Histogram{}, + 1, + 0, + ) + srcTask := NewSourceTask( + "generator", + generatorSource(ctrl, logger, "generator", batchSize, batchCount), + logger, + noop.Timer{}, + noop.Histogram{}, + ) + destTask := NewDestinationTask( + "printer", + printerDestination(ctrl, logger, "printer", batchSize), + logger, + noop.Timer{}, + noop.Histogram{}, + ) + + w, err := NewWorker( + []Task{srcTask, destTask}, + [][]int{{1}, {}}, + dlq, + logger, + noop.Timer{}, + ) + if err != nil { + panic(err) + } + + err = w.Open(ctx) + if err != nil { + panic(err) + } + + var wg csync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.Do(ctx) + if err != nil { + panic(err) + } + }() + + // stop node after 150ms, which should be enough to process the 10 messages + time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) }) + + if err := wg.WaitTimeout(ctx, time.Second); err != nil { + killAll() + } else { + logger.Info(ctx).Msg("finished successfully") + } + + err = w.Close(ctx) + if err != nil { + panic(err) + } + + // Output: + // DBG opening source component=task:source connector_id=generator + // DBG source open component=task:source connector_id=generator + // DBG opening destination 
component=task:destination connector_id=printer + // DBG destination open component=task:destination connector_id=printer + // DBG opening destination component=task:destination connector_id=dlq + // DBG destination open component=task:destination connector_id=dlq + // DBG got record node_id=printer position=generator/0 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/1 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/2 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/3 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/4 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/5 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/6 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/7 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/8 + // DBG received ack node_id=generator + // DBG got record node_id=printer position=generator/9 + // DBG received ack node_id=generator + // INF finished successfully +} + +func BenchmarkStreamNew(b *testing.B) { + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + + logger := log.Nop() + ctrl := gomockCtrl(logger) + + b.ReportAllocs() + b.StopTimer() + for i := 0; i < b.N; i++ { + batchCount := 100 + batchSize := 1000 + + dlq := NewDLQ( + "dlq", + noopDLQDestination(ctrl), + logger, + noop.Timer{}, + noop.Histogram{}, + 1, + 0, + ) + srcTask := NewSourceTask( + "generator", + generatorSource(ctrl, logger, "generator", batchSize, batchCount), + logger, + noop.Timer{}, + noop.Histogram{}, + ) + destTask := NewDestinationTask( + "printer", + printerDestination(ctrl, logger, "printer", batchSize), + logger, + noop.Timer{}, + noop.Histogram{}, + ) + + w, err := NewWorker( + []Task{srcTask, destTask}, + [][]int{{1}, {}}, + dlq, + logger, + noop.Timer{}, + ) + if err != nil { + panic(err) + } + + b.StartTimer() + + var wg csync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.Do(ctx) + if err != nil { + panic(err) + } + }() + + // stop node after 150ms, which should be enough to process the 10 messages + time.AfterFunc(150*time.Millisecond, func() { _ = w.Stop(ctx) }) + + if err := wg.WaitTimeout(ctx, time.Second); err != nil { + killAll() + } + + err = w.Close(ctx) + if err != nil { + panic(err) + } + + b.StopTimer() + } +} + +func newLogger() log.CtxLogger { + w := zerolog.NewConsoleWriter() + w.NoColor = true + w.PartsExclude = []string{zerolog.TimestampFieldName} + + zlogger := zerolog.New(w) + zlogger = zlogger.Level(zerolog.DebugLevel) + logger := log.New(zlogger) + logger.Logger = logger.Hook(ctxutil.MessageIDLogCtxHook{}) + + return logger +} + +func generatorSource(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, batchSize, batchCount int) Source { + position := 0 + + teardown := make(chan struct{}) + source := funnelmock.NewSource(ctrl) + source.EXPECT().ID().Return(nodeID).AnyTimes() + source.EXPECT().Open(gomock.Any()).Return(nil) + source.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(context.Context) error { + close(teardown) + return nil + }) + source.EXPECT().Ack(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, p []opencdc.Position) error { + logger.Debug(ctx).Str("node_id", nodeID).Msg("received ack") + return nil + 
}).Times(batchCount * batchSize) + source.EXPECT().Read(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]opencdc.Record, error) { + if position == batchCount*batchSize { + // block until Teardown is called + <-teardown + return nil, context.Canceled + } + + recs := make([]opencdc.Record, batchSize) + for i := 0; i < batchSize; i++ { + recs[i] = opencdc.Record{ + Metadata: opencdc.Metadata{ + opencdc.MetadataConduitSourceConnectorID: nodeID, + }, + Position: opencdc.Position(strconv.Itoa(position)), + } + position++ + } + + return recs, nil + }).MinTimes(batchCount + 1) + source.EXPECT().Errors().Return(make(chan error)) + + return source +} + +func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID string, batchSize int) Destination { + var lastPosition opencdc.Position + _ = lastPosition + rchan := make(chan opencdc.Record, batchSize) + destination := funnelmock.NewDestination(ctrl) + destination.EXPECT().Open(gomock.Any()).Return(nil) + destination.EXPECT().Write(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, recs []opencdc.Record) error { + for _, r := range recs { + connID, _ := r.Metadata.GetConduitSourceConnectorID() + logger.Debug(ctx). + Str("position", fmt.Sprintf("%s/%s", connID, r.Position)). + Str("node_id", nodeID). + Msg("got record") + lastPosition = r.Position + rchan <- r + } + return nil + }).AnyTimes() + destination.EXPECT().Ack(gomock.Any()).DoAndReturn(func(ctx context.Context) ([]connector.DestinationAck, error) { + acks := make([]connector.DestinationAck, 0, batchSize) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r, ok := <-rchan: + if !ok { + return nil, nil + } + acks = append(acks, connector.DestinationAck{Position: r.Position}) + default: + return acks, nil + } + } + }).AnyTimes() + destination.EXPECT().Teardown(gomock.Any()).DoAndReturn(func(ctx context.Context) error { + close(rchan) + return nil + }) + destination.EXPECT().Errors().Return(make(chan error)) + + return destination +} + +func noopDLQDestination(ctrl *gomock.Controller) Destination { + destination := funnelmock.NewDestination(ctrl) + destination.EXPECT().Open(gomock.Any()).Return(nil) + destination.EXPECT().Teardown(gomock.Any()).Return(nil) + return destination +} + +func gomockCtrl(logger log.CtxLogger) *gomock.Controller { + return gomock.NewController(gomockLogger(logger)) +} + +type gomockLogger log.CtxLogger + +func (g gomockLogger) Errorf(format string, args ...interface{}) { + g.Error().Msgf(format, args...) +} + +func (g gomockLogger) Fatalf(format string, args ...interface{}) { + g.Fatal().Msgf(format, args...) +} diff --git a/pkg/lifecycle-poc/funnel/mock/destination.go b/pkg/lifecycle-poc/funnel/mock/destination.go new file mode 100644 index 000000000..11ab25c6a --- /dev/null +++ b/pkg/lifecycle-poc/funnel/mock/destination.go @@ -0,0 +1,272 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/pkg/lifecycle-poc/funnel (interfaces: Destination) +// +// Generated by this command: +// +// mockgen -typed -destination=mock/destination.go -package=mock -mock_names=Destination=Destination . Destination +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + opencdc "github.com/conduitio/conduit-commons/opencdc" + connector "github.com/conduitio/conduit/pkg/connector" + gomock "go.uber.org/mock/gomock" +) + +// Destination is a mock of Destination interface. 
+type Destination struct { + ctrl *gomock.Controller + recorder *DestinationMockRecorder + isgomock struct{} +} + +// DestinationMockRecorder is the mock recorder for Destination. +type DestinationMockRecorder struct { + mock *Destination +} + +// NewDestination creates a new mock instance. +func NewDestination(ctrl *gomock.Controller) *Destination { + mock := &Destination{ctrl: ctrl} + mock.recorder = &DestinationMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Destination) EXPECT() *DestinationMockRecorder { + return m.recorder +} + +// Ack mocks base method. +func (m *Destination) Ack(arg0 context.Context) ([]connector.DestinationAck, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ack", arg0) + ret0, _ := ret[0].([]connector.DestinationAck) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ack indicates an expected call of Ack. +func (mr *DestinationMockRecorder) Ack(arg0 any) *DestinationAckCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ack", reflect.TypeOf((*Destination)(nil).Ack), arg0) + return &DestinationAckCall{Call: call} +} + +// DestinationAckCall wrap *gomock.Call +type DestinationAckCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationAckCall) Return(arg0 []connector.DestinationAck, arg1 error) *DestinationAckCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationAckCall) Do(f func(context.Context) ([]connector.DestinationAck, error)) *DestinationAckCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationAckCall) DoAndReturn(f func(context.Context) ([]connector.DestinationAck, error)) *DestinationAckCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Errors mocks base method. +func (m *Destination) Errors() <-chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Errors") + ret0, _ := ret[0].(<-chan error) + return ret0 +} + +// Errors indicates an expected call of Errors. +func (mr *DestinationMockRecorder) Errors() *DestinationErrorsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Errors", reflect.TypeOf((*Destination)(nil).Errors)) + return &DestinationErrorsCall{Call: call} +} + +// DestinationErrorsCall wrap *gomock.Call +type DestinationErrorsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationErrorsCall) Return(arg0 <-chan error) *DestinationErrorsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationErrorsCall) Do(f func() <-chan error) *DestinationErrorsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationErrorsCall) DoAndReturn(f func() <-chan error) *DestinationErrorsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// ID mocks base method. +func (m *Destination) ID() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ID") + ret0, _ := ret[0].(string) + return ret0 +} + +// ID indicates an expected call of ID. 
+func (mr *DestinationMockRecorder) ID() *DestinationIDCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Destination)(nil).ID)) + return &DestinationIDCall{Call: call} +} + +// DestinationIDCall wrap *gomock.Call +type DestinationIDCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationIDCall) Return(arg0 string) *DestinationIDCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationIDCall) Do(f func() string) *DestinationIDCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationIDCall) DoAndReturn(f func() string) *DestinationIDCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Open mocks base method. +func (m *Destination) Open(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open. +func (mr *DestinationMockRecorder) Open(arg0 any) *DestinationOpenCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*Destination)(nil).Open), arg0) + return &DestinationOpenCall{Call: call} +} + +// DestinationOpenCall wrap *gomock.Call +type DestinationOpenCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationOpenCall) Return(arg0 error) *DestinationOpenCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationOpenCall) Do(f func(context.Context) error) *DestinationOpenCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationOpenCall) DoAndReturn(f func(context.Context) error) *DestinationOpenCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Teardown mocks base method. +func (m *Destination) Teardown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Teardown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Teardown indicates an expected call of Teardown. +func (mr *DestinationMockRecorder) Teardown(arg0 any) *DestinationTeardownCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Teardown", reflect.TypeOf((*Destination)(nil).Teardown), arg0) + return &DestinationTeardownCall{Call: call} +} + +// DestinationTeardownCall wrap *gomock.Call +type DestinationTeardownCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationTeardownCall) Return(arg0 error) *DestinationTeardownCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationTeardownCall) Do(f func(context.Context) error) *DestinationTeardownCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationTeardownCall) DoAndReturn(f func(context.Context) error) *DestinationTeardownCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Write mocks base method. +func (m *Destination) Write(arg0 context.Context, arg1 []opencdc.Record) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Write", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Write indicates an expected call of Write. 
+func (mr *DestinationMockRecorder) Write(arg0, arg1 any) *DestinationWriteCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*Destination)(nil).Write), arg0, arg1) + return &DestinationWriteCall{Call: call} +} + +// DestinationWriteCall wrap *gomock.Call +type DestinationWriteCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *DestinationWriteCall) Return(arg0 error) *DestinationWriteCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *DestinationWriteCall) Do(f func(context.Context, []opencdc.Record) error) *DestinationWriteCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *DestinationWriteCall) DoAndReturn(f func(context.Context, []opencdc.Record) error) *DestinationWriteCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/pkg/lifecycle-poc/funnel/mock/processor.go b/pkg/lifecycle-poc/funnel/mock/processor.go new file mode 100644 index 000000000..f6c404300 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/mock/processor.go @@ -0,0 +1,157 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/pkg/lifecycle-poc/funnel (interfaces: Processor) +// +// Generated by this command: +// +// mockgen -typed -destination=mock/processor.go -package=mock -mock_names=Processor=Processor . Processor +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + opencdc "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + gomock "go.uber.org/mock/gomock" +) + +// Processor is a mock of Processor interface. +type Processor struct { + ctrl *gomock.Controller + recorder *ProcessorMockRecorder + isgomock struct{} +} + +// ProcessorMockRecorder is the mock recorder for Processor. +type ProcessorMockRecorder struct { + mock *Processor +} + +// NewProcessor creates a new mock instance. +func NewProcessor(ctrl *gomock.Controller) *Processor { + mock := &Processor{ctrl: ctrl} + mock.recorder = &ProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Processor) EXPECT() *ProcessorMockRecorder { + return m.recorder +} + +// Open mocks base method. +func (m *Processor) Open(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open. +func (mr *ProcessorMockRecorder) Open(ctx any) *ProcessorOpenCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*Processor)(nil).Open), ctx) + return &ProcessorOpenCall{Call: call} +} + +// ProcessorOpenCall wrap *gomock.Call +type ProcessorOpenCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *ProcessorOpenCall) Return(arg0 error) *ProcessorOpenCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *ProcessorOpenCall) Do(f func(context.Context) error) *ProcessorOpenCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *ProcessorOpenCall) DoAndReturn(f func(context.Context) error) *ProcessorOpenCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Process mocks base method. 
+func (m *Processor) Process(arg0 context.Context, arg1 []opencdc.Record) []sdk.ProcessedRecord { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", arg0, arg1) + ret0, _ := ret[0].([]sdk.ProcessedRecord) + return ret0 +} + +// Process indicates an expected call of Process. +func (mr *ProcessorMockRecorder) Process(arg0, arg1 any) *ProcessorProcessCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*Processor)(nil).Process), arg0, arg1) + return &ProcessorProcessCall{Call: call} +} + +// ProcessorProcessCall wrap *gomock.Call +type ProcessorProcessCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *ProcessorProcessCall) Return(arg0 []sdk.ProcessedRecord) *ProcessorProcessCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *ProcessorProcessCall) Do(f func(context.Context, []opencdc.Record) []sdk.ProcessedRecord) *ProcessorProcessCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *ProcessorProcessCall) DoAndReturn(f func(context.Context, []opencdc.Record) []sdk.ProcessedRecord) *ProcessorProcessCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Teardown mocks base method. +func (m *Processor) Teardown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Teardown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Teardown indicates an expected call of Teardown. +func (mr *ProcessorMockRecorder) Teardown(arg0 any) *ProcessorTeardownCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Teardown", reflect.TypeOf((*Processor)(nil).Teardown), arg0) + return &ProcessorTeardownCall{Call: call} +} + +// ProcessorTeardownCall wrap *gomock.Call +type ProcessorTeardownCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *ProcessorTeardownCall) Return(arg0 error) *ProcessorTeardownCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *ProcessorTeardownCall) Do(f func(context.Context) error) *ProcessorTeardownCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *ProcessorTeardownCall) DoAndReturn(f func(context.Context) error) *ProcessorTeardownCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/pkg/lifecycle-poc/funnel/mock/source.go b/pkg/lifecycle-poc/funnel/mock/source.go new file mode 100644 index 000000000..30225260f --- /dev/null +++ b/pkg/lifecycle-poc/funnel/mock/source.go @@ -0,0 +1,271 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/pkg/lifecycle-poc/funnel (interfaces: Source) +// +// Generated by this command: +// +// mockgen -typed -destination=mock/source.go -package=mock -mock_names=Source=Source . Source +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + opencdc "github.com/conduitio/conduit-commons/opencdc" + gomock "go.uber.org/mock/gomock" +) + +// Source is a mock of Source interface. +type Source struct { + ctrl *gomock.Controller + recorder *SourceMockRecorder + isgomock struct{} +} + +// SourceMockRecorder is the mock recorder for Source. +type SourceMockRecorder struct { + mock *Source +} + +// NewSource creates a new mock instance. 
+func NewSource(ctrl *gomock.Controller) *Source { + mock := &Source{ctrl: ctrl} + mock.recorder = &SourceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Source) EXPECT() *SourceMockRecorder { + return m.recorder +} + +// Ack mocks base method. +func (m *Source) Ack(arg0 context.Context, arg1 []opencdc.Position) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ack", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Ack indicates an expected call of Ack. +func (mr *SourceMockRecorder) Ack(arg0, arg1 any) *SourceAckCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ack", reflect.TypeOf((*Source)(nil).Ack), arg0, arg1) + return &SourceAckCall{Call: call} +} + +// SourceAckCall wrap *gomock.Call +type SourceAckCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceAckCall) Return(arg0 error) *SourceAckCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceAckCall) Do(f func(context.Context, []opencdc.Position) error) *SourceAckCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceAckCall) DoAndReturn(f func(context.Context, []opencdc.Position) error) *SourceAckCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Errors mocks base method. +func (m *Source) Errors() <-chan error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Errors") + ret0, _ := ret[0].(<-chan error) + return ret0 +} + +// Errors indicates an expected call of Errors. +func (mr *SourceMockRecorder) Errors() *SourceErrorsCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Errors", reflect.TypeOf((*Source)(nil).Errors)) + return &SourceErrorsCall{Call: call} +} + +// SourceErrorsCall wrap *gomock.Call +type SourceErrorsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceErrorsCall) Return(arg0 <-chan error) *SourceErrorsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceErrorsCall) Do(f func() <-chan error) *SourceErrorsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceErrorsCall) DoAndReturn(f func() <-chan error) *SourceErrorsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// ID mocks base method. +func (m *Source) ID() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ID") + ret0, _ := ret[0].(string) + return ret0 +} + +// ID indicates an expected call of ID. +func (mr *SourceMockRecorder) ID() *SourceIDCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*Source)(nil).ID)) + return &SourceIDCall{Call: call} +} + +// SourceIDCall wrap *gomock.Call +type SourceIDCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceIDCall) Return(arg0 string) *SourceIDCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceIDCall) Do(f func() string) *SourceIDCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceIDCall) DoAndReturn(f func() string) *SourceIDCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Open mocks base method. 
+func (m *Source) Open(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open. +func (mr *SourceMockRecorder) Open(arg0 any) *SourceOpenCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*Source)(nil).Open), arg0) + return &SourceOpenCall{Call: call} +} + +// SourceOpenCall wrap *gomock.Call +type SourceOpenCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceOpenCall) Return(arg0 error) *SourceOpenCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceOpenCall) Do(f func(context.Context) error) *SourceOpenCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceOpenCall) DoAndReturn(f func(context.Context) error) *SourceOpenCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Read mocks base method. +func (m *Source) Read(arg0 context.Context) ([]opencdc.Record, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Read", arg0) + ret0, _ := ret[0].([]opencdc.Record) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Read indicates an expected call of Read. +func (mr *SourceMockRecorder) Read(arg0 any) *SourceReadCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*Source)(nil).Read), arg0) + return &SourceReadCall{Call: call} +} + +// SourceReadCall wrap *gomock.Call +type SourceReadCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceReadCall) Return(arg0 []opencdc.Record, arg1 error) *SourceReadCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceReadCall) Do(f func(context.Context) ([]opencdc.Record, error)) *SourceReadCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceReadCall) DoAndReturn(f func(context.Context) ([]opencdc.Record, error)) *SourceReadCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Teardown mocks base method. +func (m *Source) Teardown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Teardown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Teardown indicates an expected call of Teardown. +func (mr *SourceMockRecorder) Teardown(arg0 any) *SourceTeardownCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Teardown", reflect.TypeOf((*Source)(nil).Teardown), arg0) + return &SourceTeardownCall{Call: call} +} + +// SourceTeardownCall wrap *gomock.Call +type SourceTeardownCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *SourceTeardownCall) Return(arg0 error) *SourceTeardownCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *SourceTeardownCall) Do(f func(context.Context) error) *SourceTeardownCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *SourceTeardownCall) DoAndReturn(f func(context.Context) error) *SourceTeardownCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/pkg/lifecycle-poc/funnel/processor.go b/pkg/lifecycle-poc/funnel/processor.go new file mode 100644 index 000000000..d03695cce --- /dev/null +++ b/pkg/lifecycle-poc/funnel/processor.go @@ -0,0 +1,163 @@ +// Copyright © 2024 Meroxa, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate mockgen -typed -destination=mock/processor.go -package=mock -mock_names=Processor=Processor . Processor + +package funnel + +import ( + "context" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics" +) + +type ProcessorTask struct { + id string + processor Processor + logger log.CtxLogger + timer metrics.Timer +} + +type Processor interface { + // Open configures and opens a processor plugin + Open(ctx context.Context) error + Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord + // Teardown tears down a processor plugin. + // In case of standalone plugins, that means stopping the WASM module. + Teardown(context.Context) error +} + +func NewProcessorTask( + id string, + processor Processor, + logger log.CtxLogger, + timer metrics.Timer, +) *ProcessorTask { + logger = logger.WithComponent("task:processor") + logger.Logger = logger.With().Str(log.ProcessorIDField, id).Logger() + return &ProcessorTask{ + id: id, + processor: processor, + logger: logger, + timer: timer, + } +} + +func (t *ProcessorTask) ID() string { + return t.id +} + +func (t *ProcessorTask) Open(ctx context.Context) error { + t.logger.Debug(ctx).Msg("opening processor") + err := t.processor.Open(ctx) + if err != nil { + return cerrors.Errorf("failed to open processor: %w", err) + } + t.logger.Debug(ctx).Msg("processor open") + return nil +} + +func (t *ProcessorTask) Close(ctx context.Context) error { + t.logger.Debug(ctx).Msg("tearing down processor") + return t.processor.Teardown(ctx) +} + +func (t *ProcessorTask) Do(ctx context.Context, b *Batch) error { + start := time.Now() + recsIn := b.ActiveRecords() + recsOut := t.processor.Process(ctx, recsIn) + + if len(recsOut) == 0 { + return cerrors.Errorf("processor didn't return any records") + } + t.observeMetrics(len(recsOut), start) + + // Mark records in the batch as processed, filtered or errored. + // We do this a bit smarter, by collecting ranges of records that are + // processed, filtered or errored, and then marking them in one go. + + from := 0 // Start of the current range of records with the same type + rangeType := 0 // 0 = SingleRecord, 1 = FilterRecord, 2 = ErrorRecord + + for i, rec := range recsOut { + var currentType int + switch rec.(type) { + case sdk.SingleRecord: + currentType = 0 + case sdk.FilterRecord: + currentType = 1 + case sdk.ErrorRecord: + currentType = 2 + default: + err := cerrors.Errorf("processor returned unknown record type: %T", rec) + return cerrors.FatalError(err) + } + + if currentType == rangeType { + continue + } + + t.markBatchRecords(b, from, recsOut[from:i]) + from, rangeType = i, currentType + } + + // Mark the last range of records. 
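+	// The loop above only marks a range when the record type changes, so the
+	// remaining tail of recsOut (from `from` to the end) is flushed here.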
+ t.markBatchRecords(b, from, recsOut[from:]) + + if len(recsIn) > len(recsOut) { + // Processor skipped some records, mark them to be retried. + b.Retry(len(recsOut), len(recsIn)) + } + + return nil +} + +// markBatchRecords marks a range of records in a batch as processed, filtered or +// errored, based on the type of records returned by the processor. The worker +// can then use this information to continue processing the batch. +func (t *ProcessorTask) markBatchRecords(b *Batch, from int, records []sdk.ProcessedRecord) { + if len(records) == 0 { + return // This can happen if the first record is not a SingleRecord. + } + switch records[0].(type) { + case sdk.SingleRecord: + recs := make([]opencdc.Record, len(records)) + for i, rec := range records { + recs[i] = opencdc.Record(rec.(sdk.SingleRecord)) + } + b.SetRecords(from, recs) + case sdk.FilterRecord: + b.Filter(from, len(records)) + case sdk.ErrorRecord: + errs := make([]error, len(records)) + for i, rec := range records { + errs[i] = rec.(sdk.ErrorRecord).Error + } + b.Nack(from, errs...) + } +} + +func (t *ProcessorTask) observeMetrics(n int, start time.Time) { + tookPerRecord := time.Since(start) / time.Duration(n) + go func() { + for range n { + t.timer.Update(tookPerRecord) + } + }() +} diff --git a/pkg/lifecycle-poc/funnel/recordflag_string.go b/pkg/lifecycle-poc/funnel/recordflag_string.go new file mode 100644 index 000000000..f183e9d24 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/recordflag_string.go @@ -0,0 +1,26 @@ +// Code generated by "stringer -type=RecordFlag -linecomment"; DO NOT EDIT. + +package funnel + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[RecordFlagAck-0] + _ = x[RecordFlagNack-1] + _ = x[RecordFlagRetry-2] + _ = x[RecordFlagFilter-3] +} + +const _RecordFlag_name = "acknackretryfilter" + +var _RecordFlag_index = [...]uint8{0, 3, 7, 12, 18} + +func (i RecordFlag) String() string { + if i < 0 || i >= RecordFlag(len(_RecordFlag_index)-1) { + return "RecordFlag(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _RecordFlag_name[_RecordFlag_index[i]:_RecordFlag_index[i+1]] +} diff --git a/pkg/lifecycle-poc/funnel/source.go b/pkg/lifecycle-poc/funnel/source.go new file mode 100644 index 000000000..13ff88a5b --- /dev/null +++ b/pkg/lifecycle-poc/funnel/source.go @@ -0,0 +1,121 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate mockgen -typed -destination=mock/source.go -package=mock -mock_names=Source=Source . 
Source + +package funnel + +import ( + "context" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics" +) + +type SourceTask struct { + id string + source Source + logger log.CtxLogger + + timer metrics.Timer + histogram metrics.RecordBytesHistogram +} + +type Source interface { + ID() string + Open(context.Context) error + Read(context.Context) ([]opencdc.Record, error) + Ack(context.Context, []opencdc.Position) error + Teardown(context.Context) error + // TODO figure out if we want to handle these errors. This returns errors + // coming from the persister, which persists the connector asynchronously. + // Are we even interested in these errors in the pipeline? Sounds like + // something we could surface and handle globally in the runtime instead. + Errors() <-chan error +} + +func NewSourceTask( + id string, + source Source, + logger log.CtxLogger, + timer metrics.Timer, + histogram metrics.Histogram, +) *SourceTask { + logger = logger.WithComponent("task:source") + logger.Logger = logger.With().Str(log.ConnectorIDField, id).Logger() + return &SourceTask{ + id: id, + source: source, + logger: logger, + timer: timer, + histogram: metrics.NewRecordBytesHistogram(histogram), + } +} + +func (t *SourceTask) ID() string { + return t.id +} + +func (t *SourceTask) Open(ctx context.Context) error { + t.logger.Debug(ctx).Msg("opening source") + err := t.source.Open(ctx) + if err != nil { + return cerrors.Errorf("failed to open source connector: %w", err) + } + t.logger.Debug(ctx).Msg("source open") + return nil +} + +func (t *SourceTask) Close(context.Context) error { + // source is torn down in the worker on stop + return nil +} + +func (t *SourceTask) Do(ctx context.Context, b *Batch) error { + start := time.Now() + + recs, err := t.source.Read(ctx) + if err != nil { + return cerrors.Errorf("failed to read from source: %w", err) + } + + t.observeMetrics(recs, start) + + // Overwrite the batch with the new records. + *b = *NewBatch(recs) + return nil +} + +func (t *SourceTask) observeMetrics(records []opencdc.Record, start time.Time) { + // Precalculate sizes so that we don't need to hold a reference to records + // and observations can happen in a goroutine. + sizes := make([]float64, len(records)) + for i, rec := range records { + sizes[i] = t.histogram.SizeOf(rec) + } + tookPerRecord := time.Since(start) / time.Duration(len(sizes)) + go func() { + for i := range len(sizes) { + t.timer.Update(tookPerRecord) + t.histogram.H.Observe(sizes[i]) + } + }() +} + +func (t *SourceTask) GetSource() Source { + return t.source +} diff --git a/pkg/lifecycle-poc/funnel/worker.go b/pkg/lifecycle-poc/funnel/worker.go new file mode 100644 index 000000000..08fb5bd24 --- /dev/null +++ b/pkg/lifecycle-poc/funnel/worker.go @@ -0,0 +1,576 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package funnel + +import ( + "context" + "sync/atomic" + "time" + + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit-commons/rollback" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/sourcegraph/conc/pool" +) + +// Task is a unit of work that can be executed by a Worker. Each Task in a +// pipeline is executed sequentially, except for tasks related to different +// destinations, which can be executed in parallel. +type Task interface { + // ID returns the identifier of this Task. Each Task in a pipeline must be + // uniquely identified by the ID. + ID() string + + // Open opens the Task for processing. It is called once before the worker + // starts processing records. + Open(context.Context) error + // Close closes the Task. It is called once after the worker has stopped + // processing records. + Close(context.Context) error + // Do processes the given batch of records. It is called for each batch of + // records that the worker processes. + Do(context.Context, *Batch) error +} + +// Worker collects the tasks that need to be executed in a pipeline for a +// specific source. It processes records from the source through the tasks until +// it is stopped. The worker is responsible for coordinating tasks and +// acking/nacking records. +// +// Batches are processed in the following way: +// - The first task is always a source task which reads a batch of records +// from the source. The batch is then passed to the next task. +// - Any task between the source and the destination can process the batch by +// updating the records or their status (see [RecordStatus]). If a record in +// the batch is marked as filtered, the next task will skip processing it +// and consider it as already processed. If a record is marked as nacked, +// the record will be sent to the DLQ. If a record is marked as retry, the +// record will be reprocessed by the same task (relevant if a task processed +// only part of the batch, experienced an error and skipped the rest). +// - The last task is always a destination task which writes the batch of +// records to the destination. The batch is then acked. +// +// Note that if a task marks a record in the middle of a batch as nacked, the +// batch is split into sub-batches. The records that were successfully processed +// continue to the next task (and ideally to the end of the pipeline), because +// Conduit provides ordering guarantees. Only once the records before the nacked +// record are end-to-end processed, will the nacked record be sent to the DLQ. +// The rest of the records are processed as a sub-batch, and the same rules +// apply to them. +type Worker struct { + Source Source + Tasks []Task + // Order defines the next task to be executed. Multiple indices are used to + // show parallel execution of tasks. + // + // Example: + // [[1], [2], [3,5], [4], [], []] + // + // /-> 3 -> 4 + // 0 -> 1 -> 2 + // \-> 5 + Order Order + DLQ *DLQ + + lastReadAt time.Time + timer metrics.Timer + + // processingLock is a lock in form of a channel with a buffer size of 1 to + // be able to acquire the lock with a context timeout. + processingLock chan struct{} + // stop stores the information if a graceful stop was triggered. 
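+	// It is set by Stop after the source has been torn down and is checked
+	// before each new batch is read from the source.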
+ stop atomic.Bool + + logger log.CtxLogger +} + +func NewWorker( + tasks []Task, + order Order, + dlq *DLQ, + logger log.CtxLogger, + timer metrics.Timer, +) (*Worker, error) { + err := validateTaskOrder(tasks, order) + if err != nil { + return nil, cerrors.Errorf("invalid task order: %w", err) + } + + st, ok := tasks[0].(interface{ GetSource() Source }) + if !ok { + return nil, cerrors.Errorf("first task must be a source task, got %T", tasks[0]) + } + + return &Worker{ + Source: st.GetSource(), + Tasks: tasks, + Order: order, + DLQ: dlq, + logger: logger.WithComponent("funnel.Worker"), + timer: timer, + + processingLock: make(chan struct{}, 1), + }, nil +} + +func validateTaskOrder(tasks []Task, order Order) error { + // Traverse the tasks according to the order and validate that each task + // is included exactly once. + if len(order) != len(tasks) { + return cerrors.Errorf("order length (%d) does not match tasks length (%d)", len(order), len(tasks)) + } + seenCount := make([]int, len(tasks)) + var traverse func(i int) error + traverse = func(i int) error { + if i < 0 || i >= len(tasks) { + return cerrors.Errorf("invalid index (%d), expected a number between 0 and %d", i, len(tasks)-1) + } + seenCount[i]++ + if seenCount[i] > 1 { + return cerrors.Errorf("task %d included multiple times in order", i) + } + for _, nextIdx := range order[i] { + if nextIdx == i { + return cerrors.Errorf("task %d cannot call itself as next task", i) + } + err := traverse(nextIdx) + if err != nil { + return err + } + } + return nil + } + err := traverse(0) + if err != nil { + return err + } + for i, count := range seenCount { + if count == 0 { + return cerrors.Errorf("task %d not included in order", i) + } + } + return nil +} + +// Open opens the worker for processing. It opens all tasks and the DLQ. If any +// task fails to open, the worker is not opened and the error is returned. +// Once a worker is opened, it can start processing records. The worker should +// be closed using Close after it is no longer needed. +func (w *Worker) Open(ctx context.Context) (err error) { + var r rollback.R + defer func() { + rollbackErr := r.Execute() + err = cerrors.LogOrReplace(err, rollbackErr, func() { + w.logger.Err(ctx, rollbackErr).Msg("failed to execute rollback") + }) + }() + + for _, task := range w.Tasks { + err = task.Open(ctx) + if err != nil { + return cerrors.Errorf("task %s failed to open: %w", task.ID(), err) + } + + r.Append(func() error { + return task.Close(ctx) + }) + } + + err = w.DLQ.Open(ctx) + if err != nil { + return cerrors.Errorf("failed to open DLQ: %w", err) + } + + r.Skip() + return nil +} + +// Stop stops the worker from processing more records. It does not stop the +// current batch from being processed. If a batch is currently being processed, +// the method will block and trigger the stop after the batch is processed. +func (w *Worker) Stop(ctx context.Context) error { + // The lock is locked every time a batch is being processed. We lock it + // to be sure no batch is currently being processed. + release, err := w.acquireProcessingLock(ctx) + if err != nil { + return err + } + defer release() + + // Lock acquired, teardown the source and set stop to true to signal the + // worker it should stop processing, since it won't be able to deliver + // any acks. + err = w.Source.Teardown(ctx) + if err != nil { + return cerrors.Errorf("failed to tear down source: %w", err) + } + w.stop.Store(true) + return nil +} + +// acquireProcessingLock tries to acquire the processing lock. 
It returns a +// release function that should be called to release the lock. If the context is +// canceled before the lock is acquired, it returns the context error. +func (w *Worker) acquireProcessingLock(ctx context.Context) (release func(), err error) { + select { + case w.processingLock <- struct{}{}: + return func() { <-w.processingLock }, nil + case <-ctx.Done(): + // lock not acquired + return func() {}, ctx.Err() + } +} + +func (w *Worker) Close(ctx context.Context) error { + var errs []error + + for _, task := range w.Tasks { + err := task.Close(ctx) + if err != nil { + errs = append(errs, cerrors.Errorf("task %s failed to close: %w", task.ID(), err)) + } + } + + err := w.DLQ.Close(ctx) + if err != nil { + errs = append(errs, cerrors.Errorf("failed to close DLQ: %w", err)) + } + + return cerrors.Join(errs...) +} + +// Do processes records from the source until the worker is stopped. It returns +// no error if the worker is stopped gracefully. +func (w *Worker) Do(ctx context.Context) error { + for !w.stop.Load() { + w.logger.Trace(ctx).Msg("starting next batch") + if err := w.doTask(ctx, 0, &Batch{}, w); err != nil { + return err + } + w.logger.Trace(ctx).Msg("batch done") + } + return nil +} + +//nolint:gocyclo // TODO: refactor +func (w *Worker) doTask(ctx context.Context, currentIndex int, b *Batch, acker ackNacker) error { + t := w.Tasks[currentIndex] + + w.logger.Trace(ctx). + Str("task_id", t.ID()). + Int("batch_size", len(b.records)). + Msg("executing task") + + err := t.Do(ctx, b) + + w.logger.Trace(ctx). + Err(err). + Str("task_id", t.ID()). + Int("batch_size", len(b.records)). + Msg("task done") + + if err != nil { + // Canceled error can be returned if the worker is stopped while reading + // the next batch from the source (graceful stop). + // ErrPluginNotRunning can be returned if the plugin is stopped before + // trying to read the next batch. + // Both are considered as graceful stop, just return the context error, if any. + if currentIndex == 0 && (cerrors.Is(err, context.Canceled) || + (cerrors.Is(err, plugin.ErrPluginNotRunning) && w.stop.Load())) { + return ctx.Err() + } + return cerrors.Errorf("task %s: %w", t.ID(), err) + } + + if currentIndex == 0 { + // The first task has some specifics: + // - Store last time we read a batch from the source for metrics. + // - It locks the stop lock, so that no stop signal can be received while + // the batch is being processed. + // - It checks if the source was torn down after receiving the batch and + // before acquiring the lock. + w.lastReadAt = time.Now() + + release, err := w.acquireProcessingLock(ctx) + if err != nil { + return err + } + // Unlock after the batch is end-to-end processed. + defer release() + + if w.stop.Load() { + // The source was already torn down, we won't be able to deliver + // any acks so throw away the batch and gracefully return. + w.logger.Warn(ctx). + Str("task_id", t.ID()). + Int("batch_size", len(b.records)). + Msg("stop signal received just before starting to process next batch, gracefully stopping without flushing the batch") + return nil + } + } + + if !b.tainted { + w.logger.Trace(ctx). + Str("task_id", t.ID()). + Msg("task returned clean batch") + + // Shortcut. + if !w.hasNextTask(currentIndex) { + // This is the last task, the batch has made it end-to-end, let's ack! + return acker.Ack(ctx, b) + } + // There is at least one task after this one, let's continue. + return w.nextTask(ctx, currentIndex, b, acker) + } + + w.logger.Trace(ctx). + Str("task_id", t.ID()). 
+ Msg("task returned tainted batch, splitting into sub-batches") + + // Batch is tainted, we need to go through all statuses and group them by + // status before further processing. + idx := 0 + for { + subBatch := w.subBatchByFlag(b, idx) + if subBatch == nil { + w.logger.Trace(ctx).Msg("processed last batch") + break + } + + w.logger.Trace(ctx). + Str("task_id", t.ID()). + Int("batch_size", len(b.records)). + Str("record_flag", b.recordStatuses[0].Flag.String()). + Msg("collected sub-batch") + + switch subBatch.recordStatuses[0].Flag { + case RecordFlagAck, RecordFlagFilter: + if !w.hasNextTask(currentIndex) { + // This is the last task, the batch has made it end-to-end, let's ack! + // We need to ack all the records in the batch, not only active + // ones, filtered ones should also be acked. + err := acker.Ack(ctx, subBatch) + if err != nil { + return err + } + break // break switch + } + // There is at least one task after this one, let's continue. + err := w.nextTask(ctx, currentIndex, subBatch, acker) + if err != nil { + return err + } + case RecordFlagNack: + err := acker.Nack(ctx, subBatch, t.ID()) + if err != nil { + return err + } + case RecordFlagRetry: + err := w.doTask(ctx, currentIndex, subBatch, acker) + if err != nil { + return err + } + } + + idx += len(subBatch.positions) + } + + return nil +} + +// subBatchByFlag collects a sub-batch of records with the same status starting +// from the given index. It returns nil if firstIndex is out of bounds. +func (w *Worker) subBatchByFlag(b *Batch, firstIndex int) *Batch { + if firstIndex >= len(b.recordStatuses) { + return nil + } + + flags := make([]RecordFlag, 0, 2) + flags = append(flags, b.recordStatuses[firstIndex].Flag) + // Collect Filters and Acks together in the same batch. + if flags[0] == RecordFlagFilter { + flags = append(flags, RecordFlagAck) + } else if flags[0] == RecordFlagAck { + flags = append(flags, RecordFlagFilter) + } + + lastIndex := firstIndex +OUTER: + for _, status := range b.recordStatuses[firstIndex:] { + for _, f := range flags { + if status.Flag == f { + lastIndex++ + // Record has matching status, let's continue. + continue OUTER + } + } + // Record has a different status, we're done. + break + } + + return b.sub(firstIndex, lastIndex) +} + +func (w *Worker) hasNextTask(currentIndex int) bool { + return len(w.Order[currentIndex]) > 0 +} + +func (w *Worker) nextTask(ctx context.Context, currentIndex int, b *Batch, acker ackNacker) error { + nextIndices := w.Order[currentIndex] + switch len(nextIndices) { + case 0: + // no next task, we're done + return nil + case 1: + // single next task, let's pass the batch to it + return w.doTask(ctx, nextIndices[0], b, acker) + default: + // TODO(multi-connector): remove error + return cerrors.Errorf("multiple next tasks not supported yet") + + // multiple next tasks, let's clone the batch and pass it to them + // concurrently + //nolint:govet // TODO implement multi ack nacker + multiAcker := newMultiAckNacker(acker, len(nextIndices)) + p := pool.New().WithErrors() // TODO WithContext? + for _, i := range nextIndices { + b := b.clone() + p.Go(func() error { + return w.doTask(ctx, i, b, multiAcker) + }) + } + err := p.Wait() + if err != nil { + return err // no need to wrap, it already contains the task ID + } + + // TODO merge batch statuses? 
+		return nil
+	}
+}
+
+func (w *Worker) Ack(ctx context.Context, batch *Batch) error {
+	err := w.Source.Ack(ctx, batch.positions)
+	if err != nil {
+		return cerrors.Errorf("failed to ack %d records in source: %w", len(batch.records), err)
+	}
+
+	w.DLQ.Ack(ctx, batch)
+	w.updateTimer(batch.records)
+	return nil
+}
+
+func (w *Worker) Nack(ctx context.Context, batch *Batch, taskID string) error {
+	n, err := w.DLQ.Nack(ctx, batch, taskID)
+	if n > 0 {
+		// Successfully nacked n records, let's ack them, as they reached
+		// the end of the pipeline (in this case the DLQ).
+		err := w.Source.Ack(ctx, batch.positions[:n])
+		if err != nil {
+			return cerrors.Errorf("task %s failed to ack %d records in source: %w", taskID, n, err)
+		}
+
+		w.updateTimer(batch.records[:n])
+	}
+
+	if err != nil {
+		return cerrors.Errorf("failed to nack %d records: %w", len(batch.records)-n, err)
+	}
+	return nil
+}
+
+func (w *Worker) updateTimer(records []opencdc.Record) {
+	for _, rec := range records {
+		readAt, err := rec.Metadata.GetReadAt()
+		if err != nil {
+			// If the record metadata has changed and does not include ReadAt
+			// fallback to the time the worker received the record.
+			readAt = w.lastReadAt
+		}
+		w.timer.UpdateSince(readAt)
+	}
+}
+
+// Order represents the order of tasks in a pipeline. Each index in the slice
+// represents a task, and the value at that index is a slice of indices of the
+// next tasks to be executed. If the slice is empty, the task is the last one in
+// the pipeline.
+type Order [][]int
+
+// AppendSingle appends a single element to the current order.
+func (o Order) AppendSingle(next []int) Order {
+	if len(o) == 0 {
+		return Order{next}
+	}
+	o[len(o)-1] = append(o[len(o)-1], len(o))
+	return append(o, next)
+}
+
+// AppendOrder appends the next order to the current order. The next order indices
+// are adjusted to match the new order length.
+func (o Order) AppendOrder(next Order) Order {
+	if len(o) == 0 {
+		return next
+	} else if len(next) == 0 {
+		return o
+	}
+
+	next.Increase(len(o))
+	o[len(o)-1] = append(o[len(o)-1], len(o))
+	return append(o, next...)
+}
+
+// Increase increases all indices in the order by the given increment.
+func (o Order) Increase(incr int) Order {
+	for _, v := range o {
+		for i := range v {
+			v[i] += incr
+		}
+	}
+	return o
+}
+
+type ackNacker interface {
+	Ack(context.Context, *Batch) error
+	Nack(context.Context, *Batch, string) error
+}
+
+// multiAckNacker is an ackNacker that expects multiple acks/nacks for the same
+// batch. It keeps track of the number of acks/nacks and only acks/nacks the
+// batch when all expected acks/nacks are received.
+type multiAckNacker struct {
+	parent ackNacker
+	count  *atomic.Int32
+}
+
+func newMultiAckNacker(parent ackNacker, count int) *multiAckNacker {
+	c := atomic.Int32{}
+	c.Add(int32(count)) //nolint:gosec // no risk of overflow
+	return &multiAckNacker{
+		parent: parent,
+		count:  &c,
+	}
+}
+
+func (m *multiAckNacker) Ack(ctx context.Context, batch *Batch) error {
+	panic("not implemented")
+}
+
+func (m *multiAckNacker) Nack(ctx context.Context, batch *Batch, taskID string) error {
+	panic("not implemented")
+}
diff --git a/pkg/lifecycle-poc/service.go b/pkg/lifecycle-poc/service.go
new file mode 100644
index 000000000..f386625fd
--- /dev/null
+++ b/pkg/lifecycle-poc/service.go
@@ -0,0 +1,668 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package lifecycle contains the logic to manage the lifecycle of pipelines. +// It is responsible for starting, stopping and managing pipelines. +package lifecycle + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/conduitio/conduit-commons/csync" + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/metrics/measure" + "github.com/conduitio/conduit/pkg/lifecycle-poc/funnel" + "github.com/conduitio/conduit/pkg/pipeline" + connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector" + "github.com/conduitio/conduit/pkg/processor" + "gopkg.in/tomb.v2" +) + +type FailureEvent struct { + // ID is the ID of the pipeline which failed. + ID string + Error error +} + +type FailureHandler func(FailureEvent) + +// Service manages pipelines. +type Service struct { + logger log.CtxLogger + + pipelines PipelineService + connectors ConnectorService + + processors ProcessorService + connectorPlugins ConnectorPluginService + + handlers []FailureHandler + runningPipelines *csync.Map[string, *runnablePipeline] + + isGracefulShutdown atomic.Bool +} + +// NewService initializes and returns a lifecycle.Service. +func NewService( + logger log.CtxLogger, + connectors ConnectorService, + processors ProcessorService, + connectorPlugins ConnectorPluginService, + pipelines PipelineService, +) *Service { + return &Service{ + logger: logger.WithComponent("lifecycle.Service"), + connectors: connectors, + processors: processors, + connectorPlugins: connectorPlugins, + pipelines: pipelines, + runningPipelines: csync.NewMap[string, *runnablePipeline](), + } +} + +type runnablePipeline struct { + pipeline *pipeline.Instance + w *funnel.Worker + t *tomb.Tomb +} + +// ConnectorService can fetch and create a connector instance. +type ConnectorService interface { + Get(ctx context.Context, id string) (*connector.Instance, error) + Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, cfg connector.Config, p connector.ProvisionType) (*connector.Instance, error) +} + +// ProcessorService can fetch a processor instance and make a runnable processor from it. +type ProcessorService interface { + Get(ctx context.Context, id string) (*processor.Instance, error) + MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error) +} + +// ConnectorPluginService can create a connector plugin dispenser. +type ConnectorPluginService interface { + NewDispenser(logger log.CtxLogger, name string, connectorID string) (connectorPlugin.Dispenser, error) +} + +// PipelineService can fetch, list and update the status of a pipeline instance. 
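+// The lifecycle service uses it to look up pipeline instances on Init/Start
+// and to persist status transitions as pipelines start and stop.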
+type PipelineService interface { + Get(ctx context.Context, pipelineID string) (*pipeline.Instance, error) + List(ctx context.Context) map[string]*pipeline.Instance + UpdateStatus(ctx context.Context, pipelineID string, status pipeline.Status, errMsg string) error +} + +// OnFailure registers a handler for a lifecycle.FailureEvent. +// Only errors which happen after a pipeline has been started +// are being sent. +func (s *Service) OnFailure(handler FailureHandler) { + s.handlers = append(s.handlers, handler) +} + +// Init starts all pipelines that have the StatusSystemStopped. +func (s *Service) Init( + ctx context.Context, +) error { + var errs []error + s.logger.Debug(ctx).Msg("initializing pipelines statuses") + + instances := s.pipelines.List(ctx) + for _, instance := range instances { + if instance.GetStatus() == pipeline.StatusSystemStopped { + err := s.Start(ctx, instance.ID) + if err != nil { + // try to start remaining pipelines and gather errors + errs = append(errs, err) + } + } + } + + return cerrors.Join(errs...) +} + +// Start builds and starts a pipeline with the given ID. +// If the pipeline is already running, Start returns ErrPipelineRunning. +func (s *Service) Start( + ctx context.Context, + pipelineID string, +) error { + pl, err := s.pipelines.Get(ctx, pipelineID) + if err != nil { + return err + } + + if pl.GetStatus() == pipeline.StatusRunning { + return cerrors.Errorf("can't start pipeline %s: %w", pl.ID, pipeline.ErrPipelineRunning) + } + + s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline") + s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building tasks") + + rp, err := s.buildRunnablePipeline(ctx, pl) + if err != nil { + return cerrors.Errorf("could not build tasks for pipeline %s: %w", pl.ID, err) + } + + s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("running pipeline") + + if err := s.runPipeline(rp); err != nil { + return cerrors.Errorf("failed to run pipeline %s: %w", pl.ID, err) + } + s.logger.Info(ctx).Str(log.PipelineIDField, pl.ID).Msg("pipeline started") + + s.runningPipelines.Set(pl.ID, rp) + + return nil +} + +// Stop will attempt to gracefully stop a given pipeline by calling each worker's +// Stop method. If the force flag is set to true, the pipeline will be stopped +// forcefully by cancelling the context. +func (s *Service) Stop(ctx context.Context, pipelineID string, force bool) error { + rp, ok := s.runningPipelines.Get(pipelineID) + + if !ok { + return cerrors.Errorf("pipeline %s is not running: %w", pipelineID, pipeline.ErrPipelineNotRunning) + } + + if rp.pipeline.GetStatus() != pipeline.StatusRunning && rp.pipeline.GetStatus() != pipeline.StatusRecovering { + return cerrors.Errorf("can't stop pipeline with status %q: %w", rp.pipeline.GetStatus(), pipeline.ErrPipelineNotRunning) + } + + return s.stopRunnablePipeline(ctx, rp, force) +} + +// StopAll will ask all the running pipelines to stop gracefully +// (i.e. that existing messages get processed but not new messages get produced). +func (s *Service) StopAll(ctx context.Context, force bool) error { + // Set graceful shutdown flag to true, so pipelines know the system triggered the stop. 
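+	// The flag is read in runPipeline's cleanup goroutine to decide whether a
+	// stopped pipeline ends up as StatusSystemStopped or StatusUserStopped.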
+ s.isGracefulShutdown.Store(true) + + l := s.runningPipelines.Len() + if l == 0 { + return nil + } + + switch force { + case false: + s.logger.Info(ctx).Msgf("stopping %d pipelines gracefully", l) + case true: + s.logger.Info(ctx).Msgf("stopping %d pipelines forcefully", l) + } + + var errs []error + for _, rp := range s.runningPipelines.All() { + if rp.pipeline.GetStatus() != pipeline.StatusRunning && rp.pipeline.GetStatus() != pipeline.StatusRecovering { + continue + } + errs = append(errs, s.stopRunnablePipeline(ctx, rp, force)) + } + return cerrors.Join(errs...) +} + +func (s *Service) stopRunnablePipeline(ctx context.Context, rp *runnablePipeline, force bool) error { + switch force { + case false: + s.logger.Info(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Any(log.PipelineStatusField, rp.pipeline.GetStatus()). + Msg("gracefully stopping pipeline") + return rp.w.Stop(ctx) + case true: + s.logger.Info(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Any(log.PipelineStatusField, rp.pipeline.GetStatus()). + Msg("force stopping pipeline") + rp.t.Kill(pipeline.ErrForceStop) + return nil + } + panic("unreachable") +} + +// Wait blocks until all pipelines are stopped or until the timeout is reached. +// Returns: +// +// (1) nil if all the pipelines are gracefully stopped, +// +// (2) an error, if the pipelines could not have been gracefully stopped, +// +// (3) context.DeadlineExceeded if the pipelines were not stopped within the given timeout. +func (s *Service) Wait(timeout time.Duration) error { + gracefullyStopped := make(chan struct{}) + var err error + go func() { + defer close(gracefullyStopped) + err = s.waitInternal() + }() + + select { + case <-gracefullyStopped: + return err + case <-time.After(timeout): + return context.DeadlineExceeded + } +} + +// waitInternal blocks until all pipelines are stopped and returns an error if any of +// the pipelines failed to stop gracefully. +func (s *Service) waitInternal() error { + var errs []error + + // copy pipelines to keep the map unlocked while we iterate it + pipelines := s.runningPipelines.Copy() + + for _, rp := range pipelines.All() { + if rp.t == nil { + continue + } + err := rp.t.Wait() + if err != nil { + errs = append(errs, cerrors.Errorf("pipeline %s: %w", rp.pipeline.ID, err)) + } + } + return cerrors.Join(errs...) +} + +// WaitPipeline blocks until the pipeline with the given ID is stopped. +func (s *Service) WaitPipeline(id string) error { + p, ok := s.runningPipelines.Get(id) + if !ok || p.t == nil { + return nil + } + return p.t.Wait() +} + +// buildRunnablePipeline will build and connect all tasks configured in the pipeline. 
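+// For example, a pipeline with one source, one pipeline processor and one
+// destination yields three tasks in that sequence and the order
+// Order{{1}, {2}, nil}, i.e. each task hands the batch to the next one.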
+func (s *Service) buildRunnablePipeline( + ctx context.Context, + pl *pipeline.Instance, +) (*runnablePipeline, error) { + pipelineLogger := s.logger + pipelineLogger.Logger = pipelineLogger.Logger.With().Str(log.PipelineIDField, pl.ID).Logger() + + srcTasks, srcOrder, err := s.buildSourceTasks(ctx, pl, pipelineLogger) + if err != nil { + return nil, cerrors.Errorf("failed to build source tasks: %w", err) + } + if len(srcTasks) == 0 { + return nil, cerrors.New("can't build pipeline without any source connectors") + } + + destTasks, destOrder, err := s.buildDestinationTasks(ctx, pl, pipelineLogger) + if err != nil { + return nil, cerrors.Errorf("failed to build destination tasks: %w", err) + } + if len(destTasks) == 0 { + return nil, cerrors.New("can't build pipeline without any destination connectors") + } + + procTasks, procOrder, err := s.buildProcessorTasks(ctx, pl, pl.ProcessorIDs, pipelineLogger) + if err != nil { + return nil, cerrors.Errorf("failed to build pipeline processor tasks: %w", err) + } + + dlq, err := s.buildDLQ(ctx, pl, pipelineLogger) + if err != nil { + return nil, cerrors.Errorf("failed to build DLQ: %w", err) + } + + tasks, order := s.combineTasksAndOrders(srcTasks, destTasks, procTasks, srcOrder, destOrder, procOrder) + + // log the tasks and order for debugging purposes + taskTypes := make([]string, len(tasks)) + for i, task := range tasks { + taskTypes[i] = fmt.Sprintf("%s(%T)", task.ID(), task) + } + pipelineLogger.Info(ctx).Any("tasks", taskTypes).Any("order", order).Msg("pipeline tasks and order") + + worker, err := funnel.NewWorker( + tasks, + order, + dlq, + pipelineLogger, + measure.PipelineExecutionDurationTimer.WithValues(pl.Config.Name), + ) + if err != nil { + return nil, cerrors.Errorf("failed to create worker: %w", err) + } + return &runnablePipeline{ + pipeline: pl, + w: worker, + }, nil +} + +func (s *Service) combineTasksAndOrders( + srcTasks, destTasks, procTasks []funnel.Task, + srcOrder, destOrder, procOrder funnel.Order, +) ([]funnel.Task, funnel.Order) { + tasks := make([]funnel.Task, 0, len(srcTasks)+len(procTasks)+len(destTasks)) + tasks = append(tasks, srcTasks...) + tasks = append(tasks, procTasks...) + tasks = append(tasks, destTasks...) 
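+
+	// The order below must chain the tasks in the same sequence as they were
+	// appended above: sources, then pipeline processors, then destinations.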
+ + // TODO(multi-connector): when we have multiple connectors this will not be as straight forward + order := srcOrder.AppendOrder(procOrder).AppendOrder(destOrder) + return tasks, order +} + +func (s *Service) buildSourceTasks( + ctx context.Context, + pl *pipeline.Instance, + logger log.CtxLogger, +) ([]funnel.Task, funnel.Order, error) { + var tasks []funnel.Task + var order funnel.Order + + for _, connID := range pl.ConnectorIDs { + instance, err := s.connectors.Get(ctx, connID) + if err != nil { + return nil, nil, cerrors.Errorf("could not fetch connector: %w", err) + } + + if instance.Type != connector.TypeSource { + continue // skip any connector that's not a source + } + + if len(tasks) > 1 { + // TODO(multi-connector): remove check + return nil, nil, cerrors.New("pipelines with multiple source connectors currently not supported, please disable the experimental feature flag") + } + + src, err := instance.Connector(ctx, s.connectorPlugins) + if err != nil { + return nil, nil, err + } + + srcTask := funnel.NewSourceTask( + instance.ID, + src.(*connector.Source), + logger, + measure.ConnectorExecutionDurationTimer.WithValues( + pl.Config.Name, + instance.Plugin, + strings.ToLower(instance.Type.String()), + ), + measure.ConnectorBytesHistogram.WithValues( + pl.Config.Name, + instance.Plugin, + strings.ToLower(instance.Type.String()), + ), + ) + + // Add processor tasks + processorTasks, processorOrder, err := s.buildProcessorTasks(ctx, pl, instance.ProcessorIDs, logger) + if err != nil { + return nil, nil, cerrors.Errorf("failed to build source processor tasks: %w", err) + } + + // Adjust order to include new task and the processor order + tasks = append(tasks, srcTask) + tasks = append(tasks, processorTasks...) + + order = append(order, nil) // Add new task to order without attaching to previous tasks + order = order.AppendOrder(processorOrder) + } + + return tasks, order, nil +} + +func (s *Service) buildDestinationTasks( + ctx context.Context, + pl *pipeline.Instance, + logger log.CtxLogger, +) ([]funnel.Task, funnel.Order, error) { + var tasks []funnel.Task + var order funnel.Order + + for _, connID := range pl.ConnectorIDs { + instance, err := s.connectors.Get(ctx, connID) + if err != nil { + return nil, nil, cerrors.Errorf("could not fetch connector: %w", err) + } + + if instance.Type != connector.TypeDestination { + continue // skip any connector that's not a destination + } + + if len(tasks) > 1 { + // TODO(multi-connector): remove check + return nil, nil, cerrors.New("pipelines with multiple destination connectors currently not supported, please disable the experimental feature flag") + } + + dest, err := instance.Connector(ctx, s.connectorPlugins) + if err != nil { + return nil, nil, err + } + + destTask := funnel.NewDestinationTask( + instance.ID, + dest.(*connector.Destination), + logger, + measure.ConnectorExecutionDurationTimer.WithValues( + pl.Config.Name, + instance.Plugin, + strings.ToLower(instance.Type.String()), + ), + measure.ConnectorBytesHistogram.WithValues( + pl.Config.Name, + instance.Plugin, + strings.ToLower(instance.Type.String()), + ), + ) + + // Add processor tasks + processorTasks, processorOrder, err := s.buildProcessorTasks(ctx, pl, instance.ProcessorIDs, logger) + if err != nil { + return nil, nil, cerrors.Errorf("failed to build destination processor tasks: %w", err) + } + + // Adjust order to include new task and the processor order + tasks = append(tasks, processorTasks...) 
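+		// Processors attached to this destination are placed before the
+		// destination task, so records are processed right before being written.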
+ tasks = append(tasks, destTask) + + order = append(order, processorOrder.Increase(len(order))...) // Add processor task order without attaching to previous tasks + order = order.AppendSingle(nil) + } + + return tasks, order, nil +} + +func (s *Service) buildProcessorTasks( + ctx context.Context, + pl *pipeline.Instance, + processorIDs []string, + logger log.CtxLogger, +) ([]funnel.Task, funnel.Order, error) { + var tasks []funnel.Task + var order funnel.Order + + for _, procID := range processorIDs { + instance, err := s.processors.Get(ctx, procID) + if err != nil { + return nil, nil, cerrors.Errorf("could not fetch processor: %w", err) + } + + runnableProc, err := s.processors.MakeRunnableProcessor(ctx, instance) + if err != nil { + return nil, nil, err + } + + tasks = append( + tasks, + funnel.NewProcessorTask( + instance.ID, + runnableProc, + logger, + measure.ProcessorExecutionDurationTimer.WithValues(pl.Config.Name, instance.Plugin), + ), + ) + order = order.AppendSingle(nil) + } + + return tasks, order, nil +} + +func (s *Service) buildDLQ( + ctx context.Context, + pl *pipeline.Instance, + logger log.CtxLogger, +) (*funnel.DLQ, error) { + conn, err := s.connectors.Create( + ctx, + pl.ID+"-dlq", + connector.TypeDestination, + pl.DLQ.Plugin, + pl.ID, + connector.Config{ + Name: pl.ID + "-dlq", + Settings: pl.DLQ.Settings, + }, + connector.ProvisionTypeDLQ, // the provision type ensures the connector won't be persisted + ) + if err != nil { + return nil, cerrors.Errorf("failed to create DLQ destination: %w", err) + } + + dest, err := conn.Connector(ctx, s.connectorPlugins) + if err != nil { + return nil, err + } + + return funnel.NewDLQ( + "dlq", + dest.(*connector.Destination), + logger, + measure.DLQExecutionDurationTimer.WithValues(pl.Config.Name, conn.Plugin), + measure.DLQBytesHistogram.WithValues(pl.Config.Name, conn.Plugin), + pl.DLQ.WindowSize, + pl.DLQ.WindowNackThreshold, + ), nil +} + +func (s *Service) runPipeline(rp *runnablePipeline) error { + if rp.t != nil && rp.t.Alive() { + return pipeline.ErrPipelineRunning + } + + // the tomb is responsible for running goroutines related to the pipeline + rp.t = &tomb.Tomb{} + ctx := rp.t.Context(nil) //nolint:staticcheck // this is the correct usage of tomb + + err := rp.w.Open(ctx) + if err != nil { + return cerrors.Errorf("failed to open worker: %w", err) + } + + var workersWg sync.WaitGroup + + // TODO(multi-connector): when we have multiple connectors spawn a worker for each source + workersWg.Add(1) + rp.t.Go(func() error { + defer workersWg.Done() + + doErr := rp.w.Do(ctx) + s.logger.Err(ctx, doErr).Str(log.PipelineIDField, rp.pipeline.ID).Msg("pipeline worker stopped") + + closeErr := rp.w.Close(context.Background()) + err := cerrors.Join(doErr, closeErr) + if err != nil { + return cerrors.Errorf("worker stopped with error: %w", err) + } + + return nil + }) + rp.t.Go(func() error { + // Use fresh context for cleanup function, otherwise the updated status + // will potentially fail to be stored. 
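+		// (e.g. the tomb's context may already be canceled on a force stop).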
+ ctx := context.Background() + + workersWg.Wait() + err := rp.t.Err() + + switch err { + case tomb.ErrStillAlive: + // not an actual error, the pipeline stopped gracefully + err = nil + var status pipeline.Status + if s.isGracefulShutdown.Load() { + // it was triggered by a graceful shutdown of Conduit + status = pipeline.StatusSystemStopped + } else { + // it was manually triggered by a user + status = pipeline.StatusUserStopped + } + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, status, ""); err != nil { + return err + } + default: + if cerrors.IsFatalError(err) { + // we use %+v to get the stack trace too + if err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", err)); err != nil { + return err + } + } else { //nolint:staticcheck // TODO: implement recovery + // // try to recover the pipeline + // if recoveryErr := s.recoverPipeline(ctx, rp); recoveryErr != nil { + // s.logger. + // Err(ctx, err). + // Str(log.PipelineIDField, rp.pipeline.ID). + // Msg("pipeline recovery failed") + // + // if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil { + // return updateErr + // } + // + // // we assign it to err so it's returned and notified by the cleanup function + // err = recoveryErr + // } else { + // // recovery was triggered didn't error, so no cleanup + // // this is why we return nil to skip the cleanup below. + // return nil + // } + } + } + + s.logger. + Err(ctx, err). + Str(log.PipelineIDField, rp.pipeline.ID). + Msg("pipeline stopped") + + // confirmed that all nodes stopped, we can now remove the pipeline from the running pipelines + s.runningPipelines.Delete(rp.pipeline.ID) + + s.notify(rp.pipeline.ID, err) + return err + }) + + return s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") +} + +// notify notifies all registered FailureHandlers about an error. +func (s *Service) notify(pipelineID string, err error) { + if err == nil { + return + } + e := FailureEvent{ + ID: pipelineID, + Error: err, + } + for _, handler := range s.handlers { + handler(e) + } +} diff --git a/pkg/lifecycle-poc/service_test.go b/pkg/lifecycle-poc/service_test.go new file mode 100644 index 000000000..4c5201248 --- /dev/null +++ b/pkg/lifecycle-poc/service_test.go @@ -0,0 +1,596 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package lifecycle + +import ( + "context" + "fmt" + "reflect" + "strconv" + "strings" + "testing" + "time" + + "github.com/conduitio/conduit-commons/cchan" + "github.com/conduitio/conduit-commons/database/inmemory" + "github.com/conduitio/conduit-commons/opencdc" + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/lifecycle-poc/funnel" + "github.com/conduitio/conduit/pkg/pipeline" + "github.com/conduitio/conduit/pkg/plugin" + connectorPlugin "github.com/conduitio/conduit/pkg/plugin/connector" + pmock "github.com/conduitio/conduit/pkg/plugin/connector/mock" + "github.com/conduitio/conduit/pkg/processor" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "github.com/matryer/is" + "github.com/rs/zerolog" + "go.uber.org/mock/gomock" +) + +const testDLQID = "test-dlq" + +func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + source := dummySource(persister) + destination := dummyDestination(persister) + dlq := dummyDestination(persister) + pl := &pipeline.Instance{ + ID: uuid.NewString(), + Config: pipeline.Config{Name: "test-pipeline"}, + DLQ: pipeline.DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{source.ID, destination.ID}, + } + pl.SetStatus(pipeline.StatusUserStopped) + + ls := NewService( + logger, + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: pmock.NewDispenser(ctrl), + destination.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + testPipelineService{}, + ) + + got, err := ls.buildRunnablePipeline( + ctx, + pl, + ) + + is.NoErr(err) + + is.Equal("", cmp.Diff(pl, got.pipeline, cmpopts.IgnoreUnexported(pipeline.Instance{}))) + + wantTasks := []funnel.Task{ + &funnel.SourceTask{}, + &funnel.DestinationTask{}, + } + is.Equal(len(got.w.Tasks), len(wantTasks)) + for i := range got.w.Tasks { + want := wantTasks[i] + got := got.w.Tasks[i] + is.Equal(reflect.TypeOf(want), reflect.TypeOf(got)) // unexpected task type + } + is.Equal(got.w.Order, funnel.Order{{1}, nil}) + is.Equal(got.w.Source.(*connector.Source).Instance, source) +} + +func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + destination := dummyDestination(persister) + dlq := dummyDestination(persister) + pl := &pipeline.Instance{ + ID: uuid.NewString(), + Config: pipeline.Config{Name: "test-pipeline"}, + DLQ: pipeline.DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{destination.ID}, + } + pl.SetStatus(pipeline.StatusUserStopped) + + ls := NewService(logger, + testConnectorService{ + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + 
destination.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + testPipelineService{}, + ) + + wantErr := "can't build pipeline without any source connectors" + + got, err := ls.buildRunnablePipeline( + ctx, + pl, + ) + + is.True(err != nil) + is.Equal(err.Error(), wantErr) + is.Equal(got, nil) +} + +func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + ctrl := gomock.NewController(t) + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + source := dummySource(persister) + dlq := dummyDestination(persister) + + ls := NewService(logger, + testConnectorService{ + source.ID: source, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: pmock.NewDispenser(ctrl), + dlq.Plugin: pmock.NewDispenser(ctrl), + }, + testPipelineService{}, + ) + + wantErr := "can't build pipeline without any destination connectors" + + pl := &pipeline.Instance{ + ID: uuid.NewString(), + Config: pipeline.Config{Name: "test-pipeline"}, + DLQ: pipeline.DLQ{ + Plugin: dlq.Plugin, + Settings: map[string]string{}, + WindowSize: 3, + WindowNackThreshold: 2, + }, + ConnectorIDs: []string{source.ID}, + } + pl.SetStatus(pipeline.StatusUserStopped) + + got, err := ls.buildRunnablePipeline( + ctx, + pl, + ) + + is.True(err != nil) + is.Equal(err.Error(), wantErr) + is.Equal(got, nil) +} + +func TestServiceLifecycle_PipelineSuccess(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + defer persister.Wait() + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + ctrl := gomock.NewController(t) + wantRecords := generateRecords(10) + source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, false) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, false) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, false) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService(logger, + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: sourceDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, + ps, + ) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) + + is.Equal(pipeline.StatusRunning, pl.GetStatus()) + is.Equal("", pl.Error) + + // stop pipeline before ending test + err = ls.Stop(ctx, pl.ID, false) + is.NoErr(err) + + is.NoErr(ls.WaitPipeline(pl.ID)) +} + +func TestServiceLifecycle_PipelineError(t *testing.T) { + t.Skipf("this test fails, see github.com/ConduitIO/conduit/issues/1659") + + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.Test(t) + db := &inmemory.DB{} 
+ persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + wantErr := cerrors.New("source connector error") + ctrl := gomock.NewController(t) + wantRecords := generateRecords(10) + source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, wantErr, false) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, false) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, false) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService(logger, + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: sourceDispenser, + destination.Plugin: destDispenser, + dlq.Plugin: dlqDispenser, + }, + ps, + ) + + events := make(chan FailureEvent, 1) + ls.OnFailure(func(e FailureEvent) { + events <- e + }) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish + err = ls.WaitPipeline(pl.ID) + is.True(err != nil) + + is.Equal(pipeline.StatusDegraded, pl.GetStatus()) + // pipeline errors contain only string messages, so we can only compare the errors by the messages + t.Log(pl.Error) + + event, eventReceived, err := cchan.Chan[FailureEvent](events).RecvTimeout(ctx, 200*time.Millisecond) + is.NoErr(err) + is.True(eventReceived) + is.Equal(pl.ID, event.ID) + + // These conditions are NOT met + is.True( // expected error message to have "node stopped with error" + strings.Contains(pl.Error, fmt.Sprintf("node %s stopped with error:", source.ID)), + ) + is.True( // expected error message to contain "source connector error" + strings.Contains(pl.Error, wantErr.Error()), + ) + is.True(cerrors.Is(event.Error, wantErr)) +} + +func TestServiceLifecycle_PipelineStop(t *testing.T) { + is := is.New(t) + ctx, killAll := context.WithCancel(context.Background()) + defer killAll() + logger := log.New(zerolog.Nop()) + db := &inmemory.DB{} + persister := connector.NewPersister(logger, db, time.Second, 3) + + ps := pipeline.NewService(logger, db) + + // create a host pipeline + pl, err := ps.Create(ctx, uuid.NewString(), pipeline.Config{Name: "test pipeline"}, pipeline.ProvisionTypeAPI) + is.NoErr(err) + + // create mocked connectors + // source will stop and return ErrGracefulShutdown which should signal to the + // service that everything went well and the pipeline was gracefully shutdown + ctrl := gomock.NewController(t) + wantRecords := generateRecords(10) + source, sourceDispenser := generatorSource(ctrl, persister, wantRecords, nil, false) + destination, destDispenser := asserterDestination(ctrl, persister, wantRecords, false) + dlq, dlqDispenser := asserterDestination(ctrl, persister, nil, false) + pl.DLQ.Plugin = dlq.Plugin + + pl, err = ps.AddConnector(ctx, pl.ID, source.ID) + is.NoErr(err) + pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) + is.NoErr(err) + + ls := NewService(logger, + testConnectorService{ + source.ID: source, + destination.ID: destination, + testDLQID: dlq, + }, + testProcessorService{}, + testConnectorPluginService{ + source.Plugin: sourceDispenser, + destination.Plugin: 
destDispenser, + dlq.Plugin: dlqDispenser, + }, + ps, + ) + + // start the pipeline now that everything is set up + err = ls.Start( + ctx, + pl.ID, + ) + is.NoErr(err) + + // wait for pipeline to finish consuming records from the source + time.Sleep(100 * time.Millisecond) + err = ls.StopAll(ctx, false) + is.NoErr(err) + + // wait for pipeline to finish + err = ls.WaitPipeline(pl.ID) + is.NoErr(err) + + is.Equal(pipeline.StatusSystemStopped, pl.GetStatus()) + is.Equal("", pl.Error) +} + +func generateRecords(count int) []opencdc.Record { + records := make([]opencdc.Record, count) + for i := 0; i < count; i++ { + records[i] = opencdc.Record{ + Key: opencdc.RawData(uuid.NewString()), + Payload: opencdc.Change{ + Before: opencdc.RawData{}, + After: opencdc.RawData(uuid.NewString()), + }, + Position: opencdc.Position(strconv.Itoa(i)), + } + } + return records +} + +// generatorSource creates a connector source whose mocked plugin produces the +// given records. After producing all of the records it returns wantErr. +func generatorSource( + ctrl *gomock.Controller, + persister *connector.Persister, + records []opencdc.Record, + wantErr error, + stop bool, +) (*connector.Instance, *pmock.Dispenser) { + sourcePluginOptions := []pmock.ConfigurableSourcePluginOption{ + pmock.SourcePluginWithConfigure(), + pmock.SourcePluginWithOpen(), + pmock.SourcePluginWithRun(), + pmock.SourcePluginWithRecords(records, wantErr), + pmock.SourcePluginWithAcks(len(records), wantErr == nil), + pmock.SourcePluginWithTeardown(), + } + + if stop { + sourcePluginOptions = append(sourcePluginOptions, pmock.SourcePluginWithStop()) + } + sourcePlugin := pmock.NewConfigurableSourcePlugin(ctrl, sourcePluginOptions...) + + source := dummySource(persister) + + dispenser := pmock.NewDispenser(ctrl) + dispenser.EXPECT().DispenseSource().Return(sourcePlugin, nil) + + return source, dispenser +} + +// asserterDestination creates a connector destination that checks if the records it gets +// match the expected records. On teardown it also makes sure that it received +// all expected records. +func asserterDestination( + ctrl *gomock.Controller, + persister *connector.Persister, + records []opencdc.Record, + stop bool, +) (*connector.Instance, *pmock.Dispenser) { + destinationPluginOptions := []pmock.ConfigurableDestinationPluginOption{ + pmock.DestinationPluginWithConfigure(), + pmock.DestinationPluginWithOpen(), + pmock.DestinationPluginWithRun(), + pmock.DestinationPluginWithRecords(records), + pmock.DestinationPluginWithTeardown(), + } + + if stop { + var lastPosition opencdc.Position + if len(records) > 0 { + lastPosition = records[len(records)-1].Position + } + destinationPluginOptions = append(destinationPluginOptions, pmock.DestinationPluginWithStop(lastPosition)) + } + + destinationPlugin := pmock.NewConfigurableDestinationPlugin(ctrl, destinationPluginOptions...) + + dest := dummyDestination(persister) + + dispenser := pmock.NewDispenser(ctrl) + dispenser.EXPECT().DispenseDestination().Return(destinationPlugin, nil) + + return dest, dispenser +} + +// dummySource creates a dummy source connector.
+func dummySource(persister *connector.Persister) *connector.Instance { + // randomize plugin name in case of multiple sources + testPluginName := "test-source-plugin-" + uuid.NewString() + source := &connector.Instance{ + ID: uuid.NewString(), + Type: connector.TypeSource, + PipelineID: uuid.NewString(), + Plugin: testPluginName, + } + source.Init(log.Nop(), persister) + + return source +} + +// dummyDestination creates a dummy destination connector. +func dummyDestination(persister *connector.Persister) *connector.Instance { + // randomize plugin name in case of multiple destinations + testPluginName := "test-destination-plugin-" + uuid.NewString() + + destination := &connector.Instance{ + ID: uuid.NewString(), + Type: connector.TypeDestination, + PipelineID: uuid.NewString(), + Plugin: testPluginName, + } + destination.Init(log.Nop(), persister) + + return destination +} + +// testConnectorService fulfills the ConnectorService interface. +type testConnectorService map[string]*connector.Instance + +func (s testConnectorService) Get(_ context.Context, id string) (*connector.Instance, error) { + conn, ok := s[id] + if !ok { + return nil, connector.ErrInstanceNotFound + } + return conn, nil +} + +func (s testConnectorService) Create(context.Context, string, connector.Type, string, string, connector.Config, connector.ProvisionType) (*connector.Instance, error) { + return s[testDLQID], nil +} + +// testProcessorService fulfills the ProcessorService interface. +type testProcessorService map[string]*processor.Instance + +func (s testProcessorService) MakeRunnableProcessor(context.Context, *processor.Instance) (*processor.RunnableProcessor, error) { + return nil, cerrors.New("not implemented") +} + +func (s testProcessorService) Get(_ context.Context, id string) (*processor.Instance, error) { + proc, ok := s[id] + if !ok { + return nil, processor.ErrInstanceNotFound + } + return proc, nil +} + +// testConnectorPluginService fulfills the ConnectorPluginService interface. +type testConnectorPluginService map[string]connectorPlugin.Dispenser + +func (s testConnectorPluginService) NewDispenser(_ log.CtxLogger, name string, _ string) (connectorPlugin.Dispenser, error) { + plug, ok := s[name] + if !ok { + return nil, plugin.ErrPluginNotFound + } + return plug, nil +} + +// testPipelineService fulfills the PipelineService interface. +type testPipelineService map[string]*pipeline.Instance + +func (s testPipelineService) Get(_ context.Context, pipelineID string) (*pipeline.Instance, error) { + p, ok := s[pipelineID] + if !ok { + return nil, processor.ErrInstanceNotFound + } + return p, nil +} + +func (s testPipelineService) List(_ context.Context) map[string]*pipeline.Instance { + instances := make(map[string]*pipeline.Instance) + return instances +} + +func (s testPipelineService) UpdateStatus(_ context.Context, pipelineID string, status pipeline.Status, errMsg string) error { + p, ok := s[pipelineID] + if !ok { + return processor.ErrInstanceNotFound + } + p.SetStatus(status) + p.Error = errMsg + return nil +} diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index c56e17942..9e5eacdb2 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -334,7 +334,7 @@ func (s *Service) StopAll(ctx context.Context, reason error) { // // (2) an error, if the pipelines could not have been gracefully stopped, // -// (3) ErrTimeout if the pipelines were not stopped within the given timeout. +// (3) context.DeadlineExceeded if the pipelines were not stopped within the given timeout. 
func (s *Service) Wait(timeout time.Duration) error { gracefullyStopped := make(chan struct{}) var err error @@ -347,7 +347,7 @@ func (s *Service) Wait(timeout time.Duration) error { case <-gracefullyStopped: return err case <-time.After(timeout): - return pipeline.ErrTimeout + return context.DeadlineExceeded } } diff --git a/pkg/pipeline/errors.go b/pkg/pipeline/errors.go index 5e88c5bfa..7c4c7f5c8 100644 --- a/pkg/pipeline/errors.go +++ b/pkg/pipeline/errors.go @@ -17,7 +17,6 @@ package pipeline import "github.com/conduitio/conduit/pkg/foundation/cerrors" var ( - ErrTimeout = cerrors.New("operation timed out") ErrGracefulShutdown = cerrors.New("graceful shutdown") ErrForceStop = cerrors.New("force stop") ErrPipelineCannotRecover = cerrors.New("pipeline couldn't be recovered") diff --git a/pkg/plugin/connector/builtin/destination.go b/pkg/plugin/connector/builtin/destination.go index d2d07d2a4..c033a78ba 100644 --- a/pkg/plugin/connector/builtin/destination.go +++ b/pkg/plugin/connector/builtin/destination.go @@ -58,13 +58,13 @@ func (d *destinationPluginAdapter) withLogger(ctx context.Context) context.Conte func (d *destinationPluginAdapter) Configure(ctx context.Context, in pconnector.DestinationConfigureRequest) (pconnector.DestinationConfigureResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling Configure") - out, err := runSandbox(d.impl.Configure, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.Configure, d.withLogger(ctx), in.Clone(), d.logger, "Configure") return out.Clone(), err } func (d *destinationPluginAdapter) Open(ctx context.Context, in pconnector.DestinationOpenRequest) (pconnector.DestinationOpenResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling Open") - out, err := runSandbox(d.impl.Open, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.Open, d.withLogger(ctx), in.Clone(), d.logger, "Open") return out.Clone(), err } @@ -97,31 +97,31 @@ func (d *destinationPluginAdapter) Run(ctx context.Context, stream pconnector.De func (d *destinationPluginAdapter) Stop(ctx context.Context, in pconnector.DestinationStopRequest) (pconnector.DestinationStopResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling Stop") - out, err := runSandbox(d.impl.Stop, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.Stop, d.withLogger(ctx), in.Clone(), d.logger, "Stop") return out.Clone(), err } func (d *destinationPluginAdapter) Teardown(ctx context.Context, in pconnector.DestinationTeardownRequest) (pconnector.DestinationTeardownResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling Teardown") - out, err := runSandbox(d.impl.Teardown, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.Teardown, d.withLogger(ctx), in.Clone(), d.logger, "Teardown") return out.Clone(), err } func (d *destinationPluginAdapter) LifecycleOnCreated(ctx context.Context, in pconnector.DestinationLifecycleOnCreatedRequest) (pconnector.DestinationLifecycleOnCreatedResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnCreated") - out, err := runSandbox(d.impl.LifecycleOnCreated, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.LifecycleOnCreated, d.withLogger(ctx), in.Clone(), d.logger, "LifecycleOnCreated") return out.Clone(), err } func (d *destinationPluginAdapter) LifecycleOnUpdated(ctx context.Context, in pconnector.DestinationLifecycleOnUpdatedRequest) (pconnector.DestinationLifecycleOnUpdatedResponse, error) 
{ d.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnUpdated") - out, err := runSandbox(d.impl.LifecycleOnUpdated, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.LifecycleOnUpdated, d.withLogger(ctx), in.Clone(), d.logger, "LifecycleOnUpdated") return out.Clone(), err } func (d *destinationPluginAdapter) LifecycleOnDeleted(ctx context.Context, in pconnector.DestinationLifecycleOnDeletedRequest) (pconnector.DestinationLifecycleOnDeletedResponse, error) { d.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnDeleted") - out, err := runSandbox(d.impl.LifecycleOnDeleted, d.withLogger(ctx), in.Clone(), d.logger) + out, err := runSandbox(d.impl.LifecycleOnDeleted, d.withLogger(ctx), in.Clone(), d.logger, "LifecycleOnDeleted") return out.Clone(), err } diff --git a/pkg/plugin/connector/builtin/sandbox.go b/pkg/plugin/connector/builtin/sandbox.go index 0e5c71995..ca1d3bf83 100644 --- a/pkg/plugin/connector/builtin/sandbox.go +++ b/pkg/plugin/connector/builtin/sandbox.go @@ -37,6 +37,7 @@ func runSandbox[REQ any, RES any]( ctx context.Context, req REQ, logger log.CtxLogger, + name string, ) (RES, error) { c := sandboxChanPool.Get().(chan any) @@ -61,7 +62,7 @@ func runSandbox[REQ any, RES any]( select { case <-ctx.Done(): // Context was cancelled, detach from calling goroutine and return. - logger.Error(ctx).Msg("context cancelled while waiting for builtin connector plugin to respond, detaching from plugin") + logger.Error(ctx).Msgf("context cancelled while waiting for builtin connector plugin to respond to %q, detaching from plugin", name) var emptyRes RES return emptyRes, ctx.Err() case v := <-c: diff --git a/pkg/plugin/connector/builtin/sandbox_test.go b/pkg/plugin/connector/builtin/sandbox_test.go index 3689e9d45..baa1c3261 100644 --- a/pkg/plugin/connector/builtin/sandbox_test.go +++ b/pkg/plugin/connector/builtin/sandbox_test.go @@ -87,7 +87,7 @@ func TestRunSandbox(t *testing.T) { for _, td := range testData { t.Run(td.name, func(t *testing.T) { - gotResp, gotErr := runSandbox(td.f, ctx, td.req, logger) + gotResp, gotErr := runSandbox(td.f, ctx, td.req, logger, "test") is.Equal(gotResp, td.resp) if td.strict { // strict mode means we expect a very specific error diff --git a/pkg/plugin/connector/builtin/source.go b/pkg/plugin/connector/builtin/source.go index c6dfc2f63..85ee1f963 100644 --- a/pkg/plugin/connector/builtin/source.go +++ b/pkg/plugin/connector/builtin/source.go @@ -58,13 +58,13 @@ func (s *sourcePluginAdapter) withLogger(ctx context.Context) context.Context { func (s *sourcePluginAdapter) Configure(ctx context.Context, in pconnector.SourceConfigureRequest) (pconnector.SourceConfigureResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling Configure") - out, err := runSandbox(s.impl.Configure, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.Configure, s.withLogger(ctx), in.Clone(), s.logger, "Configure") return out.Clone(), err } func (s *sourcePluginAdapter) Open(ctx context.Context, in pconnector.SourceOpenRequest) (pconnector.SourceOpenResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling Start") - out, err := runSandbox(s.impl.Open, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.Open, s.withLogger(ctx), in.Clone(), s.logger, "Start") return out.Clone(), err } @@ -97,31 +97,31 @@ func (s *sourcePluginAdapter) Run(ctx context.Context, stream pconnector.SourceR func (s *sourcePluginAdapter) Stop(ctx context.Context, in pconnector.SourceStopRequest) 
(pconnector.SourceStopResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling Stop") - out, err := runSandbox(s.impl.Stop, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.Stop, s.withLogger(ctx), in.Clone(), s.logger, "Stop") return out.Clone(), err } func (s *sourcePluginAdapter) Teardown(ctx context.Context, in pconnector.SourceTeardownRequest) (pconnector.SourceTeardownResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling Teardown") - out, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.Teardown, s.withLogger(ctx), in.Clone(), s.logger, "Teardown") return out.Clone(), err } func (s *sourcePluginAdapter) LifecycleOnCreated(ctx context.Context, in pconnector.SourceLifecycleOnCreatedRequest) (pconnector.SourceLifecycleOnCreatedResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnCreated") - out, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.LifecycleOnCreated, s.withLogger(ctx), in.Clone(), s.logger, "LifecycleOnCreated") return out.Clone(), err } func (s *sourcePluginAdapter) LifecycleOnUpdated(ctx context.Context, in pconnector.SourceLifecycleOnUpdatedRequest) (pconnector.SourceLifecycleOnUpdatedResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnUpdated") - out, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.LifecycleOnUpdated, s.withLogger(ctx), in.Clone(), s.logger, "LifecycleOnUpdated") return out.Clone(), err } func (s *sourcePluginAdapter) LifecycleOnDeleted(ctx context.Context, in pconnector.SourceLifecycleOnDeletedRequest) (pconnector.SourceLifecycleOnDeletedResponse, error) { s.logger.Debug(ctx).Any("request", in).Msg("calling LifecycleOnDeleted") - out, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), in.Clone(), s.logger) + out, err := runSandbox(s.impl.LifecycleOnDeleted, s.withLogger(ctx), in.Clone(), s.logger, "LifecycleOnDeleted") return out.Clone(), err } diff --git a/pkg/plugin/connector/builtin/specifier.go b/pkg/plugin/connector/builtin/specifier.go index 6ec1d66ab..366289883 100644 --- a/pkg/plugin/connector/builtin/specifier.go +++ b/pkg/plugin/connector/builtin/specifier.go @@ -38,5 +38,5 @@ func newSpecifierPluginAdapter(impl pconnector.SpecifierPlugin, logger log.CtxLo } func (s *specifierPluginAdapter) Specify(ctx context.Context, in pconnector.SpecifierSpecifyRequest) (pconnector.SpecifierSpecifyResponse, error) { - return runSandbox(s.impl.Specify, ctx, in, s.logger) + return runSandbox(s.impl.Specify, ctx, in, s.logger, "Specify") }
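
Note on the lifecycle-poc tests above: they drive the v2 service through Start, Stop/StopAll with a force flag, and WaitPipeline. The helper below is a minimal sketch of that graceful-stop sequence for a single pipeline; stopAndWait itself is illustrative and not part of this change, and it assumes it sits in the same package as the *Service type returned by NewService.

    // stopAndWait requests a graceful stop (force=false) and then waits for the
    // pipeline to finish, mirroring the Stop + WaitPipeline calls made in
    // TestServiceLifecycle_PipelineSuccess above.
    func stopAndWait(ctx context.Context, ls *Service, pipelineID string) error {
        if err := ls.Stop(ctx, pipelineID, false); err != nil {
            return err
        }
        // WaitPipeline returns whatever error the pipeline ended with; the
        // tests assert it is nil after a graceful stop.
        return ls.WaitPipeline(pipelineID)
    }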
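
On the Wait change above: with pipeline.ErrTimeout removed, a timeout from (*Service).Wait in pkg/lifecycle/service.go is now reported as context.DeadlineExceeded, so callers have to match on the standard-library sentinel instead of the deleted package error. A rough caller sketch follows; the ls and logger variables and the 30-second value are assumptions for illustration only.

    // Sketch: wait for all pipelines to stop after shutdown was requested.
    // context.DeadlineExceeded is the only sentinel guaranteed by this change;
    // the surrounding names are assumed scaffolding.
    if err := ls.Wait(30 * time.Second); err != nil {
        if cerrors.Is(err, context.DeadlineExceeded) {
            // The pipelines did not stop within the timeout.
            logger.Error(ctx).Msg("timed out waiting for pipelines to stop")
        } else {
            // The pipelines stopped, but not gracefully.
            logger.Error(ctx).Msgf("pipelines stopped with an error: %v", err)
        }
    }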