feat: Implement storage job limiter #2123

Merged · 27 commits · Nov 20, 2024
Changes from 21 commits
Commits (27)
8bf131a
fix: Add correct datadog tag env. New teardown and setup timeouts
Matovidlo Nov 4, 2024
1db958c
feat: Add configuration options for job and sink limit
Matovidlo Nov 8, 2024
0f04f52
feat: Add new plugin for sink delete
Matovidlo Nov 8, 2024
31f143e
feat: Add support for storage job repository
Matovidlo Nov 8, 2024
ad8eb9b
feat: Add support for keboola storage job
Matovidlo Nov 8, 2024
e2759c8
feat: Add support for deleting newly created jobs entities
Matovidlo Nov 8, 2024
658dad9
feat: Add new condition and mirroring of jobs
Matovidlo Nov 8, 2024
284d3d8
feat: Start metadata cleanup to unthrottle the sink
Matovidlo Nov 8, 2024
38959a4
fix: Remove sink repository, as it is not needed and present in deps
Matovidlo Nov 8, 2024
c009b9b
fix: Linter gci
Matovidlo Nov 8, 2024
2703814
fix: Add missing attribute in keboolafile
Matovidlo Nov 11, 2024
9fd9a6b
fix: Add new context attributes instead of logging whole object
Matovidlo Nov 11, 2024
b72a0f7
fix: Comment was copied from keboolasink
Matovidlo Nov 11, 2024
1fc049a
fix: Remove active prefix from job schema
Matovidlo Nov 11, 2024
8b406cd
fix: Change sinkLimit -> jobLimit
Matovidlo Nov 11, 2024
bacd1f0
fix: Add ListAll test, fix lint and config test
Matovidlo Nov 11, 2024
6116042
feat: Move job repository under keboola bridge
Matovidlo Nov 18, 2024
798bada
fix: Make configurable keboola bridge mirroring using exported function
Matovidlo Nov 18, 2024
d3a711d
fix: Linter issue
Matovidlo Nov 18, 2024
defdd2c
fix: Typo in comment
Matovidlo Nov 19, 2024
6cd186a
fix: Use context from Start of coordinator in bridge
Matovidlo Nov 19, 2024
0ef0afe
fix: Change naming and move throttle under file package in plugin
Matovidlo Nov 19, 2024
da8707a
fix: Add smaller interval to metadata cleanup so the jobs are cleared…
Matovidlo Nov 19, 2024
41c0053
feat: Remove token from job and use exported function on top of bridge
Matovidlo Nov 19, 2024
a376096
fix: Change default behaviour of CanAcceptNewFile. Returns by default…
Matovidlo Nov 19, 2024
0412bce
fix: Move cleanup of job under bridge module
Matovidlo Nov 20, 2024
665e97a
fix: Race condition on span with defer in errgrp
Matovidlo Nov 20, 2024
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/config/config_test.go
@@ -129,6 +129,8 @@ sink:
keboola:
# Timeout to perform upload send event of slice or import event of file
eventSendTimeout: 30s
# Number of import jobs running in keboola for single sink
jobLimit: 2
storage:
# Mounted volumes path, each volume is in "{type}/{label}" subdir. Validation rules: required
volumesPath: ""
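The new jobLimit option caps how many Keboola import jobs may run at once for a single sink; once the limit is reached, the bridge throttles the sink until finished jobs are cleared by the metadata cleanup. A minimal sketch of how the option could be declared on the Keboola sink config — the struct location, tags, and validation are assumptions, not the exact PR code:

// Sketch only: the real Config lives in the keboolasink package
// (internal/pkg/service/stream/sink/type/tablesink/keboola); tags and validation rules are assumed.
package keboolasink

import "time"

type Config struct {
    // EventSendTimeout is the timeout for sending the slice upload / file import event.
    EventSendTimeout time.Duration `configKey:"eventSendTimeout"`
    // JobLimit is the maximum number of import jobs running in Keboola for a single sink.
    JobLimit int `configKey:"jobLimit" validate:"required,min=1"`
}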
31 changes: 31 additions & 0 deletions internal/pkg/service/stream/definition/key/job.go
@@ -0,0 +1,31 @@
package key

import (
    "go.opentelemetry.io/otel/attribute"

    "github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type JobID string

type JobKey struct {
    SinkKey
    JobID JobID `json:"jobId" validate:"required,min=1,max=48"`
}

func (v JobID) String() string {
    if v == "" {
        panic(errors.New("JobID cannot be empty"))
    }
    return string(v)
}

func (v JobKey) String() string {
    return v.SinkKey.String() + "/" + v.JobID.String()
}

func (v JobKey) Telemetry() []attribute.KeyValue {
    t := v.SinkKey.Telemetry()
    t = append(t, attribute.String("job.id", v.JobID.String()))
    return t
}
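For illustration, a JobKey nests the Keboola storage job under its sink, so both the string form and the telemetry attributes carry the full hierarchy. A small hypothetical usage (sinkKey and the IDs are made up):

// Hypothetical example of the new key type; sinkKey and the IDs below are made up.
jobKey := key.JobKey{
    SinkKey: sinkKey,          // e.g. "123/456/my-source/my-sink"
    JobID:   key.JobID("789"), // Keboola storage job ID
}
fmt.Println(jobKey.String())   // -> "123/456/my-source/my-sink/789"
attrs := jobKey.Telemetry()    // sink attributes + attribute.String("job.id", "789")
_ = attrs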
@@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Branch, now time.Time, by definiti
created.SetCreation(now, by)
return r.save(ctx, now, by, nil, &created)
}).
// Update the input entity, it the operation is successful
// Update the input entity, if the operation is successful
OnResult(func(entity definition.Branch) {
*input = entity
})
@@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Sink, now time.Time, by definition
created.IncrementVersion(created, now, by, versionDescription)
return r.save(ctx, now, by, nil, &created)
}).
// Update the input entity, it the operation is successful
// Update the input entity, if the operation is successful
OnResult(func(entity definition.Sink) {
*input = entity
})
@@ -48,7 +48,7 @@ func (r *Repository) Create(input *definition.Source, now time.Time, by definiti
created.IncrementVersion(created, now, by, versionDescription)
return r.save(ctx, now, by, nil, &created)
}).
// Update the input entity, it the operation is successful
// Update the input entity, if the operation is successful
OnResult(func(result definition.Source) {
*input = result
})
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/dependencies/dependencies.go
@@ -52,6 +52,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
sinkRouter "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/router"
keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge"
keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskreader"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
@@ -85,6 +86,7 @@ type ServiceScope interface {
StatisticsRepository() *statsRepo.Repository
AggregationRepository() *aggregationRepo.Repository
KeboolaSinkBridge() *keboolaSinkBridge.Bridge
KeboolaBridgeRepository() *keboolaBridgeRepo.Repository
}

type APIScope interface {
11 changes: 11 additions & 0 deletions internal/pkg/service/stream/dependencies/service.go
@@ -21,6 +21,7 @@ import (
definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolaSinkBridge "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge"
keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
storageRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model/repository"
statsRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics/repository"
@@ -46,6 +47,7 @@ type serviceScope struct {
storageStatisticsRepository *statsRepo.Repository
aggregationRepository *aggregationRepo.Repository
keboolaBridge *keboolaSinkBridge.Bridge
keboolaBridgeRepository *keboolaBridgeRepo.Repository
}

type parentScopes struct {
@@ -200,6 +202,11 @@ func newServiceScope(baseScp dependencies.BaseScope, publicScp dependencies.Publ
return api
}

d.keboolaBridgeRepository, err = keboolaBridgeRepo.New(cfg.Storage.Level, d)
if err != nil {
return nil, err
}

d.keboolaBridge = keboolaSinkBridge.New(d, apiCtxProvider, cfg.Sink.Table.Keboola)

d.storageStatisticsRepository = statsRepo.New(d)
@@ -225,6 +232,10 @@ func (v *serviceScope) KeboolaSinkBridge() *keboolaSinkBridge.Bridge {
return v.keboolaBridge
}

func (v *serviceScope) KeboolaBridgeRepository() *keboolaBridgeRepo.Repository {
return v.keboolaBridgeRepository
}

func (v *serviceScope) StorageRepository() *storageRepo.Repository {
return v.storageRepository
}
9 changes: 9 additions & 0 deletions internal/pkg/service/stream/plugin/collection.go
@@ -207,6 +207,15 @@ func (c *Collection) OnSinkModification(fn onSinkSaveFn) {
})
}

func (c *Collection) OnSinkDelete(fn onSinkSaveFn) {
c.onSinkSave = append(c.onSinkSave, func(ctx context.Context, now time.Time, by definition.By, original, sink *definition.Sink) error {
if isDeletedNow(original, sink) {
return fn(ctx, now, by, original, sink)
}
return nil
})
}

func (c *Collection) OnFileOpen(fn onFileOpenFn) {
c.onFileOpen = append(c.onFileOpen, fn)
}
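OnSinkDelete gives plugins a dedicated hook for the "sink was deleted just now" transition, on top of the existing save hooks. The Keboola bridge can use it, for example, to purge the sink's job entities; a sketch of such a registration — the Collection() accessor and the callback body are assumptions, not the PR's exact wiring:

// Sketch: how the bridge might register the new hook (accessor and callback body are assumed).
func (b *Bridge) deleteJobsOnSinkDelete() {
    b.plugins.Collection().OnSinkDelete(func(ctx context.Context, now time.Time, by definition.By, original, sink *definition.Sink) error {
        if !b.isKeboolaTableSink(sink) {
            return nil
        }
        // e.g. enqueue deletion of the sink's job entities from etcd
        return nil
    })
}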
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/plugin/plugin.go
@@ -17,6 +17,7 @@ type Plugins struct {
sinkPipelineOpeners []pipeline.Opener
sliceUploader map[stagingModel.FileProvider]uploadSliceFn
fileImport map[targetModel.Provider]importFileFn
throttle map[targetModel.Provider]throttleFn
}

type fnList[T any] []T
@@ -29,6 +30,7 @@ func New(logger log.Logger) *Plugins {
executor: e,
sliceUploader: make(map[stagingModel.FileProvider]uploadSliceFn),
fileImport: make(map[targetModel.Provider]importFileFn),
throttle: make(map[targetModel.Provider]throttleFn),
}
}

27 changes: 27 additions & 0 deletions internal/pkg/service/stream/plugin/throttle.go
@@ -0,0 +1,27 @@
package plugin

import (
    "context"

    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
    targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model"
)

type throttleFn func(ctx context.Context, sinkKey key.SinkKey) bool

type Throttle struct {
    SinkKey  key.SinkKey
    Provider targetModel.Provider
}

func (p *Plugins) RegisterThrottle(provider targetModel.Provider, fn throttleFn) {
    p.throttle[provider] = fn
}

func (p *Plugins) IsThrottled(ctx context.Context, provider targetModel.Provider, sinkKey key.SinkKey) bool {
    if fn, ok := p.throttle[provider]; ok {
        return fn(ctx, sinkKey)
    }

    return false
}
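The Keboola bridge registers itself as the throttle for its target provider (see RegisterThrottle(targetProvider, b.isThrottled) in bridge.go below), and the storage layer can ask IsThrottled before accepting a new file for the sink (see the CanAcceptNewFile commit). A sketch of how isThrottled could count the mirrored jobs against the configured limit — the MirrorMap iteration helper and the JobLimit field name are assumptions:

// Sketch: counting a sink's running jobs against the configured limit.
// b.jobs is the MirrorMap of bridge jobs; the iteration helper used here is assumed.
func (b *Bridge) isThrottled(ctx context.Context, sinkKey key.SinkKey) bool {
    running := 0
    b.jobs.ForEach(func(k key.JobKey, _ *jobData) (stop bool) {
        if k.SinkKey == sinkKey {
            running++
        }
        return false
    })
    return running >= b.config.JobLimit
}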
@@ -1,17 +1,26 @@
package bridge

import (
"context"
"sync"

"github.com/keboola/go-client/pkg/keboola"
etcd "go.etcd.io/etcd/client/v3"
"golang.org/x/sync/singleflight"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/distlock"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema"
bridgeModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
keboolaBridgeRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema"
stagingModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/staging/model"
targetModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/target/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
@@ -31,51 +40,60 @@ const (
)

type Bridge struct {
logger log.Logger
config keboolasink.Config
client etcd.KV
schema schema.Schema
plugins *plugin.Plugins
publicAPI *keboola.PublicAPI
apiProvider apiProvider
storageRepository *storageRepo.Repository
locks *distlock.Provider
logger log.Logger
config keboolasink.Config
client etcd.KV
schema schema.Schema
plugins *plugin.Plugins
publicAPI *keboola.PublicAPI
apiProvider apiProvider
storageRepository *storageRepo.Repository
keboolaBridgeRepository *keboolaBridgeRepo.Repository
locks *distlock.Provider
jobs *etcdop.MirrorMap[bridgeModel.Job, key.JobKey, *jobData]

getBucketOnce *singleflight.Group
createBucketOnce *singleflight.Group
}

type jobData struct {
key.JobKey
}

type dependencies interface {
Logger() log.Logger
EtcdClient() *etcd.Client
EtcdSerde() *serde.Serde
Process() *servicectx.Process
Plugins() *plugin.Plugins
KeboolaPublicAPI() *keboola.PublicAPI
StorageRepository() *storageRepo.Repository
KeboolaBridgeRepository() *keboolaBridgeRepo.Repository
DistributedLockProvider() *distlock.Provider
}

func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Bridge {
b := &Bridge{
logger: d.Logger().WithComponent("keboola.bridge"),
config: config,
client: d.EtcdClient(),
schema: schema.New(d.EtcdSerde()),
plugins: d.Plugins(),
publicAPI: d.KeboolaPublicAPI(),
apiProvider: apiProvider,
storageRepository: d.StorageRepository(),
locks: d.DistributedLockProvider(),
getBucketOnce: &singleflight.Group{},
createBucketOnce: &singleflight.Group{},
logger: d.Logger().WithComponent("keboola.bridge"),
config: config,
client: d.EtcdClient(),
schema: schema.New(d.EtcdSerde()),
plugins: d.Plugins(),
publicAPI: d.KeboolaPublicAPI(),
apiProvider: apiProvider,
storageRepository: d.StorageRepository(),
keboolaBridgeRepository: d.KeboolaBridgeRepository(),
locks: d.DistributedLockProvider(),
getBucketOnce: &singleflight.Group{},
createBucketOnce: &singleflight.Group{},
}

b.setupOnFileOpen()
b.deleteCredentialsOnFileDelete()
b.deleteTokenOnSinkDeactivation()
b.plugins.RegisterFileImporter(targetProvider, b.importFile)
b.plugins.RegisterSliceUploader(stagingFileProvider, b.uploadSlice)

b.plugins.RegisterThrottle(targetProvider, b.isThrottled)
return b
}

@@ -86,3 +104,35 @@ func (b *Bridge) isKeboolaTableSink(sink *definition.Sink) bool {
func (b *Bridge) isKeboolaStagingFile(file *model.File) bool {
return file.StagingStorage.Provider == stagingFileProvider
}

func (b *Bridge) MirrorJobs(ctx context.Context, d dependencies) error {
// Mirror jobs
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(ctx)
d.Process().OnShutdown(func(_ context.Context) {
b.logger.Info(ctx, "closing bridge job mirror")

// Stop mirroring
cancel()
wg.Wait()

b.logger.Info(ctx, "closed bridge job mirror")
})
b.jobs = etcdop.SetupMirrorMap[bridgeModel.Job, key.JobKey, *jobData](
b.keboolaBridgeRepository.Job().GetAllAndWatch(ctx, etcd.WithPrevKV()),
func(_ string, job bridgeModel.Job) key.JobKey {
return job.JobKey
},
func(_ string, job bridgeModel.Job, rawValue *op.KeyValue, oldValue **jobData) *jobData {
return &jobData{
job.JobKey,
}
},
).BuildMirror()
if err := <-b.jobs.StartMirroring(ctx, wg, b.logger); err != nil {
b.logger.Errorf(ctx, "cannot start mirroring jobs: %s", err)
return err
}

return nil
}
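MirrorJobs is exported so the caller decides when mirroring starts (see the "Make configurable keboola bridge mirroring using exported function" and "Use context from Start of coordinator in bridge" commits). A sketch of the call site — the surrounding wiring is an assumption:

// Sketch: starting the job mirror from the coordinator's start-up code.
// d is the service scope, ctx the context passed to the coordinator's Start.
if err := d.KeboolaSinkBridge().MirrorJobs(ctx, d); err != nil {
    return err
}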
@@ -83,6 +83,7 @@ func TestBridge_FullWorkflow(t *testing.T) {
bridgeTest.MockTokenStorageAPICalls(t, transport)
bridgeTest.MockBucketStorageAPICalls(t, transport)
bridgeTest.MockTableStorageAPICalls(t, transport)
bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
bridgeTest.MockFileStorageAPICalls(t, clk, transport)
}

@@ -205,13 +205,18 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist

// Save job ID to etcd
keboolaFile.StorageJobID = &job.ID

b.logger.With(attribute.String("job.id", keboolaFile.StorageJobID.String())).Infof(ctx, "created new storage job for file")

err = b.schema.File().ForFile(file.FileKey).Put(b.client, keboolaFile).Do(ctx).Err()
if err != nil {
return err
}

// Save job ID to etcd
err = b.createJob(ctx, token.TokenString(), file, job)
if err != nil {
return err
}

b.logger.Info(ctx, "created staging file")
}

// Wait for job to complete
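createJob itself is not part of this hunk; presumably it persists a job entity keyed by the sink and the Keboola storage job ID, so the job shows up in the mirror and counts toward jobLimit. A rough, hypothetical sketch — the repository API and field names are assumptions:

// Sketch only: the real createJob lives elsewhere in the bridge package; the calls below are assumed.
func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, storageJob *keboola.StorageJob) error {
    jobKey := key.JobKey{SinkKey: file.FileKey.SinkKey, JobID: key.JobID(storageJob.ID.String())}
    job := bridgeModel.Job{JobKey: jobKey}
    return b.keboolaBridgeRepository.Job().Create(&job).Do(ctx).Err()
}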
@@ -55,6 +55,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
bridgeTest.MockTokenStorageAPICalls(t, transport)
bridgeTest.MockBucketStorageAPICalls(t, transport)
bridgeTest.MockTableStorageAPICalls(t, transport)
bridgeTest.MockProcessingJobStorageAPICalls(t, transport)
bridgeTest.MockFileStorageAPICalls(t, clk, transport)
}

@@ -105,7 +106,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), true).Do(ctx).Err())
require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())

// Switch file to the FileImporting state
// Switch empty file to the FileImporting state
clk.Add(time.Second)
require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), true).Do(ctx).Err())
}