
feat: Implement storage job limiter #2123

Merged · 27 commits · Nov 20, 2024
Changes from 16 commits
8bf131a
fix: Add correct datadog tag env. New teardown and setup timeouts
Matovidlo Nov 4, 2024
1db958c
feat: Add configuration options for job and sink limit
Matovidlo Nov 8, 2024
0f04f52
feat: Add new plugin for sink delete
Matovidlo Nov 8, 2024
31f143e
feat: Add support for storage job repository
Matovidlo Nov 8, 2024
ad8eb9b
feat: Add support for keboola storage job
Matovidlo Nov 8, 2024
e2759c8
feat: Add support for deleting newly created jobs entities
Matovidlo Nov 8, 2024
658dad9
feat: Add new condition and mirroring of jobs
Matovidlo Nov 8, 2024
284d3d8
feat: Start metadata cleanup to unthrottle the sink
Matovidlo Nov 8, 2024
38959a4
fix: Remove sink repository, as it is not needed and present in deps
Matovidlo Nov 8, 2024
c009b9b
fix: Linter gci
Matovidlo Nov 8, 2024
2703814
fix: Add missing attribute in keboolafile
Matovidlo Nov 11, 2024
9fd9a6b
fix: Add new context attributes instead of logging whole object
Matovidlo Nov 11, 2024
b72a0f7
fix: Comment was copied from keboolasink
Matovidlo Nov 11, 2024
1fc049a
fix: Remove active prefix from job schema
Matovidlo Nov 11, 2024
8b406cd
fix: Change sinkLimit -> jobLimit
Matovidlo Nov 11, 2024
bacd1f0
fix: Add ListAll test, fix lint and config test
Matovidlo Nov 11, 2024
6116042
feat: Move job repository under keboola bridge
Matovidlo Nov 18, 2024
798bada
fix: Make configurable keboola bridge mirroring using exported function
Matovidlo Nov 18, 2024
d3a711d
fix: Linter issue
Matovidlo Nov 18, 2024
defdd2c
fix: Typo in comment
Matovidlo Nov 19, 2024
6cd186a
fix: Use context from Start of coordinator in bridge
Matovidlo Nov 19, 2024
0ef0afe
fix: Change naming and move throttle under file package in plugin
Matovidlo Nov 19, 2024
da8707a
fix: Add smaller interval to metadata cleanup so the jobs are cleared…
Matovidlo Nov 19, 2024
41c0053
feat: Remove token from job and use exported function on top of bridge
Matovidlo Nov 19, 2024
a376096
fix: Change default behaviour of CanAcceptNewFile. Returns by default…
Matovidlo Nov 19, 2024
0412bce
fix: Move cleanup of job under bridge module
Matovidlo Nov 20, 2024
665e97a
fix: Race condition on span with defer in errgrp
Matovidlo Nov 20, 2024
4 changes: 4 additions & 0 deletions internal/pkg/service/stream/config/config_test.go
@@ -129,6 +129,8 @@ sink:
  keboola:
    # Timeout to perform upload send event of slice or import event of file
    eventSendTimeout: 30s
    # Maximum number of import jobs running in Keboola for a single sink
    jobLimit: 2
storage:
  # Mounted volumes path, each volume is in "{type}/{label}" subdir. Validation rules: required
  volumesPath: ""
@@ -285,6 +287,8 @@
    # Timeout of the file import operation. Validation rules: required,minDuration=30s,maxDuration=60m
    fileImportTimeout: 15m0s
    import:
      # Maximum number of running jobs per sink; once reached, the sink is throttled and no further import is performed until the trigger conditions are met
      jobLimit: 2
      # Min duration from the last import to trigger the next, takes precedence over other settings. Validation rules: required,minDuration=30s,maxDuration=24h
      minInterval: 1m0s
      trigger:
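The new jobLimit option is what the limiter enforces: before another file import starts, the bridge counts the sink's running storage jobs and throttles the sink once the count reaches the limit (see the CanAcceptNewFile commit above). A minimal sketch of such a check, with illustrative names rather than the PR's exact API:

// Sketch only: runningJobs would come from the job entities mirrored out of etcd.
func canStartImport(runningJobs, jobLimit int) bool {
    if runningJobs >= jobLimit {
        return false // sink is throttled; the metadata cleanup later unthrottles it
    }
    return true
}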
31 changes: 31 additions & 0 deletions internal/pkg/service/stream/definition/key/job.go
@@ -0,0 +1,31 @@
package key

import (
    "go.opentelemetry.io/otel/attribute"

    "github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

type JobID string

type JobKey struct {
    SinkKey
    JobID JobID `json:"jobId" validate:"required,min=1,max=48"`
}

func (v JobID) String() string {
    if v == "" {
        panic(errors.New("JobID cannot be empty"))
    }
    return string(v)
}

func (v JobKey) String() string {
    return v.SinkKey.String() + "/" + v.JobID.String()
}

func (v JobKey) Telemetry() []attribute.KeyValue {
    t := v.SinkKey.Telemetry()
    t = append(t, attribute.String("job.id", v.JobID.String()))
    return t
}
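JobKey composes with the existing key hierarchy, so its string form is the sink key followed by the job ID. An illustrative construction; the concrete IDs below match the test fixtures later in this PR:

// Illustrative only; real keys come from the definition repository.
jobKey := key.JobKey{
    SinkKey: key.SinkKey{
        SourceKey: key.SourceKey{
            BranchKey: key.BranchKey{ProjectID: 123, BranchID: 456},
            SourceID:  "my-source",
        },
        SinkID: "my-sink",
    },
    JobID: key.JobID("321"),
}
fmt.Println(jobKey.String()) // 123/456/my-source/my-sink/321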
9 changes: 9 additions & 0 deletions internal/pkg/service/stream/plugin/collection.go
@@ -207,6 +207,15 @@ func (c *Collection) OnSinkModification(fn onSinkSaveFn) {
    })
}

func (c *Collection) OnSinkDelete(fn onSinkSaveFn) {
    c.onSinkSave = append(c.onSinkSave, func(ctx context.Context, now time.Time, by definition.By, original, sink *definition.Sink) error {
        if isDeletedNow(original, sink) {
            return fn(ctx, now, by, original, sink)
        }
        return nil
    })
}

func (c *Collection) OnFileOpen(fn onFileOpenFn) {
    c.onFileOpen = append(c.onFileOpen, fn)
}
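The new hook fires only when a save transitions the sink into the deleted state. A sketch of how the bridge could register it to drop the sink's job entities; deleteJobsForSink is a hypothetical helper, the PR wires the cleanup through the bridge and its schema:

plugins.Collection().OnSinkDelete(func(ctx context.Context, now time.Time, by definition.By, original, sink *definition.Sink) error {
    // Remove job entities stored under the deleted sink's key prefix.
    return deleteJobsForSink(ctx, sink.SinkKey) // hypothetical
})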
@@ -83,6 +83,7 @@ func TestBridge_FullWorkflow(t *testing.T) {
        bridgeTest.MockTokenStorageAPICalls(t, transport)
        bridgeTest.MockBucketStorageAPICalls(t, transport)
        bridgeTest.MockTableStorageAPICalls(t, transport)
        bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
        bridgeTest.MockFileStorageAPICalls(t, clk, transport)
    }

@@ -61,8 +61,9 @@ func (b *Bridge) setupOnFileOpen() {

        // Update file entity
        file.Mapping = sink.Table.Mapping
-       file.StagingStorage.Provider = stagingFileProvider // staging file is provided by the Keboola
-       file.TargetStorage.Provider = targetProvider // destination is a Keboola table
+       file.StagingStorage.Provider = stagingFileProvider     // staging file is provided by the Keboola
+       file.TargetStorage.Provider = targetProvider           // destination is a Keboola table
+       file.TargetStorage.Import.JobLimit = b.config.JobLimit // specific job condition for keboola provider
        file.StagingStorage.Expiration = utctime.From(keboolaFile.UploadCredentials.CredentialsExpiration())
    }

@@ -205,13 +206,18 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist

        // Save job ID to etcd
        keboolaFile.StorageJobID = &job.ID

        b.logger.With(attribute.String("job.id", keboolaFile.StorageJobID.String())).Infof(ctx, "created new storage job for file")

        err = b.schema.File().ForFile(file.FileKey).Put(b.client, keboolaFile).Do(ctx).Err()
        if err != nil {
            return err
        }

        // Create the job entity in etcd
        err = b.createJob(ctx, token.TokenString(), file, job)
        if err != nil {
            return err
        }

        b.logger.Info(ctx, "created staging file")
    }

    // Wait for job to complete
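After the job entity is stored, the importer waits for the Storage job to finish. A minimal sketch of what such a wait can look like; WaitForStorageJob is the go-client helper this codebase uses for table imports, and treating it as available at this point is an assumption:

// Sketch only: api is assumed to be the project's *keboola.AuthorizedAPI and
// job the *keboola.StorageJob created by the import-async call above.
waitCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // mirrors fileImportTimeout: 15m0s
defer cancel()
if err := api.WaitForStorageJob(waitCtx, job); err != nil {
    return err // the file stays in the importing state and the operator retries
}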
@@ -55,6 +55,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
        bridgeTest.MockTokenStorageAPICalls(t, transport)
        bridgeTest.MockBucketStorageAPICalls(t, transport)
        bridgeTest.MockTableStorageAPICalls(t, transport)
        bridgeTest.MockProcessingJobStorageAPICalls(t, transport)
        bridgeTest.MockFileStorageAPICalls(t, clk, transport)
    }

@@ -105,7 +106,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
        require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), true).Do(ctx).Err())
        require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())

-       // Switch file to the FileImporting state
+       // Switch empty file to the FileImporting state
        clk.Add(time.Second)
        require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), true).Do(ctx).Err())
    }
@@ -113,6 +113,7 @@ storage/file/level/local/123/456/my-source/my-sink/2000-01-01T01:00:00.000Z
"target": {
"provider": "keboola",
"import": {
"jobLimit": 2,
"minInterval": "1m0s",
"trigger": {
"count": 50000,
@@ -0,0 +1,76 @@
package bridge

import (
    "context"
    "fmt"

    "github.com/keboola/go-client/pkg/keboola"
    "go.opentelemetry.io/otel/attribute"

    "github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
    keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
)

func (b *Bridge) Job(k key.JobKey) op.WithResult[keboolasink.Job] {
    return b.schema.Job().ForJob(k).GetOrErr(b.client)
}

func (b *Bridge) DeleteJob(k key.JobKey) *op.AtomicOp[keboolasink.Job] {
    var active keboolasink.Job
    return op.Atomic(b.client, &active).
        // Entity must exist
        Read(func(ctx context.Context) op.Op {
            return b.schema.Job().ForJob(k).Get(b.client)
        }).
        // Delete
        Write(func(ctx context.Context) op.Op {
            return b.schema.Job().ForJob(k).Delete(b.client)
        })
}

func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, storageJob *keboola.StorageJob) error {
    modelJob := model.Job{
        JobKey: key.JobKey{
            SinkKey: file.SinkKey,
            JobID:   key.JobID(storageJob.ID.String()),
        },
    }

    // Add context attributes
    ctx = ctxattr.ContextWith(ctx, attribute.String("job.id", modelJob.String()))
    b.logger.Debugf(ctx, "creating job")

    keboolaJob := keboolasink.Job{
        JobKey:        modelJob.JobKey,
        StorageJobKey: storageJob.StorageJobKey,
        Token:         token,
    }
    b.logger.Infof(ctx, "creating storage job")

    lock := b.locks.NewMutex(fmt.Sprintf("api.source.sink.jobs.%s", file.SinkKey))
    if err := lock.Lock(ctx); err != nil {
        return err
    }
    defer func() {
        if err := lock.Unlock(ctx); err != nil {
            b.logger.Warnf(ctx, "cannot unlock lock %q: %s", lock.Key(), err)
        }
    }()

    operation := b.storageRepository.Job().Create(&modelJob).RequireLock(lock)
    if err := operation.Do(ctx).Err(); err != nil {
        return err
    }
    b.logger.Debugf(ctx, "job created")

    resultOp := b.schema.Job().ForJob(keboolaJob.JobKey).Put(b.client, keboolaJob)
    if err := resultOp.Do(ctx).Err(); err != nil {
        return err
    }

    b.logger.Infof(ctx, "storage job created")
    return nil
}
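Both helpers operate on the same etcd key. A usage sketch; the ResultOrErr call follows the etcdop result API used across this repository, and treating it as available here is an assumption:

// Sketch only: jobKey identifies an existing job entity.
job, err := bridge.Job(jobKey).Do(ctx).ResultOrErr()
if err != nil {
    return err
}
// DeleteJob is atomic: it re-reads the entity before deleting,
// so a concurrently removed job fails the operation cleanly.
if err := bridge.DeleteJob(job.JobKey).Do(ctx).Err(); err != nil {
    return err
}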
@@ -0,0 +1,146 @@
package bridge_test

import (
    "context"
    "testing"
    "time"

    "github.com/benbjohnson/clock"
    "github.com/keboola/go-client/pkg/keboola"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.etcd.io/etcd/client/v3/concurrency"

    commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/duration"
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/rollback"
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
    bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/node/coordinator/fileimport"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
    "github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

func TestBridge_CreateJob(t *testing.T) {
    t.Parallel()

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    clk := clock.NewMock()
    clk.Set(utctime.MustParse("2000-01-01T01:00:00.000Z").Time())
    by := test.ByUser()

    checkInterval := time.Minute
    d, mock := dependencies.NewMockedCoordinatorScopeWithConfig(t, ctx, func(cfg *config.Config) {
        cfg.Storage.Level.Target.Operator.FileImportCheckInterval = duration.From(checkInterval)
    }, commonDeps.WithClock(clk))
    logger := mock.DebugLogger()
    client := mock.TestEtcdClient()
    defRepo := d.DefinitionRepository()
    storageRepo := d.StorageRepository()
    volumeRepo := storageRepo.Volume()

    apiCtx := rollback.ContextWith(ctx, rollback.New(d.Logger()))
    apiCtx = context.WithValue(apiCtx, dependencies.KeboolaProjectAPICtxKey, mock.KeboolaProjectAPI())

    // Register mocked responses
    // -----------------------------------------------------------------------------------------------------------------
    transport := mock.MockedHTTPTransport()
    {
        bridgeTest.MockTokenStorageAPICalls(t, transport)
        bridgeTest.MockBucketStorageAPICalls(t, transport)
        bridgeTest.MockTableStorageAPICalls(t, transport)
        bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
        bridgeTest.MockImportAsyncAPICalls(t, transport)
        bridgeTest.MockFileStorageAPICalls(t, clk, transport)
    }

    // Register active volumes
    // -----------------------------------------------------------------------------------------------------------------
    {
        session, err := concurrency.NewSession(client)
        require.NoError(t, err)
        defer func() { require.NoError(t, session.Close()) }()
        test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
    }

    // Create sink
    // -----------------------------------------------------------------------------------------------------------------
    projectID := keboola.ProjectID(123)
    branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
    sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
    sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
    {
        branch := test.NewBranch(branchKey)
        require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(apiCtx).Err())
        source := test.NewSource(sourceKey)
        require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(apiCtx).Err())
        sink := test.NewKeboolaTableSink(sinkKey)
        require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(apiCtx).Err())
    }

    // Switch file to the FileImporting state
    // -----------------------------------------------------------------------------------------------------------------
    {
        // Rotate file
        clk.Add(time.Second)
        require.NoError(t, storageRepo.File().Rotate(sinkKey, clk.Now()).Do(apiCtx).Err())
        files, err := storageRepo.File().ListAll().Do(ctx).All()
        require.NoError(t, err)
        require.Len(t, files, 2)
        slices, err := storageRepo.Slice().ListIn(files[0].FileKey).Do(ctx).All()
        require.NoError(t, err)
        require.Len(t, slices, 1)
        require.Equal(t, model.FileClosing, files[0].State)
        require.Equal(t, model.FileWriting, files[1].State)
        require.Equal(t, model.SliceClosing, slices[0].State)
        file := files[0]
        slice := slices[0]

        // Upload of non-empty slice
        clk.Add(time.Second)
        require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), false).Do(ctx).Err())
        require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())

        // Switch file to the FileImporting state
        clk.Add(time.Second)
        require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), false).Do(ctx).Err())
    }

    // Start file import operator
    // -----------------------------------------------------------------------------------------------------------------
    require.NoError(t, fileimport.Start(d, mock.TestConfig().Storage.Level.Target.Operator))

    // Wait for the file import
    // -----------------------------------------------------------------------------------------------------------------
    logger.Truncate()
    clk.Add(checkInterval)
    assert.EventuallyWithT(t, func(c *assert.CollectT) {
        logger.AssertJSONMessages(c, `
{"level":"info","message":"importing file","component":"storage.node.operator.file.import"}
{"level":"debug","message":"creating job","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"creating storage job","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"debug","message":"job created","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"storage job created","job.id":"123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"imported file","component":"storage.node.operator.file.import"}
`)
    }, 15*time.Second, 50*time.Millisecond)

    // Import was started and the job was polled in the Storage API
    // -----------------------------------------------------------------------------------------------------------------
    expectedImportAsyncAPICall := "POST https://connection.keboola.local/v2/storage/branch/456/tables/in.c-bucket.my-table/import-async"
    expectedStorageJobsCall := "GET https://connection.keboola.local/v2/storage/jobs/321"
    assert.Equal(t, 1, transport.GetCallCountInfo()[expectedImportAsyncAPICall])
    assert.Equal(t, 1, transport.GetCallCountInfo()[expectedStorageJobsCall])

    // Shutdown
    // -----------------------------------------------------------------------------------------------------------------
    d.Process().Shutdown(ctx, errors.New("bye bye"))
    d.Process().WaitForShutdown()
    logger.AssertNoErrorMessage(t)
}
@@ -0,0 +1,23 @@
package schema

import (
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
    "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
    "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
    keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
)

type (
    // Job is an etcd prefix that stores all Keboola-specific data we need for job polling.
    Job struct {
        etcdop.PrefixT[keboolasink.Job]
    }
)

func forJob(s *serde.Serde) Job {
    return Job{PrefixT: etcdop.NewTypedPrefix[keboolasink.Job]("storage/keboola/job", s)}
}

func (v Job) ForJob(k key.JobKey) etcdop.KeyT[keboolasink.Job] {
    return v.PrefixT.Key(k.String())
}
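Since each key is built from JobKey.String(), every job of a sink shares a common path prefix, so per-sink lookups are cheap. An illustrative layout and a hypothetical per-sink listing; the In helper and the GetAll chain are assumptions based on the etcdop prefix API, the PR itself only shows ForJob for single keys:

// etcd key layout, using the IDs from the test above:
//   storage/keboola/job/123/456/my-source/my-sink/321
//
// Hypothetical listing of one sink's job entities:
jobs, err := schema.Job().In(sinkKey).GetAll(client).Do(ctx).All()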