Skip to content

Commit

Permalink
fix: Change naming and move throttle under file package in plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Nov 19, 2024
1 parent 6cd186a commit 577e20d
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 43 deletions.
19 changes: 19 additions & 0 deletions internal/pkg/service/stream/plugin/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics"
Expand All @@ -17,16 +18,34 @@ type File struct {
Provider targetModel.Provider
}

type CanAcceptNewFileRotation struct {
SinkKey key.SinkKey
Provider targetModel.Provider
}

type importFileFn func(ctx context.Context, file File, stats statistics.Value) error
type canAcceptNewFileRotationFn func(ctx context.Context, sinkKey key.SinkKey) bool

func (p *Plugins) RegisterFileImporter(provider targetModel.Provider, fn importFileFn) {
p.fileImport[provider] = fn
}

func (p *Plugins) RegisterCanAcceptNewFileRotation(provider targetModel.Provider, fn canAcceptNewFileRotationFn) {
p.canAcceptNewFileRotation[provider] = fn
}

func (p *Plugins) ImportFile(ctx context.Context, file File, stats statistics.Value) error {
if _, ok := p.fileImport[file.Provider]; !ok {
return errors.New(fmt.Sprintf("no importer for given provider: %v", file.Provider))
}

return p.fileImport[file.Provider](ctx, file, stats)
}

func (p *Plugins) CanAcceptNewFileRotation(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) bool {
if fn, ok := p.canAcceptNewFileRotation[provider]; ok {
return fn(ctx, sinkKey)
}

return false
}
24 changes: 12 additions & 12 deletions internal/pkg/service/stream/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
)

type Plugins struct {
collection *Collection
executor *Executor
localStorageSinks []func(sinkType definition.SinkType) bool
sinkPipelineOpeners []pipeline.Opener
sliceUploader map[stagingModel.FileProvider]uploadSliceFn
fileImport map[targetModel.Provider]importFileFn
throttle map[targetModel.Provider]throttleFn
collection *Collection
executor *Executor
localStorageSinks []func(sinkType definition.SinkType) bool
sinkPipelineOpeners []pipeline.Opener
sliceUploader map[stagingModel.FileProvider]uploadSliceFn
fileImport map[targetModel.Provider]importFileFn
canAcceptNewFileRotation map[targetModel.Provider]canAcceptNewFileRotationFn
}

type fnList[T any] []T
Expand All @@ -26,11 +26,11 @@ func New(logger log.Logger) *Plugins {
c := &Collection{}
e := &Executor{logger: logger.WithComponent("plugin"), collection: c}
return &Plugins{
collection: c,
executor: e,
sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn),
fileImport: make(map[targetModel.Provider]importFileFn),
throttle: make(map[targetModel.Provider]throttleFn),
collection: c,
executor: e,
sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn),
fileImport: make(map[targetModel.Provider]importFileFn),
canAcceptNewFileRotation: make(map[targetModel.Provider]canAcceptNewFileRotationFn),
}
}

Expand Down
27 changes: 0 additions & 27 deletions internal/pkg/service/stream/plugin/throttle.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Br
b.deleteTokenOnSinkDeactivation()
b.plugins.RegisterFileImporter(targetProvider, b.importFile)
b.plugins.RegisterSliceUploader(stagingFileProvider, b.uploadSlice)
b.plugins.RegisterThrottle(targetProvider, b.isThrottled)
b.plugins.RegisterCanAcceptNewFileRotation(targetProvider, b.canAcceptNewFileRotation)
return b
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File,
return nil
}

func (b *Bridge) isThrottled(ctx context.Context, sinkKey key.SinkKey) bool {
func (b *Bridge) canAcceptNewFileRotation(ctx context.Context, sinkKey key.SinkKey) bool {
// Count running jobs only for given sink accessed by file.SinkKey
var runningJobs int
b.jobs.ForEach(func(jobKey key.JobKey, _ *jobData) (stop bool) {
Expand All @@ -52,5 +52,5 @@ func (b *Bridge) isThrottled(ctx context.Context, sinkKey key.SinkKey) bool {
return false
})

return runningJobs >= b.config.JobLimit
return runningJobs < b.config.JobLimit
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (o *operator) rotateFile(ctx context.Context, file *fileData) {
}

// Skip filerotation if target provider is throttled
if o.plugins.IsThrottled(ctx, file.Provider, file.FileKey.SinkKey) {
if !o.plugins.CanAcceptNewFileRotation(ctx, file.Provider, file.FileKey.SinkKey) {
o.logger.Warnf(ctx, "skipping file rotation: sink is throttled")
return
}
Expand Down

0 comments on commit 577e20d

Please sign in to comment.