diff --git a/internal/pkg/service/stream/plugin/file.go b/internal/pkg/service/stream/plugin/file.go index 84516d033e..e0619bd030 100644 --- a/internal/pkg/service/stream/plugin/file.go +++ b/internal/pkg/service/stream/plugin/file.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics" @@ -17,12 +18,22 @@ type File struct { Provider targetModel.Provider } +type CanAcceptNewFileRotation struct { + SinkKey key.SinkKey + Provider targetModel.Provider +} + type importFileFn func(ctx context.Context, file File, stats statistics.Value) error +type canAcceptNewFileRotationFn func(ctx context.Context, sinkKey key.SinkKey) bool func (p *Plugins) RegisterFileImporter(provider targetModel.Provider, fn importFileFn) { p.fileImport[provider] = fn } +func (p *Plugins) RegisterCanAcceptNewFileRotation(provider targetModel.Provider, fn canAcceptNewFileRotationFn) { + p.canAcceptNewFileRotation[provider] = fn +} + func (p *Plugins) ImportFile(ctx context.Context, file File, stats statistics.Value) error { if _, ok := p.fileImport[file.Provider]; !ok { return errors.New(fmt.Sprintf("no importer for given provider: %v", file.Provider)) @@ -30,3 +41,11 @@ func (p *Plugins) ImportFile(ctx context.Context, file File, stats statistics.Va return p.fileImport[file.Provider](ctx, file, stats) } + +func (p *Plugins) CanAcceptNewFileRotation(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) bool { + if fn, ok := p.canAcceptNewFileRotation[provider]; ok { + return fn(ctx, sinkKey) + } + + return false +} diff --git a/internal/pkg/service/stream/plugin/plugin.go b/internal/pkg/service/stream/plugin/plugin.go index 3fbca63258..2ae3f6b08d 100644 --- a/internal/pkg/service/stream/plugin/plugin.go +++ b/internal/pkg/service/stream/plugin/plugin.go @@ -11,13 +11,13 @@ import ( ) type Plugins struct { - collection *Collection - executor *Executor - localStorageSinks []func(sinkType definition.SinkType) bool - sinkPipelineOpeners []pipeline.Opener - sliceUploader map[stagingModel.FileProvider]uploadSliceFn - fileImport map[targetModel.Provider]importFileFn - throttle map[targetModel.Provider]throttleFn + collection *Collection + executor *Executor + localStorageSinks []func(sinkType definition.SinkType) bool + sinkPipelineOpeners []pipeline.Opener + sliceUploader map[stagingModel.FileProvider]uploadSliceFn + fileImport map[targetModel.Provider]importFileFn + canAcceptNewFileRotation map[targetModel.Provider]canAcceptNewFileRotationFn } type fnList[T any] []T @@ -26,11 +26,11 @@ func New(logger log.Logger) *Plugins { c := &Collection{} e := &Executor{logger: logger.WithComponent("plugin"), collection: c} return &Plugins{ - collection: c, - executor: e, - sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn), - fileImport: make(map[targetModel.Provider]importFileFn), - throttle: make(map[targetModel.Provider]throttleFn), + collection: c, + executor: e, + sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn), + fileImport: make(map[targetModel.Provider]importFileFn), + canAcceptNewFileRotation: make(map[targetModel.Provider]canAcceptNewFileRotationFn), } } diff --git a/internal/pkg/service/stream/plugin/throttle.go b/internal/pkg/service/stream/plugin/throttle.go deleted file mode 100644 index addceacbbd..0000000000 --- a/internal/pkg/service/stream/plugin/throttle.go +++ /dev/null @@ -1,27 +0,0 @@ -package plugin - -import ( - "context" - - "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" - targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model" -) - -type throttleFn func(ctx context.Context, sinkKey key.SinkKey) bool - -type Throttle struct { - SinkKey key.SinkKey - Provider targetModel.Provider -} - -func (p *Plugins) RegisterThrottle(provider targetModel.Provider, fn throttleFn) { - p.throttle[provider] = fn -} - -func (p *Plugins) IsThrottled(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) bool { - if fn, ok := p.throttle[provider]; ok { - return fn(ctx, sinkKey) - } - - return false -} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go index 48eb837839..ba49f40d1c 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go @@ -93,7 +93,7 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Br b.deleteTokenOnSinkDeactivation() b.plugins.RegisterFileImporter(targetProvider, b.importFile) b.plugins.RegisterSliceUploader(stagingFileProvider, b.uploadSlice) - b.plugins.RegisterThrottle(targetProvider, b.isThrottled) + b.plugins.RegisterCanAcceptNewFileRotation(targetProvider, b.canAcceptNewFileRotation) return b } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go index 1cf7215b78..78abe6b3de 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go @@ -41,7 +41,7 @@ func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, return nil } -func (b *Bridge) isThrottled(ctx context.Context, sinkKey key.SinkKey) bool { +func (b *Bridge) canAcceptNewFileRotation(ctx context.Context, sinkKey key.SinkKey) bool { // Count running jobs only for given sink accessed by file.SinkKey var runningJobs int b.jobs.ForEach(func(jobKey key.JobKey, _ *jobData) (stop bool) { @@ -52,5 +52,5 @@ func (b *Bridge) isThrottled(ctx context.Context, sinkKey key.SinkKey) bool { return false }) - return runningJobs >= b.config.JobLimit + return runningJobs < b.config.JobLimit } diff --git a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go index 1a8204fedb..cd7d18f332 100644 --- a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go +++ b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go @@ -359,7 +359,7 @@ func (o *operator) rotateFile(ctx context.Context, file *fileData) { } // Skip filerotation if target provider is throttled - if o.plugins.IsThrottled(ctx, file.Provider, file.FileKey.SinkKey) { + if !o.plugins.CanAcceptNewFileRotation(ctx, file.Provider, file.FileKey.SinkKey) { o.logger.Warnf(ctx, "skipping file rotation: sink is throttled") return }