diff --git a/internal/pkg/service/stream/config/config_test.go b/internal/pkg/service/stream/config/config_test.go
index eae87eab38..d864e03d5b 100644
--- a/internal/pkg/service/stream/config/config_test.go
+++ b/internal/pkg/service/stream/config/config_test.go
@@ -129,6 +129,8 @@ sink:
   keboola:
     # Timeout to perform upload send event of slice or import event of file
     eventSendTimeout: 30s
+    # Maximum number of import jobs running in Keboola for a single sink
+    jobLimit: 2
 storage:
   # Mounted volumes path, each volume is in "{type}/{label}" subdir. Validation rules: required
   volumesPath: ""
@@ -147,8 +149,8 @@ storage:
   metadataCleanup:
     # Enable local storage metadata cleanup.
     enabled: true
-    # Cleanup interval. Validation rules: required,minDuration=5m,maxDuration=24h
-    interval: 5m0s
+    # Cleanup interval. Validation rules: required,minDuration=30s,maxDuration=24h
+    interval: 30s
     # How many files are deleted in parallel. Validation rules: required,min=1,max=500
     concurrency: 50
     # Expiration interval of a file that has not yet been imported. Validation rules: required,minDuration=1h,maxDuration=720h,gtefield=ArchivedFileExpiration
diff --git a/internal/pkg/service/stream/definition/key/job.go b/internal/pkg/service/stream/definition/key/job.go
new file mode 100644
index 0000000000..2ad7675a0b
--- /dev/null
+++ b/internal/pkg/service/stream/definition/key/job.go
@@ -0,0 +1,31 @@
+package key
+
+import (
+	"go.opentelemetry.io/otel/attribute"
+
+	"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
+)
+
+type JobID string
+
+type JobKey struct {
+	SinkKey
+	JobID JobID `json:"jobId" validate:"required,min=1,max=48"`
+}
+
+func (v JobID) String() string {
+	if v == "" {
+		panic(errors.New("JobID cannot be empty"))
+	}
+	return string(v)
+}
+
+func (v JobKey) String() string {
+	return v.SinkKey.String() + "/" + v.JobID.String()
+}
+
+func (v JobKey) Telemetry() []attribute.KeyValue {
+	t := v.SinkKey.Telemetry()
+	t = append(t, attribute.String("job.id", v.JobID.String()))
+	return t
+}
diff --git a/internal/pkg/service/stream/definition/repository/branch/branch_create.go b/internal/pkg/service/stream/definition/repository/branch/branch_create.go
index 8d1d0dec01..2a83dbc444 100644
--- a/internal/pkg/service/stream/definition/repository/branch/branch_create.go
+++ b/internal/pkg/service/stream/definition/repository/branch/branch_create.go
@@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Branch, now time.Time, by definiti
 			created.SetCreation(now, by)
 			return r.save(ctx, now, by, nil, &created)
 		}).
-		// Update the input entity, it the operation is successful
+		// Update the input entity, if the operation is successful
 		OnResult(func(entity definition.Branch) {
 			*input = entity
 		})
diff --git a/internal/pkg/service/stream/definition/repository/sink/sink_create.go b/internal/pkg/service/stream/definition/repository/sink/sink_create.go
index d9445319bf..1b8d3b41b5 100644
--- a/internal/pkg/service/stream/definition/repository/sink/sink_create.go
+++ b/internal/pkg/service/stream/definition/repository/sink/sink_create.go
@@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Sink, now time.Time, by definition
 			created.IncrementVersion(created, now, by, versionDescription)
 			return r.save(ctx, now, by, nil, &created)
 		}).
- // Update the input entity, it the operation is successful + // Update the input entity, if the operation is successful OnResult(func(entity definition.Sink) { *input = entity }) diff --git a/internal/pkg/service/stream/definition/repository/source/source_create.go b/internal/pkg/service/stream/definition/repository/source/source_create.go index a3e6f017dc..239bbae62b 100644 --- a/internal/pkg/service/stream/definition/repository/source/source_create.go +++ b/internal/pkg/service/stream/definition/repository/source/source_create.go @@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Source, now time.Time, by definiti created.IncrementVersion(created, now, by, versionDescription) return r.save(ctx, now, by, nil, &created) }). - // Update the input entity, it the operation is successful + // Update the input entity, if the operation is successful OnResult(func(result definition.Source) { *input = result }) diff --git a/internal/pkg/service/stream/dependencies/dependencies.go b/internal/pkg/service/stream/dependencies/dependencies.go index e3dce9bad6..7c56d0bf43 100644 --- a/internal/pkg/service/stream/dependencies/dependencies.go +++ b/internal/pkg/service/stream/dependencies/dependencies.go @@ -52,6 +52,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin" sinkRouter "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/router" keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge" + keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskreader" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection" @@ -85,6 +86,7 @@ type ServiceScope interface { StatisticsRepository() *statsRepo.Repository AggregationRepository() *aggregationRepo.Repository KeboolaSinkBridge() *keboolaSinkBridge.Bridge + KeboolaBridgeRepository() *keboolaBridgeRepo.Repository } type APIScope interface { diff --git a/internal/pkg/service/stream/dependencies/service.go b/internal/pkg/service/stream/dependencies/service.go index d65d70a2e9..e84c29942e 100644 --- a/internal/pkg/service/stream/dependencies/service.go +++ b/internal/pkg/service/stream/dependencies/service.go @@ -21,6 +21,7 @@ import ( definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin" keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge" + keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model/repository" statsRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/repository" @@ -46,6 +47,7 @@ type serviceScope struct { storageStatisticsRepository *statsRepo.Repository aggregationRepository *aggregationRepo.Repository keboolaBridge *keboolaSinkBridge.Bridge + keboolaBridgeRepository *keboolaBridgeRepo.Repository } type parentScopes struct { @@ -200,6 +202,11 @@ 
func newServiceScope(baseScp dependencies.BaseScope, publicScp dependencies.Publ return api } + d.keboolaBridgeRepository, err = keboolaBridgeRepo.New(cfg.Storage.Level, d) + if err != nil { + return nil, err + } + d.keboolaBridge = keboolaSinkBridge.New(d, apiCtxProvider, cfg.Sink.Table.Keboola) d.storageStatisticsRepository = statsRepo.New(d) @@ -225,6 +232,10 @@ func (v *serviceScope) KeboolaSinkBridge() *keboolaSinkBridge.Bridge { return v.keboolaBridge } +func (v *serviceScope) KeboolaBridgeRepository() *keboolaBridgeRepo.Repository { + return v.keboolaBridgeRepository +} + func (v *serviceScope) StorageRepository() *storageRepo.Repository { return v.storageRepository } diff --git a/internal/pkg/service/stream/plugin/collection.go b/internal/pkg/service/stream/plugin/collection.go index 491f10d631..1ed5192ff4 100644 --- a/internal/pkg/service/stream/plugin/collection.go +++ b/internal/pkg/service/stream/plugin/collection.go @@ -207,6 +207,15 @@ func (c *Collection) OnSinkModification(fn onSinkSaveFn) { }) } +func (c *Collection) OnSinkDelete(fn onSinkSaveFn) { + c.onSinkSave = append(c.onSinkSave, func(ctx context.Context, now time.Time, by definition.By, original, sink *definition.Sink) error { + if isDeletedNow(original, sink) { + return fn(ctx, now, by, original, sink) + } + return nil + }) +} + func (c *Collection) OnFileOpen(fn onFileOpenFn) { c.onFileOpen = append(c.onFileOpen, fn) } diff --git a/internal/pkg/service/stream/plugin/file.go b/internal/pkg/service/stream/plugin/file.go index 84516d033e..80b1c60bd2 100644 --- a/internal/pkg/service/stream/plugin/file.go +++ b/internal/pkg/service/stream/plugin/file.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics" @@ -17,12 +18,19 @@ type File struct { Provider targetModel.Provider } -type importFileFn func(ctx context.Context, file File, stats statistics.Value) error +type ( + importFileFn func(ctx context.Context, file File, stats statistics.Value) error + canAcceptNewFileFn func(ctx context.Context, sinkKey key.SinkKey) bool +) func (p *Plugins) RegisterFileImporter(provider targetModel.Provider, fn importFileFn) { p.fileImport[provider] = fn } +func (p *Plugins) RegisterCanAcceptNewFile(provider targetModel.Provider, fn canAcceptNewFileFn) { + p.canAcceptNewFile[provider] = fn +} + func (p *Plugins) ImportFile(ctx context.Context, file File, stats statistics.Value) error { if _, ok := p.fileImport[file.Provider]; !ok { return errors.New(fmt.Sprintf("no importer for given provider: %v", file.Provider)) @@ -30,3 +38,11 @@ func (p *Plugins) ImportFile(ctx context.Context, file File, stats statistics.Va return p.fileImport[file.Provider](ctx, file, stats) } + +func (p *Plugins) CanAcceptNewFile(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) bool { + if fn, ok := p.canAcceptNewFile[provider]; ok { + return fn(ctx, sinkKey) + } + + return true +} diff --git a/internal/pkg/service/stream/plugin/plugin.go b/internal/pkg/service/stream/plugin/plugin.go index 63ef25c70f..5c7fdf5ddb 100644 --- a/internal/pkg/service/stream/plugin/plugin.go +++ b/internal/pkg/service/stream/plugin/plugin.go @@ -17,6 +17,7 @@ type Plugins struct { sinkPipelineOpeners []pipeline.Opener 
sliceUploader map[stagingModel.FileProvider]uploadSliceFn fileImport map[targetModel.Provider]importFileFn + canAcceptNewFile map[targetModel.Provider]canAcceptNewFileFn } type fnList[T any] []T @@ -25,10 +26,11 @@ func New(logger log.Logger) *Plugins { c := &Collection{} e := &Executor{logger: logger.WithComponent("plugin"), collection: c} return &Plugins{ - collection: c, - executor: e, - sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn), - fileImport: make(map[targetModel.Provider]importFileFn), + collection: c, + executor: e, + sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn), + fileImport: make(map[targetModel.Provider]importFileFn), + canAcceptNewFile: make(map[targetModel.Provider]canAcceptNewFileFn), } } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go index be152d95b2..c9fcde4b3b 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go @@ -1,17 +1,26 @@ package bridge import ( + "context" + "sync" + "github.com/keboola/go-client/pkg/keboola" etcd "go.etcd.io/etcd/client/v3" "golang.org/x/sync/singleflight" "github.com/keboola/keboola-as-code/internal/pkg/log" "github.com/keboola/keboola-as-code/internal/pkg/service/common/distlock" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op" "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin" keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" - "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema" + bridgeModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema" stagingModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/staging/model" targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" @@ -31,43 +40,52 @@ const ( ) type Bridge struct { - logger log.Logger - config keboolasink.Config - client etcd.KV - schema schema.Schema - plugins *plugin.Plugins - publicAPI *keboola.PublicAPI - apiProvider apiProvider - storageRepository *storageRepo.Repository - locks *distlock.Provider + logger log.Logger + config keboolasink.Config + client etcd.KV + schema schema.Schema + plugins *plugin.Plugins + publicAPI *keboola.PublicAPI + apiProvider apiProvider + storageRepository *storageRepo.Repository + keboolaBridgeRepository *keboolaBridgeRepo.Repository + locks *distlock.Provider + jobs *etcdop.MirrorMap[bridgeModel.Job, key.JobKey, *jobData] getBucketOnce *singleflight.Group createBucketOnce *singleflight.Group } 
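The new `jobs` field above is an in-memory mirror of the job keys stored in etcd: `MirrorJobs` (added below) keeps it in sync via a watch stream, and `canAcceptNewFile` in job.go only ever reads it, so the hot path never queries etcd. Note the field stays nil until `MirrorJobs` has been called. A minimal defensive sketch of the read side, assuming only the `ForEach` API used elsewhere in this PR (`runningJobs` is a hypothetical helper, not part of the change):

// Illustrative sketch only, not part of this PR: b.jobs is nil until
// MirrorJobs has started mirroring, so a defensive reader guards for that.
func (b *Bridge) runningJobs(sinkKey key.SinkKey) (n int, ok bool) {
	if b.jobs == nil {
		return 0, false // mirroring has not started yet
	}
	b.jobs.ForEach(func(k key.JobKey, _ *jobData) (stop bool) {
		if k.SinkKey == sinkKey {
			n++ // count only jobs belonging to the given sink
		}
		return false // iterate over the whole mirror
	})
	return n, true
}
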
+type jobData struct { + key.JobKey +} + type dependencies interface { Logger() log.Logger EtcdClient() *etcd.Client EtcdSerde() *serde.Serde + Process() *servicectx.Process Plugins() *plugin.Plugins KeboolaPublicAPI() *keboola.PublicAPI StorageRepository() *storageRepo.Repository + KeboolaBridgeRepository() *keboolaBridgeRepo.Repository DistributedLockProvider() *distlock.Provider } func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Bridge { b := &Bridge{ - logger: d.Logger().WithComponent("keboola.bridge"), - config: config, - client: d.EtcdClient(), - schema: schema.New(d.EtcdSerde()), - plugins: d.Plugins(), - publicAPI: d.KeboolaPublicAPI(), - apiProvider: apiProvider, - storageRepository: d.StorageRepository(), - locks: d.DistributedLockProvider(), - getBucketOnce: &singleflight.Group{}, - createBucketOnce: &singleflight.Group{}, + logger: d.Logger().WithComponent("keboola.bridge"), + config: config, + client: d.EtcdClient(), + schema: schema.New(d.EtcdSerde()), + plugins: d.Plugins(), + publicAPI: d.KeboolaPublicAPI(), + apiProvider: apiProvider, + storageRepository: d.StorageRepository(), + keboolaBridgeRepository: d.KeboolaBridgeRepository(), + locks: d.DistributedLockProvider(), + getBucketOnce: &singleflight.Group{}, + createBucketOnce: &singleflight.Group{}, } b.setupOnFileOpen() @@ -75,7 +93,7 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Br b.deleteTokenOnSinkDeactivation() b.plugins.RegisterFileImporter(targetProvider, b.importFile) b.plugins.RegisterSliceUploader(stagingFileProvider, b.uploadSlice) - + b.plugins.RegisterCanAcceptNewFile(targetProvider, b.canAcceptNewFile) return b } @@ -86,3 +104,35 @@ func (b *Bridge) isKeboolaTableSink(sink *definition.Sink) bool { func (b *Bridge) isKeboolaStagingFile(file *model.File) bool { return file.StagingStorage.Provider == stagingFileProvider } + +func (b *Bridge) MirrorJobs(ctx context.Context, d dependencies) error { + // Mirror jobs + wg := &sync.WaitGroup{} + ctx, cancel := context.WithCancel(ctx) + d.Process().OnShutdown(func(_ context.Context) { + b.logger.Info(ctx, "closing bridge job mirror") + + // Stop mirroring + cancel() + wg.Wait() + + b.logger.Info(ctx, "closed bridge job mirror") + }) + b.jobs = etcdop.SetupMirrorMap[bridgeModel.Job, key.JobKey, *jobData]( + b.keboolaBridgeRepository.Job().GetAllAndWatch(ctx, etcd.WithPrevKV()), + func(_ string, job bridgeModel.Job) key.JobKey { + return job.JobKey + }, + func(_ string, job bridgeModel.Job, rawValue *op.KeyValue, oldValue **jobData) *jobData { + return &jobData{ + job.JobKey, + } + }, + ).BuildMirror() + if err := <-b.jobs.StartMirroring(ctx, wg, b.logger); err != nil { + b.logger.Errorf(ctx, "cannot start mirroring jobs: %s", err) + return err + } + + return nil +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge_test.go index ecdc9f3ad4..0c6a0d0ec0 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge_test.go @@ -83,6 +83,7 @@ func TestBridge_FullWorkflow(t *testing.T) { bridgeTest.MockTokenStorageAPICalls(t, transport) bridgeTest.MockBucketStorageAPICalls(t, transport) bridgeTest.MockTableStorageAPICalls(t, transport) + bridgeTest.MockSuccessJobStorageAPICalls(t, transport) bridgeTest.MockFileStorageAPICalls(t, clk, transport) } diff --git 
a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go
index a362e29546..44def75808 100644
--- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go
@@ -205,13 +205,18 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist
 		// Save job ID to etcd
 		keboolaFile.StorageJobID = &job.ID
-
-		b.logger.With(attribute.String("job.id", keboolaFile.StorageJobID.String())).Infof(ctx, "created new storage job for file")
-
 		err = b.schema.File().ForFile(file.FileKey).Put(b.client, keboolaFile).Do(ctx).Err()
 		if err != nil {
 			return err
 		}
+
+		// Save the job entity to etcd
+		err = b.createJob(ctx, file, job)
+		if err != nil {
+			return err
+		}
+
+		b.logger.Info(ctx, "created storage job for file")
 	}
 
 	// Wait for job to complete
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file_test.go
index ad113ab9a2..c4b7b89b0b 100644
--- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file_test.go
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file_test.go
@@ -55,6 +55,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
 		bridgeTest.MockTokenStorageAPICalls(t, transport)
 		bridgeTest.MockBucketStorageAPICalls(t, transport)
 		bridgeTest.MockTableStorageAPICalls(t, transport)
+		bridgeTest.MockProcessingJobStorageAPICalls(t, transport)
 		bridgeTest.MockFileStorageAPICalls(t, clk, transport)
 	}
 
@@ -105,7 +106,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
 		require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), true).Do(ctx).Err())
 		require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())
 
-		// Switch file to the FileImporting state
+		// Switch empty file to the FileImporting state
 		clk.Add(time.Second)
 		require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), true).Do(ctx).Err())
 	}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go
new file mode 100644
index 0000000000..e4c9d391f2
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go
@@ -0,0 +1,121 @@
+package bridge
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"strconv"
+	"time"
+
+	"github.com/keboola/go-client/pkg/keboola"
+	"go.opentelemetry.io/otel/attribute"
+
+	"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+	"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
+)
+
+func (b *Bridge) createJob(ctx context.Context, file plugin.File, storageJob *keboola.StorageJob) error {
+	keboolaJob := model.Job{
+		JobKey: key.JobKey{SinkKey: file.SinkKey, JobID: key.JobID(storageJob.ID.String())},
+	}
+
+	// Add context attributes
+	ctx = ctxattr.ContextWith(ctx, attribute.String("job.id", keboolaJob.String()))
+	b.logger.Debugf(ctx, "creating job")
+
+	lock := b.locks.NewMutex(fmt.Sprintf("api.source.sink.jobs.%s", file.SinkKey))
+	if err := lock.Lock(ctx); err != nil {
+		return err
+	}
+	defer func() {
+		if err := lock.Unlock(ctx); err != nil {
+			b.logger.Warnf(ctx, "cannot unlock lock %q: %s", lock.Key(), err)
+		}
+	}()
+
+	operation := b.keboolaBridgeRepository.Job().Create(&keboolaJob).RequireLock(lock)
+	if err := operation.Do(ctx).Err(); err != nil {
+		return err
+	}
+
+	b.logger.Debugf(ctx, "job created")
+	return nil
+}
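This PR registers the `canAcceptNewFile` hook in `New` above and implements it below, but the call site of `plugins.CanAcceptNewFile` is not part of this diff; presumably the file rotation path consults it before opening a new file for a sink. A minimal hypothetical sketch of such a caller, under that assumption (`fileRotator`, `openNewFile`, and the error message are illustrative, not code from this PR):

// Hypothetical call site, for illustration only.
func (r *fileRotator) rotate(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) error {
	// Providers without a registered hook default to "accept"; the Keboola
	// bridge's hook returns false once the mirrored job count for the sink
	// reaches the configured jobLimit.
	if !r.plugins.CanAcceptNewFile(ctx, provider, sinkKey) {
		return errors.New("sink is throttled: too many running import jobs")
	}
	return r.openNewFile(ctx, sinkKey)
}
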
+func (b *Bridge) canAcceptNewFile(ctx context.Context, sinkKey key.SinkKey) bool {
+	// Count running jobs only for the given sink
+	var runningJobs int
+	b.jobs.ForEach(func(jobKey key.JobKey, _ *jobData) (stop bool) {
+		if jobKey.SinkKey == sinkKey {
+			runningJobs++
+		}
+
+		return false
+	})
+
+	return runningJobs < b.config.JobLimit
+}
+
+func (b *Bridge) CleanJob(ctx context.Context, job model.Job) (err error, deleted bool) {
+	// Parse storage job ID from string
+	id64, err := strconv.ParseInt(string(job.JobKey.JobID), 10, 64)
+	if err != nil {
+		err = errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
+		b.logger.Error(ctx, err.Error())
+		return err, false
+	}
+
+	if id64 < math.MinInt || id64 > math.MaxInt {
+		err = errors.Errorf("parsed job ID %d is out of int range", id64)
+		b.logger.Error(ctx, err.Error())
+		return err, false
+	}
+
+	token, err := b.schema.Token().ForSink(job.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
+	if err != nil {
+		b.logger.Warnf(ctx, "cannot get token for sink, probably already deleted: %s", err.Error())
+		return nil, false
+	}
+
+	// Get job details from storage API
+	id := int(id64)
+	api := b.publicAPI.NewAuthorizedAPI(token.TokenString(), 1*time.Minute)
+	var jobStatus *keboola.StorageJob
+	if jobStatus, err = api.GetStorageJobRequest(keboola.StorageJobKey{ID: keboola.StorageJobID(id)}).Send(ctx); err != nil {
+		b.logger.Warnf(ctx, "cannot get information about storage job, probably already deleted: %s", err.Error())
+		return nil, false
+	}
+
+	attr := attribute.String("job.state", jobStatus.Status)
+	ctx = ctxattr.ContextWith(ctx, attr)
+
+	// Check status of storage job
+	if jobStatus.Status == keboola.StorageJobStatusProcessing || jobStatus.Status == keboola.StorageJobStatusWaiting {
+		b.logger.Debugf(ctx, "cannot remove storage job, job status: %s", jobStatus.Status)
+		return nil, false
+	}
+
+	// Acquire lock
+	mutex := b.locks.NewMutex(fmt.Sprintf("api.source.sink.jobs.%s", job.SinkKey))
+	if err = mutex.TryLock(ctx); err != nil {
+		return err, false
+	}
+	defer func() {
+		if err := mutex.Unlock(ctx); err != nil {
+			b.logger.Errorf(ctx, "cannot unlock the lock: %s", err)
+		}
+	}()
+
+	// Purge job in bridge repository
+	if _, err = b.keboolaBridgeRepository.Job().Purge(&job).RequireLock(mutex).Do(ctx).ResultOrErr(); err != nil {
+		err = errors.PrefixErrorf(err, `cannot delete finished storage job "%s"`, job.JobKey)
+		b.logger.Error(ctx, err.Error())
+		return err, false
+	}
+
+	// Log job details
+	b.logger.Infof(ctx, `deleted finished storage job`)
+
+	return nil, true
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job_test.go
new file mode 100644
index 0000000000..aee0fc8e8c
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job_test.go
@@ -0,0 +1,144 @@
+package bridge_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	"github.com/keboola/go-client/pkg/keboola"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/client/v3/concurrency"
+
+	commonDeps 
"github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/duration" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/rollback" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/config" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/node/coordinator/fileimport" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" +) + +func TestBridge_CreateJob(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + clk := clock.NewMock() + clk.Set(utctime.MustParse("2000-01-01T01:00:00.000Z").Time()) + by := test.ByUser() + + checkInterval := time.Minute + d, mock := dependencies.NewMockedCoordinatorScopeWithConfig(t, ctx, func(cfg *config.Config) { + cfg.Storage.Level.Target.Operator.FileImportCheckInterval = duration.From(checkInterval) + }, commonDeps.WithClock(clk)) + logger := mock.DebugLogger() + client := mock.TestEtcdClient() + defRepo := d.DefinitionRepository() + storageRepo := d.StorageRepository() + volumeRepo := storageRepo.Volume() + + apiCtx := rollback.ContextWith(ctx, rollback.New(d.Logger())) + apiCtx = context.WithValue(apiCtx, dependencies.KeboolaProjectAPICtxKey, mock.KeboolaProjectAPI()) + + // Register mocked responses + // ----------------------------------------------------------------------------------------------------------------- + transport := mock.MockedHTTPTransport() + { + bridgeTest.MockTokenStorageAPICalls(t, transport) + bridgeTest.MockBucketStorageAPICalls(t, transport) + bridgeTest.MockTableStorageAPICalls(t, transport) + bridgeTest.MockSuccessJobStorageAPICalls(t, transport) + bridgeTest.MockImportAsyncAPICalls(t, transport) + bridgeTest.MockFileStorageAPICalls(t, clk, transport) + } + + // Register active volumes + // ----------------------------------------------------------------------------------------------------------------- + { + session, err := concurrency.NewSession(client) + require.NoError(t, err) + defer func() { require.NoError(t, session.Close()) }() + test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1) + } + + // Create sink + // ----------------------------------------------------------------------------------------------------------------- + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + { + branch := test.NewBranch(branchKey) + require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(apiCtx).Err()) + source := test.NewSource(sourceKey) + require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(apiCtx).Err()) + sink := test.NewKeboolaTableSink(sinkKey) + require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(apiCtx).Err()) + } + + // Switch file to 
the FileImporting state
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		// Rotate file
+		clk.Add(time.Second)
+		require.NoError(t, storageRepo.File().Rotate(sinkKey, clk.Now()).Do(apiCtx).Err())
+		files, err := storageRepo.File().ListAll().Do(ctx).All()
+		require.NoError(t, err)
+		require.Len(t, files, 2)
+		slices, err := storageRepo.Slice().ListIn(files[0].FileKey).Do(ctx).All()
+		require.NoError(t, err)
+		require.Len(t, slices, 1)
+		require.Equal(t, model.FileClosing, files[0].State)
+		require.Equal(t, model.FileWriting, files[1].State)
+		require.Equal(t, model.SliceClosing, slices[0].State)
+		file := files[0]
+		slice := slices[0]
+
+		// Upload of non empty slice
+		clk.Add(time.Second)
+		require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), false).Do(ctx).Err())
+		require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())
+
+		// Switch file to the FileImporting state
+		clk.Add(time.Second)
+		require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), false).Do(ctx).Err())
+	}
+
+	// Start file import operator
+	// -----------------------------------------------------------------------------------------------------------------
+	require.NoError(t, fileimport.Start(d, mock.TestConfig().Storage.Level.Target.Operator))
+
+	// Wait for the import of the file
+	// -----------------------------------------------------------------------------------------------------------------
+	logger.Truncate()
+	clk.Add(checkInterval)
+	assert.EventuallyWithT(t, func(c *assert.CollectT) {
+		logger.AssertJSONMessages(c, `
+{"level":"info","message":"importing file","component":"storage.node.operator.file.import"}
+{"level":"debug","message":"creating job","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
+{"level":"debug","message":"job created","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
+{"level":"info","message":"imported file","component":"storage.node.operator.file.import"}
+`)
+	}, 15*time.Second, 50*time.Millisecond)
+
+	// Verify the expected Storage API calls
+	// -----------------------------------------------------------------------------------------------------------------
+	expectedImportAsyncAPICall := "POST https://connection.keboola.local/v2/storage/branch/456/tables/in.c-bucket.my-table/import-async"
+	expectedStorageJobsCall := "GET https://connection.keboola.local/v2/storage/jobs/321"
+	assert.Equal(t, 1, transport.GetCallCountInfo()[expectedImportAsyncAPICall])
+	assert.Equal(t, 1, transport.GetCallCountInfo()[expectedStorageJobsCall])
+
+	// Shutdown
+	// -----------------------------------------------------------------------------------------------------------------
+	d.Process().Shutdown(ctx, errors.New("bye bye"))
+	d.Process().WaitForShutdown()
+	logger.AssertNoErrorMessage(t)
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/job.go
new file mode 100644
index 0000000000..23e8b57ad0
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/job.go
@@ -0,0 +1,12 @@
+package model
+
+import (
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+)
+
+// Job contains all Keboola-specific data we need for polling jobs.
+// It is also used to throttle the sink, so that it does not overload the service.
+type Job struct { + key.JobKey + Deleted bool `json:"-"` // internal field to mark the entity for deletion, there is no soft delete +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_create_snapshot_001.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_create_snapshot_001.txt new file mode 100644 index 0000000000..d3c93a1f51 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_create_snapshot_001.txt @@ -0,0 +1,11 @@ +<<<<< +storage/job/123/567/my-source/my-sink/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "jobId": "321" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_exists_snapshot_001.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_exists_snapshot_001.txt new file mode 100644 index 0000000000..d3c93a1f51 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_exists_snapshot_001.txt @@ -0,0 +1,11 @@ +<<<<< +storage/job/123/567/my-source/my-sink/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "jobId": "321" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_get_snapshot_001.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_get_snapshot_001.txt new file mode 100644 index 0000000000..d3c93a1f51 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_get_snapshot_001.txt @@ -0,0 +1,11 @@ +<<<<< +storage/job/123/567/my-source/my-sink/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "jobId": "321" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_001.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_001.txt new file mode 100644 index 0000000000..d5ba5a31ce --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_001.txt @@ -0,0 +1,23 @@ +<<<<< +storage/job/123/567/my-source/my-sink/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "jobId": "321" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink/322 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "jobId": "322" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_002.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_002.txt new file mode 100644 index 0000000000..7a8f01ae65 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_list_snapshot_002.txt @@ -0,0 +1,23 @@ +<<<<< +storage/job/123/567/my-source/my-sink-1/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-1", + "jobId": "321" +} +>>>>> + +<<<<< 
+storage/job/123/567/my-source/my-sink-2/322 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "jobId": "322" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_001.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_001.txt new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_001.txt @@ -0,0 +1 @@ + diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_002.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_002.txt new file mode 100644 index 0000000000..69fcae4ec3 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_002.txt @@ -0,0 +1,103 @@ +<<<<< +definition/branch/active/123/567 +----- +{ + "projectId": 123, + "branchId": 567, + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "isDefault": false +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "db3de4a03cd1e821", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T02:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/source/active/123/567/my-source +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "bfd6eea528be59d3", + "description": "Create source", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Source", + "description": "My Description" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_003.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_003.txt new file mode 100644 index 0000000000..2e49c3404a --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_003.txt @@ -0,0 +1,213 @@ +<<<<< +definition/branch/active/123/567 +----- +{ + "projectId": 123, + 
"branchId": 567, + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "isDefault": false +} +>>>>> + +<<<<< +definition/sink/active/123/567/my-source/my-sink-2 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "643bb252f9a162dc", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/active/123/567/my-source/my-sink-3 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "c5a8b1a04c738aff", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-1 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-1", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "1cbabf8d301e11b4", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T02:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/source/active/123/567/my-source +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "bfd6eea528be59d3", + "description": "Create source", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Source", + "description": "My Description" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-2/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "jobId": "321" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-2/322 +----- +{ + "projectId": 123, + "branchId": 567, + 
"sourceId": "my-source", + "sinkId": "my-sink-2", + "jobId": "322" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-3/323 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "jobId": "323" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_004.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_004.txt new file mode 100644 index 0000000000..2f508f69d1 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_004.txt @@ -0,0 +1,210 @@ +<<<<< +definition/branch/active/123/567 +----- +{ + "projectId": 123, + "branchId": 567, + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "isDefault": false +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-1 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-1", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "1cbabf8d301e11b4", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T02:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-2 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "643bb252f9a162dc", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": false, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-3 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "c5a8b1a04c738aff", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": false, + "at": 
"2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/source/deleted/123/567/my-source +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "bfd6eea528be59d3", + "description": "Create source", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Source", + "description": "My Description" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_005.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_005.txt new file mode 100644 index 0000000000..2e49c3404a --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_005.txt @@ -0,0 +1,213 @@ +<<<<< +definition/branch/active/123/567 +----- +{ + "projectId": 123, + "branchId": 567, + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "isDefault": false +} +>>>>> + +<<<<< +definition/sink/active/123/567/my-source/my-sink-2 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "643bb252f9a162dc", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/active/123/567/my-source/my-sink-3 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "c5a8b1a04c738aff", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-1 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-1", + "created": { + "at": "2000-01-01T01:00:00.000Z", + 
"by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "1cbabf8d301e11b4", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T02:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/source/active/123/567/my-source +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "bfd6eea528be59d3", + "description": "Create source", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Source", + "description": "My Description" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-2/321 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "jobId": "321" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-2/322 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "jobId": "322" +} +>>>>> + +<<<<< +storage/job/123/567/my-source/my-sink-3/323 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "jobId": "323" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_006.txt b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_006.txt new file mode 100644 index 0000000000..a37526f14d --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/fixtures/job_purge_snapshot_006.txt @@ -0,0 +1,221 @@ +<<<<< +definition/branch/deleted/123/567 +----- +{ + "projectId": 123, + "branchId": 567, + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": true, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "isDefault": false +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-1 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-1", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "1cbabf8d301e11b4", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + 
"deleted": { + "directly": true, + "at": "2000-01-01T02:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-2 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-2", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "643bb252f9a162dc", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": false, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/sink/deleted/123/567/my-source/my-sink-3 +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "sinkId": "my-sink-3", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "c5a8b1a04c738aff", + "description": "Create sink", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": false, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Sink", + "description": "My Description" +} +>>>>> + +<<<<< +definition/source/deleted/123/567/my-source +----- +{ + "projectId": 123, + "branchId": 567, + "sourceId": "my-source", + "created": { + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "version": { + "number": 1, + "hash": "bfd6eea528be59d3", + "description": "Create source", + "at": "2000-01-01T01:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "deleted": { + "directly": false, + "at": "2000-01-01T03:00:00.000Z", + "by": { + "type": "user", + "tokenId": "111", + "tokenDesc": "some.user@company.com", + "userId": "222", + "userName": "Some User" + } + }, + "type": "test", + "name": "My Source", + "description": "My Description" +} +>>>>> diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create.go new file mode 100644 index 0000000000..d97cc7ea83 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create.go @@ -0,0 +1,33 @@ +package job + +import ( + "context" + + "github.com/keboola/go-utils/pkg/deepcopy" + + 
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" +) + +// Create a new stream Job. +// +// - If the Job already exists, the ResourceAlreadyExistsError is returned. +func (r *Repository) Create(input *model.Job) *op.AtomicOp[model.Job] { + k := input.JobKey + var created model.Job + return op.Atomic(r.client, &created). + // Entity must not exist + Read(func(ctx context.Context) op.Op { + return r.MustNotExist(k) + }). + // Create + Write(func(ctx context.Context) op.Op { + // Create + created = deepcopy.Copy(*input).(model.Job) + return r.save(&created) + }). + // Update the input entity, if the operation is successful + OnResult(func(result model.Job) { + *input = result + }) +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create_test.go new file mode 100644 index 0000000000..6d307f3578 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_create_test.go @@ -0,0 +1,99 @@ +package job_test + +import ( + "context" + "net/http" + "testing" + + "github.com/keboola/go-client/pkg/keboola" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + serviceErrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy" + "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper" +) + +func TestJobRepository_Create(t *testing.T) { + t.Parallel() + + by := test.ByUser() + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)") + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"} + + // Create - parent not found + // ----------------------------------------------------------------------------------------------------------------- + { + job := model.Job{JobKey: jobKey} + if err := repo.Create(&job).Do(ctx).Err(); assert.Error(t, err) { + assert.Equal(t, `branch "567" not found in the project`, err.Error()) + serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err) + } + } + + // Create - ok + // ----------------------------------------------------------------------------------------------------------------- + { + branch := test.NewBranch(branchKey) + require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err()) + + source := 
test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+
+ sink := dummy.NewSink(sinkKey)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
+
+ job := model.Job{JobKey: jobKey}
+ result, err := repo.Create(&job).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job, result)
+
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_create_snapshot_001.txt", ignoredEtcdKeys)
+ }
+
+ // ExistsOrErr - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ require.NoError(t, repo.ExistsOrErr(jobKey).Do(ctx).Err())
+ }
+
+ // Create - already exists
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ job := model.Job{JobKey: jobKey}
+ if err := repo.Create(&job).Do(ctx).Err(); assert.Error(t, err) {
+ assert.Equal(t, `job "321" already exists in the sink`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusConflict, err)
+ }
+ }
+
+ // Purge - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ job := model.Job{JobKey: jobKey}
+ assert.NoError(t, repo.Purge(&job).Do(ctx).Err())
+ }
+
+ // MustNotExist - ok, the job was purged
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ require.NoError(t, repo.MustNotExist(jobKey).Do(ctx).Err())
+ }
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists.go
new file mode 100644
index 0000000000..f98476cd11
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists.go
@@ -0,0 +1,30 @@
+package job
+
+import (
+ serviceError "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+)
+
+func (r *Repository) ExistsOrErr(k key.JobKey) *op.TxnOp[op.NoResult] {
+ return op.Txn(r.client).
+ Merge(r.sinks.ExistsOrErr(k.SinkKey)).
+ Merge(r.schema.ByKey(k).Exists(r.client).
+ WithEmptyResultAsError(func() error {
+ return serviceError.NewResourceNotFoundError("job", k.JobID.String(), "sink")
+ }),
+ ).
+ FirstErrorOnly()
+}
+
+func (r *Repository) MustNotExist(k key.JobKey) *op.TxnOp[op.NoResult] {
+ return op.Txn(r.client).
+ Merge(r.sinks.ExistsOrErr(k.SinkKey)).
+ Merge(r.schema.ByKey(k).Exists(r.client).
+ WithNotEmptyResultAsError(func() error {
+ return serviceError.NewResourceAlreadyExistsError("job", k.JobID.String(), "sink")
+ }),
+ ).
+ FirstErrorOnly()
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists_test.go
new file mode 100644
index 0000000000..42a5ae3a83
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_exists_test.go
@@ -0,0 +1,101 @@
+package job_test
+
+import (
+ "context"
+ "net/http"
+ "testing"
+
+ "github.com/keboola/go-client/pkg/keboola"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ serviceErrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy"
+ "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper"
+)
+
+func TestJobRepository_Exists(t *testing.T) {
+ t.Parallel()
+
+ by := test.ByUser()
+ ctx := context.Background()
+ now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time()
+
+ d, mocked := dependencies.NewMockedServiceScope(t, ctx)
+ client := mocked.TestEtcdClient()
+ repo := d.KeboolaBridgeRepository().Job()
+ ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)")
+
+ // Fixtures
+ projectID := keboola.ProjectID(123)
+ branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567}
+ sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+ sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
+ jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"}
+
+ // ExistsOrErr and MustNotExist - the parent branch does not exist, so both return an error
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ if err := repo.ExistsOrErr(jobKey).Do(ctx).Err(); assert.Error(t, err) {
+ assert.Equal(t, `branch "567" not found in the project`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
+ }
+
+ require.Error(t, repo.MustNotExist(jobKey).Do(ctx).Err())
+ }
+
+ // Create - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ branch := test.NewBranch(branchKey)
+ require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err())
+
+ source := test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+
+ sink := dummy.NewSink(sinkKey)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
+
+ job := model.Job{JobKey: jobKey}
+ result, err := repo.Create(&job).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job, result)
+ assert.Equal(t, now, sink.VersionModifiedAt().Time())
+
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_exists_snapshot_001.txt", ignoredEtcdKeys)
+ }
+
+ // ExistsOrErr - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ require.NoError(t, repo.ExistsOrErr(jobKey).Do(ctx).Err())
+ }
+
+ // Create - already exists
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ job := model.Job{JobKey: jobKey}
+ if err := repo.Create(&job).Do(ctx).Err(); assert.Error(t, err) {
+ assert.Equal(t, `job "321" already exists in the sink`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusConflict, err)
+ }
+ }
+
+ // Purge - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ job := model.Job{JobKey: jobKey}
+ assert.NoError(t, repo.Purge(&job).Do(ctx).Err())
+ }
+
+ // MustNotExist - ok, the job was purged
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ require.NoError(t, repo.MustNotExist(jobKey).Do(ctx).Err())
+ }
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get.go
new file mode 100644
index 0000000000..d1d2ff5721
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get.go
@@ -0,0 +1,17 @@
+package job
+
+import (
+ serviceError "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+)
+
+// Get returns job by key.
+func (r *Repository) Get(k key.JobKey) op.WithResult[model.Job] {
+ return r.schema.ByKey(k).GetOrErr(r.client).
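+ // Both checks share one etcd transaction; only the first failed check is reported.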
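+ // Map an empty result to a typed "not found" error instead of returning a zero value.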
+ WithEmptyResultAsError(func() error { + return serviceError.NewResourceNotFoundError("job", k.String(), "sink") + }) +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get_test.go new file mode 100644 index 0000000000..71e35001ff --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_get_test.go @@ -0,0 +1,79 @@ +package job_test + +import ( + "context" + "net/http" + "testing" + + "github.com/keboola/go-client/pkg/keboola" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + serviceErrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy" + "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper" +) + +func TestJobRepository_Get(t *testing.T) { + t.Parallel() + + by := test.ByUser() + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)") + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"} + + // Get - job does not exist + // ----------------------------------------------------------------------------------------------------------------- + { + result, err := repo.Get(jobKey).Do(ctx).ResultOrErr() + if assert.Error(t, err) { + assert.Equal(t, `job "123/567/my-source/my-sink/321" not found in the sink`, err.Error()) + serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err) + } + assert.Equal(t, model.Job{}, result) + } + + // Create prerequisites and job + // ----------------------------------------------------------------------------------------------------------------- + { + branch := test.NewBranch(branchKey) + require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err()) + + source := test.NewSource(sourceKey) + require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err()) + + sink := dummy.NewSink(sinkKey) + require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err()) + + job := model.Job{JobKey: jobKey} + result, err := repo.Create(&job).Do(ctx).ResultOrErr() + require.NoError(t, err) + assert.Equal(t, job, result) + + etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_get_snapshot_001.txt", ignoredEtcdKeys) + } + + // Get - ok + // 
----------------------------------------------------------------------------------------------------------------- + { + result, err := repo.Get(jobKey).Do(ctx).ResultOrErr() + require.NoError(t, err) + assert.Equal(t, model.Job{JobKey: jobKey}, result) + } +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list.go new file mode 100644 index 0000000000..2c55c87782 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list.go @@ -0,0 +1,19 @@ +package job + +import ( + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema" +) + +func (r *Repository) ListAll() iterator.DefinitionT[model.Job] { + return r.schema.GetAll(r.client) +} + +func (r *Repository) List(parentKey any, opts ...iterator.Option) iterator.DefinitionT[model.Job] { + return r.list(r.schema, parentKey, opts...) +} + +func (r *Repository) list(pfx schema.Job, parentKey any, opts ...iterator.Option) iterator.DefinitionT[model.Job] { + return pfx.In(parentKey).GetAll(r.client, opts...) +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list_test.go new file mode 100644 index 0000000000..15f2eadb96 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_list_test.go @@ -0,0 +1,168 @@ +package job_test + +import ( + "context" + "testing" + + "github.com/keboola/go-client/pkg/keboola" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy" + "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper" +) + +func TestJobRepository_List(t *testing.T) { + t.Parallel() + + by := test.ByUser() + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)") + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey1 := key.JobKey{SinkKey: sinkKey, JobID: "321"} + jobKey2 := key.JobKey{SinkKey: sinkKey, JobID: "322"} + + // List - empty + // 
-----------------------------------------------------------------------------------------------------------------
+ {
+ result, err := repo.List(projectID).Do(ctx).All()
+ assert.NoError(t, err)
+ assert.Empty(t, result)
+ }
+
+ // Create two jobs - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ var job1, job2 model.Job
+ {
+ branch := test.NewBranch(branchKey)
+ require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err())
+
+ source := test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+
+ sink := dummy.NewSink(sinkKey)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
+
+ job1 = model.Job{JobKey: jobKey1}
+ result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job1, result)
+
+ job2 = model.Job{JobKey: jobKey2}
+ result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job2, result)
+
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_list_snapshot_001.txt", ignoredEtcdKeys)
+ }
+
+ // List - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ result, err := repo.List(projectID).Do(ctx).All()
+ assert.NoError(t, err)
+ assert.Len(t, result, 2)
+ assert.Equal(t, []model.Job{job1, job2}, result)
+ }
+
+ // List all - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ result, err := repo.ListAll().Do(ctx).All()
+ assert.NoError(t, err)
+ assert.Len(t, result, 2)
+ assert.Equal(t, []model.Job{job1, job2}, result)
+ }
+}
+
+func TestJobRepository_ListAfterSinkDelete(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+ now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time()
+ by := test.ByUser()
+
+ d, mocked := dependencies.NewMockedServiceScope(t, ctx)
+ client := mocked.TestEtcdClient()
+ repo := d.KeboolaBridgeRepository().Job()
+ ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)")
+
+ // Fixtures
+ projectID := keboola.ProjectID(123)
+ branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567}
+ sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+ sinkKey1 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-1"}
+ sinkKey2 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-2"}
+ jobKey1 := key.JobKey{SinkKey: sinkKey1, JobID: "321"}
+ jobKey2 := key.JobKey{SinkKey: sinkKey2, JobID: "322"}
+
+ // Create two sinks
+ // -----------------------------------------------------------------------------------------------------------------
+ var sink1, sink2 definition.Sink
+ {
+ branch := test.NewBranch(branchKey)
+ require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err())
+
+ source := test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+
+ sink1 = dummy.NewSink(sinkKey1)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink1, now, by, "Create sink").Do(ctx).Err())
+
+ sink2 = dummy.NewSink(sinkKey2)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink2, now, by, "Create sink").Do(ctx).Err())
+ }
+
+ // Create two jobs - ok
+ // -----------------------------------------------------------------------------------------------------------------
+ var job1, job2 model.Job
+ {
+ job1 = model.Job{JobKey: jobKey1}
+ result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job1, result)
+
+ job2 = model.Job{JobKey: jobKey2}
+ result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.Equal(t, job2, result)
+
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_list_snapshot_002.txt", ignoredEtcdKeys)
+ }
+
+ // SoftDelete two sinks - ok, removes jobs
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ var err error
+
+ sink1, err = d.DefinitionRepository().Sink().SoftDelete(sinkKey1, now, by).Do(ctx).ResultOrErr()
+ assert.NoError(t, err)
+
+ sink2, err = d.DefinitionRepository().Sink().SoftDelete(sinkKey2, now, by).Do(ctx).ResultOrErr()
+ assert.NoError(t, err)
+ }
+
+ // List - empty
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ result, err := repo.List(sourceKey).Do(ctx).All()
+ assert.NoError(t, err)
+ assert.Len(t, result, 0)
+ }
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge.go
new file mode 100644
index 0000000000..53ae51b2bf
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge.go
@@ -0,0 +1,70 @@
+package job
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/keboola/go-utils/pkg/deepcopy"
+
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+)
+
+func (r *Repository) purgeJobsOnSinkDelete() {
+ r.plugins.Collection().OnSinkDelete(func(ctx context.Context, now time.Time, by definition.By, original, deleted *definition.Sink) error {
+ op.AtomicOpCtxFrom(ctx).AddFrom(r.purgeAllFrom(deleted.SinkKey))
+ return nil
+ })
+}
+
+// purgeAllFrom purges all jobs under the parent key (keboola.ProjectID, BranchKey, SourceKey, or SinkKey).
+func (r *Repository) purgeAllFrom(parentKey fmt.Stringer) *op.AtomicOp[[]model.Job] {
+ var allOriginal, allDeleted []model.Job
+ atomicOp := op.Atomic(r.client, &allDeleted)
+
+ // List all jobs under the parent key
+ atomicOp.Read(func(ctx context.Context) op.Op {
+ return r.List(parentKey).WithAllTo(&allOriginal)
+ })
+
+ // Purge all listed jobs in a single transaction
+ atomicOp.Write(func(ctx context.Context) op.Op {
+ txn := op.Txn(r.client)
+ for _, old := range allOriginal {
+ // Purge job
+ purged := deepcopy.Copy(old).(model.Job)
+ purged.Deleted = true
+
+ txn.Merge(r.save(&purged))
+ allDeleted = append(allDeleted, purged)
+ }
+ return txn
+ })
+
+ return atomicOp
+}
+
+// Purge deletes the job from the active prefix.
+func (r *Repository) Purge(input *model.Job) *op.AtomicOp[model.Job] {
+ k := input.JobKey
+ var deleted model.Job
+ return op.Atomic(r.client, &deleted).
+ // Entity must exist
+ Read(func(ctx context.Context) op.Op {
+ return r.ExistsOrErr(k)
+ }).
+ // Delete
+ Write(func(ctx context.Context) op.Op {
+ // Mark the entity as deleted; save() then removes the active record
+ deleted = deepcopy.Copy(*input).(model.Job)
+ deleted.Deleted = true
+ return r.save(&deleted)
+ }).
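+ // Note: save() removes the active record when the Deleted flag is set (see job_repo.go).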
+ // Update the input entity, if the operation is successful + OnResult(func(result model.Job) { + *input = result + }) +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge_test.go new file mode 100644 index 0000000000..36b4519a39 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_purge_test.go @@ -0,0 +1,360 @@ +package job_test + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/keboola/go-client/pkg/keboola" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + serviceErrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors" + "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy" + "github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper" +) + +func TestJobRepository_Purge(t *testing.T) { + t.Parallel() + + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + by := test.ByUser() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/branch)|(definition/source)|(definition/sink)") + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"} + + // Purge - not found + // ----------------------------------------------------------------------------------------------------------------- + { + job := model.Job{JobKey: jobKey} + if err := repo.Purge(&job).Do(ctx).Err(); assert.Error(t, err) { + assert.Equal(t, `branch "567" not found in the project`, err.Error()) + serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err) + } + } + + // Create - ok + // ----------------------------------------------------------------------------------------------------------------- + { + branch := test.NewBranch(branchKey) + require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err()) + + source := test.NewSource(sourceKey) + require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err()) + + sink := dummy.NewSink(sinkKey) + require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err()) + + job := model.Job{JobKey: jobKey} + require.NoError(t, repo.Create(&job).Do(ctx).Err()) + } + + // ExistsOrErr - ok + // ----------------------------------------------------------------------------------------------------------------- + { + require.NoError(t, repo.ExistsOrErr(jobKey).Do(ctx).Err()) + } + + // Purge - ok + // 
----------------------------------------------------------------------------------------------------------------- + { + job := model.Job{JobKey: jobKey} + assert.NoError(t, repo.Purge(&job).Do(ctx).Err()) + etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_001.txt", ignoredEtcdKeys) + } + + // ExistsOrErr - not found + // ----------------------------------------------------------------------------------------------------------------- + { + if err := repo.ExistsOrErr(jobKey).Do(ctx).Err(); assert.Error(t, err) { + assert.Equal(t, `job "321" not found in the sink`, err.Error()) + } + } +} + +func TestJobRepository_PurgeJobsOnSinkDelete(t *testing.T) { + t.Parallel() + + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + by := test.ByUser() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/source/version)|(definition/sink/version)") + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey1 := key.JobKey{SinkKey: sinkKey, JobID: "321"} + jobKey2 := key.JobKey{SinkKey: sinkKey, JobID: "322"} + jobKey3 := key.JobKey{SinkKey: sinkKey, JobID: "323"} + + // Create Branch, Source and Sink + // ----------------------------------------------------------------------------------------------------------------- + var sink definition.Sink + { + branch := test.NewBranch(branchKey) + require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err()) + + source := test.NewSource(sourceKey) + require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err()) + + sink = dummy.NewSink(sinkKey) + require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err()) + } + + // Create three Jobs + // ----------------------------------------------------------------------------------------------------------------- + { + job := model.Job{JobKey: jobKey1} + require.NoError(t, repo.Create(&job).Do(ctx).Err()) + + job = model.Job{JobKey: jobKey2} + require.NoError(t, repo.Create(&job).Do(ctx).Err()) + + job = model.Job{JobKey: jobKey3} + require.NoError(t, repo.Create(&job).Do(ctx).Err()) + } + + // Delete Sink + // ----------------------------------------------------------------------------------------------------------------- + { + now = now.Add(time.Hour) + var err error + sink, err = d.DefinitionRepository().Sink().SoftDelete(sinkKey, now, by).Do(ctx).ResultOrErr() + require.NoError(t, err) + assert.True(t, sink.IsDeleted()) + assert.Equal(t, now, sink.DeletedAt().Time()) + } + + // Check keys + // ----------------------------------------------------------------------------------------------------------------- + { + etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_002.txt", ignoredEtcdKeys) + } +} + +func TestJobRepository_PurgeJobsOnSourceDelete_DeleteSource(t *testing.T) { + t.Parallel() + + ctx := context.Background() + now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time() + by := test.ByUser() + + d, mocked := dependencies.NewMockedServiceScope(t, ctx) + client := mocked.TestEtcdClient() + repo := d.KeboolaBridgeRepository().Job() + ignoredEtcdKeys := 
etcdhelper.WithIgnoredKeyPattern("^(definition/source/version)|(definition/sink/version)")
+
+ // Fixtures
+ projectID := keboola.ProjectID(123)
+ branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567}
+ sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+ sinkKey1 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-1"}
+ sinkKey2 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-2"}
+ sinkKey3 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-3"}
+ jobKey0 := key.JobKey{SinkKey: sinkKey1, JobID: "320"}
+ jobKey1 := key.JobKey{SinkKey: sinkKey2, JobID: "321"}
+ jobKey2 := key.JobKey{SinkKey: sinkKey2, JobID: "322"}
+ jobKey3 := key.JobKey{SinkKey: sinkKey3, JobID: "323"}
+
+ // Create Branch and Source
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ branch := test.NewBranch(branchKey)
+ require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err())
+
+ source := test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+ }
+
+ // Create sinks
+ // -----------------------------------------------------------------------------------------------------------------
+ var sink1, sink2, sink3 definition.Sink
+ {
+ sink1 = dummy.NewSink(sinkKey1)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink1, now, by, "Create sink").Do(ctx).Err())
+ sink2 = dummy.NewSink(sinkKey2)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink2, now, by, "Create sink").Do(ctx).Err())
+ sink3 = dummy.NewSink(sinkKey3)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink3, now, by, "Create sink").Do(ctx).Err())
+ }
+
+ // Delete Sink1
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ now = now.Add(time.Hour)
+ var err error
+ sink1, err = d.DefinitionRepository().Sink().SoftDelete(sinkKey1, now, by).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.True(t, sink1.IsDeleted())
+ assert.Equal(t, now, sink1.DeletedAt().Time())
+ }
+
+ // Create three out of four Jobs
+ // -----------------------------------------------------------------------------------------------------------------
+ var job0, job1, job2, job3 model.Job
+ {
+ // Creation must fail, because sink1 is deleted
+ job0 = model.Job{JobKey: jobKey0}
+ if err := repo.Create(&job0).Do(ctx).Err(); assert.Error(t, err) {
+ assert.Equal(t, `sink "my-sink-1" not found in the source`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
+ }
+
+ job1 = model.Job{JobKey: jobKey1}
+ require.NoError(t, repo.Create(&job1).Do(ctx).Err())
+
+ job2 = model.Job{JobKey: jobKey2}
+ require.NoError(t, repo.Create(&job2).Do(ctx).Err())
+
+ job3 = model.Job{JobKey: jobKey3}
+ require.NoError(t, repo.Create(&job3).Do(ctx).Err())
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_003.txt", ignoredEtcdKeys)
+ }
+
+ // Delete Source
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ now = now.Add(time.Hour)
+ require.NoError(t, d.DefinitionRepository().Source().SoftDelete(sourceKey, now, by).Do(ctx).Err())
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_004.txt", ignoredEtcdKeys)
+ }
+ {
+ if _, err := repo.MustNotExist(jobKey0).Do(ctx).ResultOrErr(); assert.Error(t, err) {
+ assert.Equal(t, `source "my-source" not found in the branch`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
+ }
+
+ var err error
+ _, err = repo.MustNotExist(jobKey1).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ _, err = repo.MustNotExist(jobKey2).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ _, err = repo.MustNotExist(jobKey3).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ }
+}
+
+func TestJobRepository_PurgeJobsOnSourceDelete_DeleteBranch(t *testing.T) {
+ t.Parallel()
+
+ ctx := context.Background()
+ now := utctime.MustParse("2000-01-01T01:00:00.000Z").Time()
+ by := test.ByUser()
+
+ d, mocked := dependencies.NewMockedServiceScope(t, ctx)
+ client := mocked.TestEtcdClient()
+ repo := d.KeboolaBridgeRepository().Job()
+ ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern("^(definition/source/version)|(definition/sink/version)")
+
+ // Fixtures
+ projectID := keboola.ProjectID(123)
+ branchKey := key.BranchKey{ProjectID: projectID, BranchID: 567}
+ sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+ sinkKey1 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-1"}
+ sinkKey2 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-2"}
+ sinkKey3 := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink-3"}
+ jobKey0 := key.JobKey{SinkKey: sinkKey1, JobID: "320"}
+ jobKey1 := key.JobKey{SinkKey: sinkKey2, JobID: "321"}
+ jobKey2 := key.JobKey{SinkKey: sinkKey2, JobID: "322"}
+ jobKey3 := key.JobKey{SinkKey: sinkKey3, JobID: "323"}
+
+ // Create Branch and Source
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ branch := test.NewBranch(branchKey)
+ require.NoError(t, d.DefinitionRepository().Branch().Create(&branch, now, by).Do(ctx).Err())
+
+ source := test.NewSource(sourceKey)
+ require.NoError(t, d.DefinitionRepository().Source().Create(&source, now, by, "Create source").Do(ctx).Err())
+ }
+
+ // Create sinks
+ // -----------------------------------------------------------------------------------------------------------------
+ var sink1, sink2, sink3 definition.Sink
+ {
+ sink1 = dummy.NewSink(sinkKey1)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink1, now, by, "Create sink").Do(ctx).Err())
+ sink2 = dummy.NewSink(sinkKey2)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink2, now, by, "Create sink").Do(ctx).Err())
+ sink3 = dummy.NewSink(sinkKey3)
+ require.NoError(t, d.DefinitionRepository().Sink().Create(&sink3, now, by, "Create sink").Do(ctx).Err())
+ }
+
+ // Delete Sink1
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ now = now.Add(time.Hour)
+ var err error
+ sink1, err = d.DefinitionRepository().Sink().SoftDelete(sinkKey1, now, by).Do(ctx).ResultOrErr()
+ require.NoError(t, err)
+ assert.True(t, sink1.IsDeleted())
+ assert.Equal(t, now, sink1.DeletedAt().Time())
+ }
+
+ // Create three out of four Jobs
+ // -----------------------------------------------------------------------------------------------------------------
+ var job0, job1, job2, job3 model.Job
+ {
+ // Creation must fail, because sink1 is deleted
+ job0 = model.Job{JobKey: jobKey0}
+ if err := repo.Create(&job0).Do(ctx).Err(); assert.Error(t, err) {
+ assert.Equal(t, `sink "my-sink-1" not found in the source`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
+ }
+
+ job1 = model.Job{JobKey: jobKey1}
+ require.NoError(t, repo.Create(&job1).Do(ctx).Err())
+
+ job2 = model.Job{JobKey: jobKey2}
+ require.NoError(t, repo.Create(&job2).Do(ctx).Err())
+
+ job3 = model.Job{JobKey: jobKey3}
+ require.NoError(t, repo.Create(&job3).Do(ctx).Err())
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_005.txt", ignoredEtcdKeys)
+ }
+
+ // Delete Branch
+ // -----------------------------------------------------------------------------------------------------------------
+ {
+ now = now.Add(time.Hour)
+ require.NoError(t, d.DefinitionRepository().Branch().SoftDelete(branchKey, now, by).Do(ctx).Err())
+ etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_006.txt", ignoredEtcdKeys)
+ }
+ {
+ if _, err := repo.MustNotExist(jobKey0).Do(ctx).ResultOrErr(); assert.Error(t, err) {
+ assert.Equal(t, `branch "567" not found in the project`, err.Error())
+ serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
+ }
+
+ var err error
+ _, err = repo.MustNotExist(jobKey1).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ _, err = repo.MustNotExist(jobKey2).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ _, err = repo.MustNotExist(jobKey3).Do(ctx).ResultOrErr()
+ require.Error(t, err)
+ }
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_repo.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_repo.go
new file mode 100644
index 0000000000..7bf38b530f
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_repo.go
@@ -0,0 +1,59 @@
+package job
+
+import (
+ etcd "go.etcd.io/etcd/client/v3"
+
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
+ definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository/sink"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema"
+)
+
+type Repository struct {
+ client *etcd.Client
+ schema schema.Job
+ sinks *sink.Repository
+ plugins *plugin.Plugins
+}
+
+type dependencies interface {
+ EtcdClient() *etcd.Client
+ EtcdSerde() *serde.Serde
+ DefinitionRepository() *definitionRepo.Repository
+ Plugins() *plugin.Plugins
+}
+
+func NewRepository(d dependencies) *Repository {
+ r := &Repository{
+ client: d.EtcdClient(),
+ schema: schema.New(d.EtcdSerde()),
+ sinks: d.DefinitionRepository().Sink(),
+ plugins: d.Plugins(),
+ }
+
+ r.purgeJobsOnSinkDelete()
+ return r
+}
+
+// save the Job on create or purge.
+func (r *Repository) save(updated *model.Job) *op.TxnOp[model.Job] {
+ // No plugins are called here, currently
+
+ saveTxn := op.TxnWithResult(r.client, updated)
+ if updated.Deleted {
+ // Delete the entity from the active prefix
+ saveTxn.Then(
+ r.schema.ByKey(updated.JobKey).Delete(r.client),
+ )
+ } else {
+ // Save the record to the active prefix
+ saveTxn.Then(
+ r.schema.ByKey(updated.JobKey).Put(r.client, *updated),
+ )
+ }
+
+ return saveTxn
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_watch.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_watch.go
new file mode 100644
index 0000000000..f8b2c16094
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/job_watch.go
@@ -0,0 +1,14 @@
+package job
+
+import (
+ "context"
+
+ etcd "go.etcd.io/etcd/client/v3"
+
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+)
+
+func (r *Repository) GetAllAndWatch(ctx context.Context, opts ...etcd.OpOption) *etcdop.RestartableWatchStreamT[model.Job] {
+ return r.schema.GetAllAndWatch(ctx, r.client, opts...)
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job.go
new file mode 100644
index 0000000000..8502aed636
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job.go
@@ -0,0 +1,61 @@
+package schema
+
+import (
+ "github.com/keboola/go-client/pkg/keboola"
+
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+ "github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
+)
+
+type (
+ // Job is an etcd prefix that stores all Keboola-specific data we need for job polling.
+ Job struct {
+ etcdop.PrefixT[model.Job]
+ }
+)
+
+func New(s *serde.Serde) Job {
+ return Job{PrefixT: etcdop.NewTypedPrefix[model.Job]("storage/job", s)}
+}
+
+func (j Job) ForSink(k key.JobKey) etcdop.KeyT[model.Job] {
+ return j.PrefixT.Key(k.String())
+}
+
+func (j Job) In(objectKey any) etcdop.PrefixT[model.Job] {
+ switch k := objectKey.(type) {
+ case keboola.ProjectID:
+ return j.InProject(k)
+ case key.BranchKey:
+ return j.InBranch(k)
+ case key.SourceKey:
+ return j.InSource(k)
+ case key.SinkKey:
+ return j.InSink(k)
+ default:
+ panic(errors.Errorf(`unexpected Job parent key type "%T"`, objectKey))
+ }
+}
+
+func (j Job) InProject(k keboola.ProjectID) etcdop.PrefixT[model.Job] {
+ return j.PrefixT.Add(k.String())
+}
+
+func (j Job) InBranch(k key.BranchKey) etcdop.PrefixT[model.Job] {
+ return j.PrefixT.Add(k.String())
+}
+
+func (j Job) InSource(k key.SourceKey) etcdop.PrefixT[model.Job] {
+ return j.PrefixT.Add(k.String())
+}
+
+func (j Job) InSink(k key.SinkKey) etcdop.PrefixT[model.Job] {
+ return j.PrefixT.Add(k.String())
+}
+
+func (j Job) ByKey(k key.JobKey) etcdop.KeyT[model.Job] {
+ return j.PrefixT.Key(k.String())
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job_schema_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job_schema_test.go
new file mode 100644
index 0000000000..28010c7e2e
--- /dev/null
+++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job/schema/job_schema_test.go
@@ -0,0 +1,85 @@
+package schema
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
+ "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+)
+
+func TestJobSchema(t *testing.T) {
+ t.Parallel()
+ s := New(serde.NewJSON(serde.NoValidation))
+
+ sourceKey := key.SourceKey{
+ BranchKey: key.BranchKey{
+ ProjectID: 123,
+ BranchID: 456,
+ },
+ SourceID: "my-source",
+ }
+
+ sinkKey := key.SinkKey{
+ SourceKey: sourceKey,
+ SinkID: "my-sink",
+ }
+
+ jobKey := key.JobKey{
+ SinkKey: sinkKey,
+ JobID: "321",
+ }
+
+ cases := []struct{ actual, expected string }{
+ {
+ s.Prefix(),
+ "storage/job/",
+ },
+ {
+ s.In(jobKey.ProjectID).Prefix(),
+ "storage/job/123/",
+ },
+ {
+ s.In(jobKey.BranchKey).Prefix(),
+ "storage/job/123/456/",
+ },
+ {
+ s.In(jobKey.SourceKey).Prefix(),
+ "storage/job/123/456/my-source/",
+ },
+ {
+ s.InProject(jobKey.ProjectID).Prefix(),
+ "storage/job/123/",
+ },
+ {
+ s.InBranch(jobKey.BranchKey).Prefix(),
+ "storage/job/123/456/",
+ },
+ {
+ s.InSource(jobKey.SourceKey).Prefix(),
+ "storage/job/123/456/my-source/",
+ },
+ {
+ s.InSink(jobKey.SinkKey).Prefix(),
+ "storage/job/123/456/my-source/my-sink/",
+ },
+ {
+ s.ByKey(jobKey).Key(),
+ "storage/job/123/456/my-source/my-sink/321",
+ },
+ }
+
+ for i, c := range cases {
+ assert.Equal(t, c.expected, c.actual, fmt.Sprintf(`case "%d"`, i+1))
+ }
+}
+
+func TestJobSchema_In(t *testing.T) {
+ t.Parallel()
+ s := New(serde.NewJSON(serde.NoValidation))
+ assert.Panics(t, func() {
+ s.In("unexpected type")
+ })
+}
diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/repository.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/repository.go
new file mode 100644
index 0000000000..38047a807d
--- /dev/null
+++ 
b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/repository.go @@ -0,0 +1,35 @@ +package repository + +import ( + etcd "go.etcd.io/etcd/client/v3" + + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" + definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/job" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level" +) + +type dependencies interface { + EtcdClient() *etcd.Client + EtcdSerde() *serde.Serde + Plugins() *plugin.Plugins + DefinitionRepository() *definitionRepo.Repository +} + +// Repository provides database operations with the storage entities. +type Repository struct { + job *job.Repository +} + +func New(cfg level.Config, d dependencies) (*Repository, error) { + r := &Repository{} + + r.job = job.NewRepository(d) + + return r, nil +} + +func (r *Repository) Job() *job.Repository { + return r.job +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file.go similarity index 100% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/file.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file.go diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/file_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file_test.go similarity index 100% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/file_test.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file_test.go diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/schema.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go similarity index 100% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/schema.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/token.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token.go similarity index 100% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/token.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token.go diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/token_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token_test.go similarity index 100% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema/token_test.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token_test.go diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/job.go new file mode 100644 index 0000000000..4d047838d1 --- /dev/null +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/job.go @@ -0,0 +1,77 @@ +package test + +import ( + "net/http" + "strconv" + "testing" + + "github.com/jarcoal/httpmock" + 
"github.com/keboola/go-client/pkg/keboola" + "go.uber.org/atomic" +) + +func MockImportAsyncAPICalls(tb testing.TB, transport *httpmock.MockTransport) { + tb.Helper() + + jobID := atomic.NewInt32(321) + jobStr := strconv.FormatInt(int64(jobID.Load()), 10) + // Mocked import async resource endpoint + transport.RegisterResponder( + http.MethodPost, + `=~/v2/storage/branch/[0-9]+/tables/in\.c-bucket\.my-table/import-async`, + func(request *http.Request) (*http.Response, error) { + return httpmock.NewJsonResponse(http.StatusOK, &keboola.StorageJob{ + StorageJobKey: keboola.StorageJobKey{ + ID: keboola.StorageJobID(jobID.Load()), + }, + Status: "processing", + URL: "https://connection.keboola.com/v2/storage/jobs/" + jobStr, + OperationName: "importFile", + }) + }, + ) +} + +func MockProcessingJobStorageAPICalls(tb testing.TB, transport *httpmock.MockTransport) { + tb.Helper() + + jobID := atomic.NewInt32(321) + jobStr := strconv.FormatInt(int64(jobID.Load()), 10) + // Mocked job inspection resource endpoint + transport.RegisterResponder( + http.MethodGet, + `=~/v2/storage/jobs/`+jobStr, + func(request *http.Request) (*http.Response, error) { + return httpmock.NewJsonResponse(http.StatusOK, &keboola.StorageJob{ + StorageJobKey: keboola.StorageJobKey{ + ID: keboola.StorageJobID(jobID.Load()), + }, + Status: "processing", + URL: "https://connection.keboola.com/v2/storage/jobs/" + jobStr, + OperationName: "importFile", + }) + }, + ) +} + +func MockSuccessJobStorageAPICalls(tb testing.TB, transport *httpmock.MockTransport) { + tb.Helper() + + jobID := atomic.NewInt32(321) + jobStr := strconv.FormatInt(int64(jobID.Load()), 10) + // Mocked file prepare resource endpoint + transport.RegisterResponder( + http.MethodGet, + `=~/v2/storage/jobs/`+jobStr, + func(request *http.Request) (*http.Response, error) { + return httpmock.NewJsonResponse(http.StatusOK, &keboola.StorageJob{ + StorageJobKey: keboola.StorageJobKey{ + ID: keboola.StorageJobID(jobID.Load()), + }, + Status: "success", + URL: "https://connection.keboola.com/v2/storage/jobs/" + jobStr, + OperationName: "importFile", + }) + }, + ) +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/table.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/table.go index 27e582e378..2b87e18a6b 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/table.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test/table.go @@ -110,7 +110,7 @@ func MockTableStorageAPICalls(tb testing.TB, transport *httpmock.MockTransport) // Create table job - ok transport.RegisterResponder( http.MethodGet, - `=~/v2/storage/jobs/.+$`, + `=~/v2/storage/jobs/1.*`, func(request *http.Request) (*http.Response, error) { lock.Lock() defer lock.Unlock() diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/config.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/config.go index 08b08d6c5a..28e1254c8d 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/config.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/config.go @@ -8,10 +8,12 @@ import ( type Config struct { // EventSendTimeout is a timeout to perform slice upload event or file import event. 
EventSendTimeout time.Duration `configKey:"eventSendTimeout" configUsage:"Timeout to perform upload send event of slice or import event of file"` + JobLimit int `configKey:"jobLimit" configUsage:"Number of import jobs running in keboola for single sink"` } func NewConfig() Config { return Config{ EventSendTimeout: 30 * time.Second, + JobLimit: 2, } } diff --git a/internal/pkg/service/stream/storage/metacleanup/config.go b/internal/pkg/service/stream/storage/metacleanup/config.go index 313b24020f..abddb8fb0e 100644 --- a/internal/pkg/service/stream/storage/metacleanup/config.go +++ b/internal/pkg/service/stream/storage/metacleanup/config.go @@ -4,7 +4,7 @@ import "time" type Config struct { Enabled bool `configKey:"enabled" configUsage:"Enable local storage metadata cleanup."` - Interval time.Duration `configKey:"interval" configUsage:"Cleanup interval." validate:"required,minDuration=5m,maxDuration=24h"` + Interval time.Duration `configKey:"interval" configUsage:"Cleanup interval." validate:"required,minDuration=30s,maxDuration=24h"` Concurrency int `configKey:"concurrency" configUsage:"How many files are deleted in parallel." validate:"required,min=1,max=500"` ActiveFileExpiration time.Duration `configKey:"activeFileExpiration" configUsage:"Expiration interval of a file that has not yet been imported." validate:"required,minDuration=1h,maxDuration=720h,gtefield=ArchivedFileExpiration"` // maxDuration=30 days ArchivedFileExpiration time.Duration `configKey:"archivedFileExpiration" configUsage:"Expiration interval of a file that has already been imported." validate:"required,minDuration=15m,maxDuration=720h"` // maxDuration=30 days @@ -13,7 +13,7 @@ type Config struct { func NewConfig() Config { return Config{ Enabled: true, - Interval: 5 * time.Minute, + Interval: 30 * time.Second, Concurrency: 50, ActiveFileExpiration: 7 * 24 * time.Hour, // 7 days ArchivedFileExpiration: 6 * time.Hour, diff --git a/internal/pkg/service/stream/storage/metacleanup/metacleanup.go b/internal/pkg/service/stream/storage/metacleanup/metacleanup.go index 08193ff782..35180e25ee 100644 --- a/internal/pkg/service/stream/storage/metacleanup/metacleanup.go +++ b/internal/pkg/service/stream/storage/metacleanup/metacleanup.go @@ -8,6 +8,7 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/keboola/go-client/pkg/keboola" etcd "go.etcd.io/etcd/client/v3" "go.opentelemetry.io/otel/attribute" "go.uber.org/atomic" @@ -19,6 +20,9 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/distribution" "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator" "github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx" + keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge" + keboolaBridgeModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" + keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model/repository" "github.com/keboola/keboola-as-code/internal/pkg/telemetry" @@ -31,29 +35,38 @@ type dependencies interface { Telemetry() telemetry.Telemetry Process() *servicectx.Process EtcdClient() *etcd.Client + KeboolaSinkBridge() *keboolaSinkBridge.Bridge + KeboolaPublicAPI() *keboola.PublicAPI 
 DistributionNode() *distribution.Node
 DistributedLockProvider() *distlock.Provider
 StorageRepository() *storageRepo.Repository
+ KeboolaBridgeRepository() *keboolaBridgeRepo.Repository
}

type Node struct {
- config Config
- clock clock.Clock
- logger log.Logger
- telemetry telemetry.Telemetry
- dist *distribution.GroupNode
- locks *distlock.Provider
- storage *storageRepo.Repository
+ config Config
+ clock clock.Clock
+ logger log.Logger
+ telemetry telemetry.Telemetry
+ bridge *keboolaSinkBridge.Bridge
+ dist *distribution.GroupNode
+ publicAPI *keboola.PublicAPI
+ locks *distlock.Provider
+ storageRepository *storageRepo.Repository
+ keboolaBridgeRepository *keboolaBridgeRepo.Repository
}

func Start(d dependencies, cfg Config) error {
 n := &Node{
- config: cfg,
- clock: d.Clock(),
- logger: d.Logger().WithComponent("storage.metadata.cleanup"),
- telemetry: d.Telemetry(),
- locks: d.DistributedLockProvider(),
- storage: d.StorageRepository(),
+ config: cfg,
+ clock: d.Clock(),
+ logger: d.Logger().WithComponent("storage.metadata.cleanup"),
+ telemetry: d.Telemetry(),
+ locks: d.DistributedLockProvider(),
+ bridge: d.KeboolaSinkBridge(),
+ publicAPI: d.KeboolaPublicAPI(),
+ storageRepository: d.StorageRepository(),
+ keboolaBridgeRepository: d.KeboolaBridgeRepository(),
 }

 if dist, err := d.DistributionNode().Group("storage.metadata.cleanup"); err == nil {
@@ -112,27 +125,35 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) {
 defer span.End(&err)

 // Measure count of deleted files
- counter := atomic.NewInt64(0)
+ fileCounter := atomic.NewInt64(0)
 defer func() {
- count := counter.Load()
+ count := fileCounter.Load()
 span.SetAttributes(attribute.Int64("deletedFilesCount", count))
 n.logger.With(attribute.Int64("deletedFilesCount", count)).Info(ctx, `deleted "" files`)
 }()

+ // Measure count of deleted storage jobs
+ jobCounter := atomic.NewInt64(0)
+ defer func() {
+ count := jobCounter.Load()
+ span.SetAttributes(attribute.Int64("deletedJobsCount", count))
+ n.logger.With(attribute.Int64("deletedJobsCount", count)).Info(ctx, `deleted "" jobs`)
+ }()
+
 // Delete files in parallel, but with limit
 n.logger.Info(ctx, `deleting metadata of expired files`)
 grp, ctx := errgroup.WithContext(ctx)
 grp.SetLimit(n.config.Concurrency)

 // Iterate all files
- err = n.storage.
+ err = n.storageRepository.
 File().
 ListAll().
 ForEach(func(file model.File, _ *iterator.Header) error {
 grp.Go(func() error {
 err, deleted := n.cleanFile(ctx, file)
 if deleted {
- counter.Add(1)
+ fileCounter.Add(1)
 }
 return err
 })
@@ -145,6 +166,43 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) {
 return err
 }

+ n.logger.Info(ctx, `deleting metadata of success jobs`)
+ // Iterate all storage jobs
+ err = n.keboolaBridgeRepository.
+ Job().
+ ListAll().
+ ForEach(func(job keboolaBridgeModel.Job, _ *iterator.Header) error {
+ grp.Go(func() error {
+ // There can be several cleanup nodes, each node processes its own part.
+ if !n.dist.MustCheckIsOwner(job.ProjectID.String()) {
+ return nil
+ }
+
+ // Log/trace job details
+ attrs := job.Telemetry()
+ ctx = ctxattr.ContextWith(ctx, attrs...)
+
+ // Trace each job
+ ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob")
+
+ err, deleted := n.bridge.CleanJob(ctx, job)
+ if deleted {
+ jobCounter.Add(1)
+ }
+
+ span.End(&err)
+ return err
+ })
+
+ return nil
+ }).
+ Do(ctx).
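+ // Run the iteration; each job is cleaned concurrently, limited by the errgroup above.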
diff --git a/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go b/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go
index 5a8223d616..1e43e6bbe5 100644
--- a/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go
+++ b/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go
@@ -12,16 +12,36 @@ import (
 	"go.etcd.io/etcd/client/v3/concurrency"
 
 	commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
+	keboolaSink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
+	bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/metacleanup"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
+	bridgeEntity "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/bridge"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test/dummy"
 	"github.com/keboola/keboola-as-code/internal/pkg/utils/etcdhelper"
 )
 
+type (
+	// testToken is an etcd prefix that stores all Keboola Storage API token entities.
+	testToken struct {
+		etcdop.PrefixT[keboolaSink.Token]
+	}
+)
+
+func forToken(s *serde.Serde) testToken {
+	return testToken{PrefixT: etcdop.NewTypedPrefix[keboolaSink.Token]("storage/keboola/secret/token", s)}
+}
+
+func (v testToken) ForSink(k key.SinkKey) etcdop.KeyT[keboolaSink.Token] {
+	return v.PrefixT.Key(k.String())
+}
+
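The `testToken` helper shows the typed-prefix pattern used throughout the etcd layer: a `PrefixT[T]` serializes values via `serde` under a fixed path, and `Key` derives a typed key. A small sketch of the same pattern with a hypothetical entity (API usage mirrors `forToken`/`ForSink` above):

```go
package example

import (
	"context"

	etcd "go.etcd.io/etcd/client/v3"

	"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
	"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
)

// Thing is a hypothetical entity; any JSON-serializable struct works.
type Thing struct {
	Name string `json:"name"`
}

func putThing(ctx context.Context, client *etcd.Client, s *serde.Serde) error {
	things := etcdop.NewTypedPrefix[Thing]("test/things", s)
	k := things.Key("thing-1") // full etcd key: "test/things/thing-1"
	return k.Put(client, Thing{Name: "example"}).Do(ctx).Err()
}
```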
 func TestMetadataCleanup(t *testing.T) {
 	t.Parallel()
 
@@ -36,7 +56,8 @@ func TestMetadataCleanup(t *testing.T) {
 	branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
 	sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
 	sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
-	ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern(`^definition/|storage/secret/|storage/volume/|storage/file/all/|storage/slice/all/|storage/stats/|runtime/`)
+	jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"}
+	ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern(`^definition/|storage/secret/|storage/volume/|storage/file/all/|storage/slice/all/|storage/stats/|runtime/|storage/keboola/secret/token/`)
 
 	// Get services
 	d, mocked := dependencies.NewMockedCoordinatorScope(t, ctx, commonDeps.WithClock(clk))
@@ -58,6 +79,12 @@ func TestMetadataCleanup(t *testing.T) {
 	cfg.ActiveFileExpiration = activeFileExpiration
 	cfg.ArchivedFileExpiration = importedFileExpiration
 
+	// Register routes for receiving information about jobs
+	transport := mocked.MockedHTTPTransport()
+	{
+		bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
+	}
+
 	// Start metadata cleanup node
 	// -----------------------------------------------------------------------------------------------------------------
 	require.NoError(t, metacleanup.Start(d, cfg))
@@ -86,7 +113,7 @@ func TestMetadataCleanup(t *testing.T) {
 		test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
 	}
 
-	// Create parent branch, source, sink, file
+	// Create parent branch, source, sink, token
 	// -----------------------------------------------------------------------------------------------------------------
 	{
 		branch := test.NewBranch(branchKey)
@@ -95,6 +122,8 @@ func TestMetadataCleanup(t *testing.T) {
 		require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
 		sink := dummy.NewSinkWithLocalStorage(sinkKey)
 		require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
+		token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
+		require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err())
 	}
 
 	// Create the second file
@@ -194,8 +223,136 @@
 {"level":"info","message":"deleted \"1\" files","deletedFilesCount":1}
 `)
 	}
+	// Check database state
+	{
+		etcdhelper.AssertKeys(t, client, nil, ignoredEtcdKeys)
+	}
+
+	// Create the job at both the global level and the bridge level
+	{
+		job := bridgeEntity.NewJob(jobKey)
+		require.NoError(t, d.KeboolaBridgeRepository().Job().Create(&job).Do(ctx).Err())
+	}
+
+	// Delete the success job, as it has finished
+	jobCleanupAttempt := 0
+	{
+		logger.Truncate()
+		startTime := clk.Now()
+		jobCleanupAttempt++
+		clk.Set(startTime.Add(time.Duration(jobCleanupAttempt) * cleanupInterval))
+		assert.EventuallyWithT(t, func(c *assert.CollectT) {
+			logger.AssertJSONMessages(c, `{"level":"info","message":"deleted \"%d\" jobs"}`)
+		}, 2*time.Second, 100*time.Millisecond)
+		logger.AssertJSONMessages(t, `
+{"level":"info","message":"deleting metadata of success jobs"}
+{"level":"info","message":"deleted finished storage job"}
+{"level":"info","message":"deleted \"1\" jobs","deletedJobsCount":1} +`) + } + // Check database state { - // Check database state etcdhelper.AssertKeys(t, client, nil, ignoredEtcdKeys) } } + +func TestMetadataProcessingJobCleanup(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + clk := clock.NewMock() + clk.Set(utctime.MustParse("2000-01-01T00:00:00.000Z").Time()) + by := test.ByUser() + + // Fixtures + projectID := keboola.ProjectID(123) + branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456} + sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"} + sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"} + jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"} + ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern(`^definition/|storage/secret/|storage/volume/|storage/file/all/|storage/file/level/local/|storage/slice/all/|storage/slice/level/local/|storage/stats/|runtime/|storage/keboola/secret/token/`) + + // Get services + d, mocked := dependencies.NewMockedCoordinatorScope(t, ctx, commonDeps.WithClock(clk)) + + logger := mocked.DebugLogger() + client := mocked.TestEtcdClient() + defRepo := d.DefinitionRepository() + storageRepo := d.StorageRepository() + volumeRepo := storageRepo.Volume() + + // Setup cleanup interval + cleanupInterval := 12 * time.Hour + importedFileExpiration := time.Hour // the first call of the doCleanup triggers it + activeFileExpiration := 30 * time.Hour // the third call of the doCleanup triggers it + cfg := metacleanup.NewConfig() + cfg.Interval = cleanupInterval + cfg.ActiveFileExpiration = activeFileExpiration + cfg.ArchivedFileExpiration = importedFileExpiration + + // Register routes for receiving information about jobs + transport := mocked.MockedHTTPTransport() + { + bridgeTest.MockProcessingJobStorageAPICalls(t, transport) + } + + // Start metadata cleanup node + // ----------------------------------------------------------------------------------------------------------------- + require.NoError(t, metacleanup.Start(d, cfg)) + + // Register active volumes + // ----------------------------------------------------------------------------------------------------------------- + { + session, err := concurrency.NewSession(client) + require.NoError(t, err) + defer func() { require.NoError(t, session.Close()) }() + test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1) + } + + // Create parent branch, source, sink, token + // ----------------------------------------------------------------------------------------------------------------- + { + branch := test.NewBranch(branchKey) + require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(ctx).Err()) + source := test.NewSource(sourceKey) + require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err()) + sink := dummy.NewSinkWithLocalStorage(sinkKey) + require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err()) + token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}} + require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err()) + } + + // Create job in global level and bridge level + { + job := bridgeEntity.NewJob(jobKey) + require.NoError(t, d.KeboolaBridgeRepository().Job().Create(&job).Do(ctx).Err()) + } + + // Delete processing job as it has finished + jobCleanupAttempt := 0 + { + logger.Truncate() + startTime := clk.Now() + jobCleanupAttempt++ + clk.Set(startTime.Add(time.Duration(jobCleanupAttempt) * 
+
+func TestMetadataProcessingJobCleanup(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	clk := clock.NewMock()
+	clk.Set(utctime.MustParse("2000-01-01T00:00:00.000Z").Time())
+	by := test.ByUser()
+
+	// Fixtures
+	projectID := keboola.ProjectID(123)
+	branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
+	sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
+	sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
+	jobKey := key.JobKey{SinkKey: sinkKey, JobID: "321"}
+	ignoredEtcdKeys := etcdhelper.WithIgnoredKeyPattern(`^definition/|storage/secret/|storage/volume/|storage/file/all/|storage/file/level/local/|storage/slice/all/|storage/slice/level/local/|storage/stats/|runtime/|storage/keboola/secret/token/`)
+
+	// Get services
+	d, mocked := dependencies.NewMockedCoordinatorScope(t, ctx, commonDeps.WithClock(clk))
+
+	logger := mocked.DebugLogger()
+	client := mocked.TestEtcdClient()
+	defRepo := d.DefinitionRepository()
+	storageRepo := d.StorageRepository()
+	volumeRepo := storageRepo.Volume()
+
+	// Setup cleanup interval
+	cleanupInterval := 12 * time.Hour
+	importedFileExpiration := time.Hour    // the first call of the doCleanup triggers it
+	activeFileExpiration := 30 * time.Hour // the third call of the doCleanup triggers it
+	cfg := metacleanup.NewConfig()
+	cfg.Interval = cleanupInterval
+	cfg.ActiveFileExpiration = activeFileExpiration
+	cfg.ArchivedFileExpiration = importedFileExpiration
+
+	// Register routes for receiving information about jobs
+	transport := mocked.MockedHTTPTransport()
+	{
+		bridgeTest.MockProcessingJobStorageAPICalls(t, transport)
+	}
+
+	// Start metadata cleanup node
+	// -----------------------------------------------------------------------------------------------------------------
+	require.NoError(t, metacleanup.Start(d, cfg))
+
+	// Register active volumes
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		session, err := concurrency.NewSession(client)
+		require.NoError(t, err)
+		defer func() { require.NoError(t, session.Close()) }()
+		test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
+	}
+
+	// Create parent branch, source, sink, token
+	// -----------------------------------------------------------------------------------------------------------------
+	{
+		branch := test.NewBranch(branchKey)
+		require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(ctx).Err())
+		source := test.NewSource(sourceKey)
+		require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
+		sink := dummy.NewSinkWithLocalStorage(sinkKey)
+		require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
+		token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
+		require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err())
+	}
+
+	// Create the job at both the global level and the bridge level
+	{
+		job := bridgeEntity.NewJob(jobKey)
+		require.NoError(t, d.KeboolaBridgeRepository().Job().Create(&job).Do(ctx).Err())
+	}
+
+	// Try to delete the processing job; it is kept, because it has not finished yet
+	jobCleanupAttempt := 0
+	{
+		logger.Truncate()
+		startTime := clk.Now()
+		jobCleanupAttempt++
+		clk.Set(startTime.Add(time.Duration(jobCleanupAttempt) * cleanupInterval))
+		assert.EventuallyWithT(t, func(c *assert.CollectT) {
+			logger.AssertJSONMessages(c, `{"level":"info","message":"deleted \"%d\" jobs"}`)
+		}, 2*time.Second, 100*time.Millisecond)
+		logger.AssertJSONMessages(t, `
+{"level":"info","message":"deleting metadata of success jobs"}
+{"level":"debug","message":"cannot remove storage job, job status: processing"}
+{"level":"info","message":"deleted \"0\" jobs","deletedJobsCount":0}
+`)
+	}
+	// Check database state
+	{
+		etcdhelper.AssertKeys(
+			t,
+			client,
+			[]string{
+				"storage/job/123/456/my-source/my-sink/321",
+			},
+			ignoredEtcdKeys)
+	}
+}
diff --git a/internal/pkg/service/stream/storage/node/coordinator/coordinator.go b/internal/pkg/service/stream/storage/node/coordinator/coordinator.go
index cfdbec7c75..65785c4253 100644
--- a/internal/pkg/service/stream/storage/node/coordinator/coordinator.go
+++ b/internal/pkg/service/stream/storage/node/coordinator/coordinator.go
@@ -17,6 +17,11 @@ import (
 func Start(ctx context.Context, d dependencies.CoordinatorScope, cfg config.Config) error {
 	logger := d.Logger().WithComponent("storage.node.coordinator")
 	logger.Info(ctx, `starting storage coordinator node`)
+	// Change the default bridge behaviour to use Mirror, as it is required for sink throttling
+	err := d.KeboolaSinkBridge().MirrorJobs(ctx, d)
+	if err != nil {
+		return err
+	}
 
 	if err := filerotation.Start(d, cfg.Storage.Level.Target.Operator); err != nil {
 		return err
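Ordering matters here: `MirrorJobs` must complete before `filerotation.Start`, because the rotation operator's throttling check (below) reads the job state that the bridge mirrors from etcd. Condensed, the startup contract is:

```go
// Sketch of the dependency: mirror bridge jobs first, then start the
// operator that consults the mirrored state via the plugin hook.
if err := d.KeboolaSinkBridge().MirrorJobs(ctx, d); err != nil {
	return err // without the mirror, the throttling check would see no jobs
}
if err := filerotation.Start(d, cfg.Storage.Level.Target.Operator); err != nil {
	return err
}
```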
diff --git a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
index 9a072b9841..81261a27bd 100644
--- a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
+++ b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go
@@ -24,7 +24,9 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
 	definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
 	targetConfig "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/config"
+	targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
 	storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model/repository"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/node"
@@ -46,6 +48,7 @@ type operator struct {
 	distribution *distribution.GroupNode
 	locks        *distlock.Provider
 	telemetry    telemetry.Telemetry
+	plugins      *plugin.Plugins
 
 	files *etcdop.MirrorMap[model.File, model.FileKey, *fileData]
 	sinks *etcdop.MirrorMap[definition.Sink, key.SinkKey, *sinkData]
@@ -62,6 +65,7 @@ type fileData struct {
 	FileKey      model.FileKey
 	State        model.FileState
 	Expiration   utctime.UTCTime
+	Provider     targetModel.Provider
 	ImportConfig targetConfig.ImportConfig
 	Retry        model.Retryable
 	ModRevision  int64
@@ -89,6 +93,7 @@ type dependencies interface {
 	StatisticsL1Cache() *statsCache.L1
 	DistributionNode() *distribution.Node
 	DistributedLockProvider() *distlock.Provider
+	Plugins() *plugin.Plugins
 	Telemetry() telemetry.Telemetry
 }
 
@@ -102,6 +107,7 @@ func Start(d dependencies, config targetConfig.OperatorConfig) error {
 		statisticsCache: d.StatisticsL1Cache(),
 		locks:           d.DistributedLockProvider(),
 		telemetry:       d.Telemetry(),
+		plugins:         d.Plugins(),
 		metrics:         node.NewMetrics(d.Telemetry().Meter()),
 	}
 
@@ -138,6 +144,7 @@ func Start(d dependencies, config targetConfig.OperatorConfig) error {
 				FileKey:      file.FileKey,
 				State:        file.State,
 				Expiration:   file.StagingStorage.Expiration,
+				Provider:     file.TargetStorage.Provider,
 				ImportConfig: file.TargetStorage.Import,
 				Retry:        file.Retryable,
 				ModRevision:  rawValue.ModRevision,
@@ -351,6 +358,12 @@ func (o *operator) rotateFile(ctx context.Context, file *fileData) {
 		return
 	}
 
+	// Skip the file rotation if the target provider is throttled
+	if !o.plugins.CanAcceptNewFile(ctx, file.Provider, file.FileKey.SinkKey) {
+		o.logger.Warnf(ctx, "skipping file rotation: sink is throttled")
+		return
+	}
+
 	// Log cause
 	o.logger.Infof(ctx, "rotating file, import conditions met: %s", result.Cause())
 
diff --git a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator_test.go b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator_test.go
index 7f9ee68273..1d4a2c397f 100644
--- a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator_test.go
+++ b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator_test.go
@@ -45,7 +45,7 @@ func TestFileRotation(t *testing.T) {
 
 	// Trigger check - no import trigger
 	ts.triggerCheck(t, false, `
-{"level":"debug","message":"checking files import conditions","component":"storage.node.operator.file.rotation"} 
+{"level":"debug","message":"checking files import conditions","component":"storage.node.operator.file.rotation"}
 {"level":"debug","message":"skipping file rotation: no record","component":"storage.node.operator.file.rotation"}
 `)
 
diff --git a/internal/pkg/service/stream/storage/test/bridge/job.go b/internal/pkg/service/stream/storage/test/bridge/job.go
new file mode 100644
index 0000000000..d1b77c2022
--- /dev/null
+++ b/internal/pkg/service/stream/storage/test/bridge/job.go
@@ -0,0 +1,29 @@
+package job
+
+import (
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
+	keboolaSink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
+)
+
+// NewJobKey returns a fixture job key for tests.
+func NewJobKey() key.JobKey {
+	return key.JobKey{
+		SinkKey: test.NewSinkKey(),
+		JobID:   "1111",
+	}
+}
+
+// NewTestJob returns a fixture job with the default key.
+func NewTestJob() keboolaSink.Job {
+	return keboolaSink.Job{
+		JobKey: NewJobKey(),
+	}
+}
+
+// NewJob returns a fixture job for the given key.
+func NewJob(k key.JobKey) keboolaSink.Job {
+	return keboolaSink.Job{
+		JobKey: k,
+	}
+}
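`CanAcceptNewFile` is the plugin hook behind the rotation guard above; for the Keboola provider it is expected to veto rotation once a sink already has `JobLimit` unfinished import jobs. A hypothetical sketch of such a predicate (the real check lives in the bridge and consults the mirrored job state, not a plain map):

```go
// canAcceptNewFile is illustrative only: it returns true while the sink
// has fewer than jobLimit unfinished jobs, so file rotation may proceed.
func canAcceptNewFile(sinkKey key.SinkKey, unfinishedJobs map[key.SinkKey]int, jobLimit int) bool {
	return unfinishedJobs[sinkKey] < jobLimit
}
```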
"$KEBOOLA_STACK" tags.datadoghq.com/service: "load-test" tags.datadoghq.com/version: "stream-benchmark" annotations: diff --git a/scripts/k6/stream-api/api.js b/scripts/k6/stream-api/api.js index 18176d6a44..080609dcfc 100644 --- a/scripts/k6/stream-api/api.js +++ b/scripts/k6/stream-api/api.js @@ -102,7 +102,7 @@ export class Api { } awaitTask(taskUrl) { - const taskTimeout = 60 + const taskTimeout = 180 for (let retries = taskTimeout; retries > 0; retries--) { let res = http.get(taskUrl, { headers: this.headers, responseType: "text" }) if (res.status !== 200) { diff --git a/scripts/k6/stream-api/main.js b/scripts/k6/stream-api/main.js index 1f8decdc28..883d2f96e3 100644 --- a/scripts/k6/stream-api/main.js +++ b/scripts/k6/stream-api/main.js @@ -108,8 +108,9 @@ const mappings = { // K6 options export const options = { systemTags: ['status', 'group'], + setupTimeout: '240s', discardResponseBodies: true, // we are checking only status codes - teardownTimeout: '120s', scenarios: { + teardownTimeout: '180s', scenarios: { [SCENARIO]: scenarios[SCENARIO] }, // Improve results summary diff --git a/test/stream/bridge/keboola/keboola_test.go b/test/stream/bridge/keboola/keboola_test.go index 8d07059819..ca005199d1 100644 --- a/test/stream/bridge/keboola/keboola_test.go +++ b/test/stream/bridge/keboola/keboola_test.go @@ -34,9 +34,10 @@ func TestKeboolaBridgeWorkflow(t *testing.T) { // Update configuration to make the cluster testable configFn := func(cfg *config.Config) { + // Enable metadata cleanup for removing storage jobs + cfg.Storage.MetadataCleanup.Enabled = true // Disable unrelated workers cfg.Storage.DiskCleanup.Enabled = false - cfg.Storage.MetadataCleanup.Enabled = false cfg.API.Task.CleanupEnabled = false // Use deterministic load balancer @@ -52,6 +53,9 @@ func TestKeboolaBridgeWorkflow(t *testing.T) { }, } + // In the test, we trigger the file import only when sink limit is not reached. + cfg.Sink.Table.Keboola.JobLimit = 1 + // In the test, we trigger the file import via the records count, the other values are intentionally high. cfg.Storage.Level.Target.Import = targetConfig.ImportConfig{ MinInterval: duration.From(30 * time.Second), // minimum @@ -63,6 +67,9 @@ func TestKeboolaBridgeWorkflow(t *testing.T) { Expiration: duration.From(30 * time.Minute), }, } + + // Cleanup should be perfomed more frequently to remove already finished storage jobs + cfg.Storage.MetadataCleanup.Interval = 10 * time.Second } ts := setup(t, ctx, configFn) @@ -437,6 +444,11 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati `) }, 60*time.Second, 100*time.Millisecond) + assert.EventuallyWithT(t, func(c *assert.CollectT) { + ts.logger.AssertJSONMessages(c, ` +{"level":"info","message":"deleted \"1\" jobs","deletedJobsCount":1,"component":"storage.metadata.cleanup"} + `) + }, 20*time.Second, 100*time.Millisecond) // Check file/slices state after the upload files := ts.checkState(t, ctx, expectations.expectedFiles)