feat: Implement storage job limiter #2123

Merged (27 commits) on Nov 20, 2024

Commits
8bf131a
fix: Add correct datadog tag env. New teardown and setup timeouts
Matovidlo Nov 4, 2024
1db958c
feat: Add configuration options for job and sink limit
Matovidlo Nov 8, 2024
0f04f52
feat: Add new plugin for sink delete
Matovidlo Nov 8, 2024
31f143e
feat: Add support for storage job repository
Matovidlo Nov 8, 2024
ad8eb9b
feat: Add support for keboola storage job
Matovidlo Nov 8, 2024
e2759c8
feat: Add support for deleting newly created jobs entities
Matovidlo Nov 8, 2024
658dad9
feat: Add new condition and mirroring of jobs
Matovidlo Nov 8, 2024
284d3d8
feat: Start metadata cleanup to unthrottle the sink
Matovidlo Nov 8, 2024
38959a4
fix: Remove sink repository, as it is not needed and present in deps
Matovidlo Nov 8, 2024
c009b9b
fix: Linter gci
Matovidlo Nov 8, 2024
2703814
fix: Add missing attribute in keboolafile
Matovidlo Nov 11, 2024
9fd9a6b
fix: Add new context attributes instead of logging whole object
Matovidlo Nov 11, 2024
b72a0f7
fix: Comment was copied from keboolasink
Matovidlo Nov 11, 2024
1fc049a
fix: Remove active prefix from job schema
Matovidlo Nov 11, 2024
8b406cd
fix: Change sinkLimit -> jobLimit
Matovidlo Nov 11, 2024
bacd1f0
fix: Add ListAll test, fix lint and config test
Matovidlo Nov 11, 2024
6116042
feat: Move job repository under keboola bridge
Matovidlo Nov 18, 2024
798bada
fix: Make configurable keboola bridge mirroring using exported function
Matovidlo Nov 18, 2024
d3a711d
fix: Linter issue
Matovidlo Nov 18, 2024
defdd2c
fix: Typo in comment
Matovidlo Nov 19, 2024
6cd186a
fix: Use context from Start of coordinator in bridge
Matovidlo Nov 19, 2024
0ef0afe
fix: Change naming and move throttle under file package in plugin
Matovidlo Nov 19, 2024
da8707a
fix: Add smaller interval to metadata cleanup so the jobs are cleared…
Matovidlo Nov 19, 2024
41c0053
feat: Remove token from job and use exported function on top of bridge
Matovidlo Nov 19, 2024
a376096
fix: Change default behaviour of CanAcceptNewFile. Returns by default…
Matovidlo Nov 19, 2024
0412bce
fix: Move cleanup of job under bridge module
Matovidlo Nov 20, 2024
665e97a
fix: Race condition on span with defer in errgrp
Matovidlo Nov 20, 2024
@@ -211,7 +211,7 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist
 	}
 
 	// Save job ID to etcd
-	err = b.createJob(ctx, token.TokenString(), file, job)
+	err = b.createJob(ctx, file, job)
 	if err != nil {
 		return err
 	}
@@ -10,13 +10,17 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
+	keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
 )
 
-func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, storageJob *keboola.StorageJob) error {
+func (b *Bridge) SinkToken(ctx context.Context, sinkKey key.SinkKey) (keboolasink.Token, error) {
+	return b.schema.Token().ForSink(sinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
+}
+
+func (b *Bridge) createJob(ctx context.Context, file plugin.File, storageJob *keboola.StorageJob) error {
 	keboolaJob := model.Job{
 		JobKey: key.JobKey{SinkKey: file.SinkKey, JobID: key.JobID(storageJob.ID.String())},
-		Token: token,
 	}
 	// Add context attributes
 	ctx = ctxattr.ContextWith(ctx, attribute.String("job.id", keboolaJob.String()))
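For context while reviewing: the sketch below is not code from this PR; it only illustrates the call pattern the new Bridge.SinkToken enables, where a token is resolved by sink key at call time and wrapped in a short-lived authorized client. Only the lookup-by-sink-key idea and the one-minute client lifetime come from the diff; tokenSource, authorizedClient, clientForSink, and staticTokens are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"
	"time"
)

// tokenSource is a hypothetical stand-in for the keboola bridge; the only
// behaviour borrowed from this PR is that a token is looked up by sink key
// on demand instead of being persisted on the job entity.
type tokenSource interface {
	SinkToken(ctx context.Context, sinkKey string) (string, error)
}

// authorizedClient is a placeholder for the authorized Storage API client
// created by publicAPI.NewAuthorizedAPI in the real code.
type authorizedClient struct {
	token string
	ttl   time.Duration
}

// clientForSink resolves the sink token on demand and wraps it in a
// short-lived client, mirroring the pattern introduced by this PR.
func clientForSink(ctx context.Context, src tokenSource, sinkKey string) (*authorizedClient, error) {
	token, err := src.SinkToken(ctx, sinkKey)
	if err != nil {
		return nil, fmt.Errorf("cannot get token for sink %q (it may already be deleted): %w", sinkKey, err)
	}
	return &authorizedClient{token: token, ttl: 1 * time.Minute}, nil
}

// staticTokens is a toy tokenSource implementation for the usage example below.
type staticTokens map[string]string

func (s staticTokens) SinkToken(_ context.Context, sinkKey string) (string, error) {
	t, ok := s[sinkKey]
	if !ok {
		return "", fmt.Errorf("no token for sink %q", sinkKey)
	}
	return t, nil
}

func main() {
	src := staticTokens{"123/567/my-source/my-sink": "secret"}
	c, err := clientForSink(context.Background(), src, "123/567/my-source/my-sink")
	fmt.Println(c != nil, err)
}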
@@ -8,6 +8,5 @@ import (
 // At the end throttle the sink, so it does not overload the service.
 type Job struct {
 	key.JobKey
-	Deleted bool `json:"-"` // internal field to mark the entity for deletion, there is no soft delete
-	Token string `json:"token"`
+	Deleted bool `json:"-"` // internal field to mark the entity for deletion, there is no soft delete
 }
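The fixture snapshots below show the effect of this struct change on the persisted etcd value. As a quick illustration (not code from this PR), the sketch marshals a trimmed stand-in for model.Job: the json:"-" tag keeps Deleted out of the stored value, and with the Token field removed only the key fields remain. The projectId field and the flat JobKey layout are assumptions based on the snapshot key storage/job/123/567/my-source/my-sink/321; the real key.JobKey type differs.

package main

import (
	"encoding/json"
	"fmt"
)

// JobKey and Job are simplified stand-ins for key.JobKey and model.Job,
// used only to show what ends up in etcd after this change.
type JobKey struct {
	ProjectID int    `json:"projectId"` // assumed from the snapshot key prefix 123
	BranchID  int    `json:"branchId"`
	SourceID  string `json:"sourceId"`
	SinkID    string `json:"sinkId"`
	JobID     string `json:"jobId"`
}

type Job struct {
	JobKey
	Deleted bool `json:"-"` // internal only, never serialized
}

func main() {
	j := Job{
		JobKey:  JobKey{ProjectID: 123, BranchID: 567, SourceID: "my-source", SinkID: "my-sink", JobID: "321"},
		Deleted: true,
	}
	out, _ := json.MarshalIndent(j, "", "  ")
	// Prints the key fields only; there is no "token" and no "Deleted" key.
	fmt.Println(string(out))
}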
@@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink",
-  "jobId": "321",
-  "token": "secret"
+  "jobId": "321"
 }
 >>>>>
@@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink",
-  "jobId": "321",
-  "token": "secret"
+  "jobId": "321"
 }
 >>>>>
@@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink",
-  "jobId": "321",
-  "token": "secret"
+  "jobId": "321"
 }
 >>>>>
@@ -6,8 +6,7 @@ storage/job/123/567/my-source/my-sink/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink",
-  "jobId": "321",
-  "token": "secret1"
+  "jobId": "321"
 }
 >>>>>
 
@@ -19,7 +18,6 @@ storage/job/123/567/my-source/my-sink/322
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink",
-  "jobId": "322",
-  "token": "secret2"
+  "jobId": "322"
 }
 >>>>>
@@ -6,8 +6,7 @@ storage/job/123/567/my-source/my-sink-1/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-1",
-  "jobId": "321",
-  "token": "secret1"
+  "jobId": "321"
 }
 >>>>>
 
@@ -19,7 +18,6 @@ storage/job/123/567/my-source/my-sink-2/322
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-2",
-  "jobId": "322",
-  "token": "secret2"
+  "jobId": "322"
 }
 >>>>>
@@ -184,8 +184,7 @@ storage/job/123/567/my-source/my-sink-2/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-2",
-  "jobId": "321",
-  "token": "secret1"
+  "jobId": "321"
 }
 >>>>>
 
@@ -197,8 +196,7 @@ storage/job/123/567/my-source/my-sink-2/322
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-2",
-  "jobId": "322",
-  "token": "secret2"
+  "jobId": "322"
 }
 >>>>>
 
@@ -210,7 +208,6 @@ storage/job/123/567/my-source/my-sink-3/323
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-3",
-  "jobId": "323",
-  "token": "secret3"
+  "jobId": "323"
 }
 >>>>>
@@ -184,8 +184,7 @@ storage/job/123/567/my-source/my-sink-2/321
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-2",
-  "jobId": "321",
-  "token": "secret1"
+  "jobId": "321"
 }
 >>>>>
 
@@ -197,8 +196,7 @@ storage/job/123/567/my-source/my-sink-2/322
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-2",
-  "jobId": "322",
-  "token": "secret2"
+  "jobId": "322"
 }
 >>>>>
 
@@ -210,7 +208,6 @@ storage/job/123/567/my-source/my-sink-3/323
   "branchId": 567,
   "sourceId": "my-source",
   "sinkId": "my-sink-3",
-  "jobId": "323",
-  "token": "secret3"
+  "jobId": "323"
 }
 >>>>>
@@ -60,7 +60,7 @@ func TestJobRepository_Create(t *testing.T) {
 	sink := dummy.NewSink(sinkKey)
 	require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
 
-	job := model.Job{JobKey: jobKey, Token: "secret"}
+	job := model.Job{JobKey: jobKey}
 	result, err := repo.Create(&job).Do(ctx).ResultOrErr()
 	require.NoError(t, err)
 	assert.Equal(t, job, result)
@@ -61,7 +61,7 @@ func TestJobRepository_Exists(t *testing.T) {
 	sink := dummy.NewSink(sinkKey)
 	require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
 
-	job := model.Job{JobKey: jobKey, Token: "secret"}
+	job := model.Job{JobKey: jobKey}
 	result, err := repo.Create(&job).Do(ctx).ResultOrErr()
 	require.NoError(t, err)
 	assert.Equal(t, job, result)
@@ -89,7 +89,7 @@ func TestJobRepository_Exists(t *testing.T) {
 	// Purge - ok
 	// -----------------------------------------------------------------------------------------------------------------
 	{
-		job := model.Job{JobKey: jobKey, Token: "secret"}
+		job := model.Job{JobKey: jobKey}
 		assert.NoError(t, repo.Purge(&job).Do(ctx).Err())
 	}
 
@@ -61,7 +61,7 @@ func TestJobRepository_Get(t *testing.T) {
 	sink := dummy.NewSink(sinkKey)
 	require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
 
-	job := model.Job{JobKey: jobKey, Token: "secret"}
+	job := model.Job{JobKey: jobKey}
 	result, err := repo.Create(&job).Do(ctx).ResultOrErr()
 	require.NoError(t, err)
 	assert.Equal(t, job, result)
@@ -74,6 +74,6 @@ func TestJobRepository_Get(t *testing.T) {
 	{
 		result, err := repo.Get(jobKey).Do(ctx).ResultOrErr()
 		require.NoError(t, err)
-		assert.Equal(t, model.Job{JobKey: jobKey, Token: "secret"}, result)
+		assert.Equal(t, model.Job{JobKey: jobKey}, result)
 	}
 }
@@ -59,12 +59,12 @@ func TestJobRepository_List(t *testing.T) {
 	sink := dummy.NewSink(sinkKey)
 	require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
 
-	job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
+	job1 = model.Job{JobKey: jobKey1}
 	result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
 	require.NoError(t, err)
 	assert.Equal(t, job1, result)
 
-	job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
+	job2 = model.Job{JobKey: jobKey2}
 	result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
 	require.NoError(t, err)
 	assert.Equal(t, job2, result)
@@ -133,12 +133,12 @@ func TestJobRepository_ListDeleted(t *testing.T) {
 	// -----------------------------------------------------------------------------------------------------------------
 	var job1, job2 model.Job
 	{
-		job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
+		job1 = model.Job{JobKey: jobKey1}
 		result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
 		require.NoError(t, err)
 		assert.Equal(t, job1, result)
 
-		job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
+		job2 = model.Job{JobKey: jobKey2}
 		result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
 		require.NoError(t, err)
 		assert.Equal(t, job2, result)
@@ -62,7 +62,7 @@ func TestJobRepository_Purge(t *testing.T) {
 	sink := dummy.NewSink(sinkKey)
 	require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())
 
-	job := model.Job{JobKey: jobKey, Token: "secret"}
+	job := model.Job{JobKey: jobKey}
 	require.NoError(t, repo.Create(&job).Do(ctx).Err())
 }
 
@@ -127,13 +127,13 @@ func TestJobRepository_PurgeJobsOnSinkDelete(t *testing.T) {
 	// Create three Jobs
 	// -----------------------------------------------------------------------------------------------------------------
 	{
-		job := model.Job{JobKey: jobKey1, Token: "secret1"}
+		job := model.Job{JobKey: jobKey1}
 		require.NoError(t, repo.Create(&job).Do(ctx).Err())
 
-		job = model.Job{JobKey: jobKey2, Token: "secret2"}
+		job = model.Job{JobKey: jobKey2}
 		require.NoError(t, repo.Create(&job).Do(ctx).Err())
 
-		job = model.Job{JobKey: jobKey3, Token: "secret3"}
+		job = model.Job{JobKey: jobKey3}
 		require.NoError(t, repo.Create(&job).Do(ctx).Err())
 	}
 
@@ -223,13 +223,13 @@ func TestJobRepository_PurgeJobsOnSourceDelete_DeleteSource(t *testing.T) {
 		serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
 	}
 
-	job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
+	job1 = model.Job{JobKey: jobKey1}
 	require.NoError(t, repo.Create(&job1).Do(ctx).Err())
 
-	job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
+	job2 = model.Job{JobKey: jobKey2}
 	require.NoError(t, repo.Create(&job2).Do(ctx).Err())
 
-	job3 = model.Job{JobKey: jobKey3, Token: "secret3"}
+	job3 = model.Job{JobKey: jobKey3}
 	require.NoError(t, repo.Create(&job3).Do(ctx).Err())
 	etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_003.txt", ignoredEtcdKeys)
 }
@@ -325,13 +325,13 @@ func TestSinkRepository_DeleteSinksOnSourceDelete_DeleteBranch(t *testing.T) {
 		serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
 	}
 
-	job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
+	job1 = model.Job{JobKey: jobKey1}
 	require.NoError(t, repo.Create(&job1).Do(ctx).Err())
 
-	job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
+	job2 = model.Job{JobKey: jobKey2}
 	require.NoError(t, repo.Create(&job2).Do(ctx).Err())
 
-	job3 = model.Job{JobKey: jobKey3, Token: "secret3"}
+	job3 = model.Job{JobKey: jobKey3}
 	require.NoError(t, repo.Create(&job3).Do(ctx).Err())
 	etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_005.txt", ignoredEtcdKeys)
 }
internal/pkg/service/stream/storage/metacleanup/metacleanup.go (18 changes: 8 additions, 10 deletions)
@@ -266,16 +266,8 @@ func (n *Node) cleanJob(ctx context.Context, job keboolaBridgeModel.Job) (err er
 	ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob")
 	defer span.End(&err)
 
-	var keboolaJob keboolaBridgeModel.Job
-	// Retrieve job on bridge level
-	if keboolaJob, err = n.keboolaBridgeRepository.Job().Get(job.JobKey).Do(ctx).ResultOrErr(); err != nil {
-		err = errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
-		n.logger.Error(ctx, err.Error())
-		return err, false
-	}
-
 	// Parse storage job ID from string
-	id64, err := strconv.ParseInt(string(keboolaJob.JobKey.JobID), 10, 64)
+	id64, err := strconv.ParseInt(string(job.JobKey.JobID), 10, 64)
 	if err != nil {
 		err = errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
 		n.logger.Error(ctx, err.Error())
@@ -288,9 +280,15 @@ func (n *Node) cleanJob(ctx context.Context, job keboolaBridgeModel.Job) (err er
 		return err, false
 	}
 
+	token, err := n.bridge.SinkToken(ctx, job.SinkKey)
+	if err != nil {
+		n.logger.Warnf(ctx, "cannot get token for sink, already deleted: %s", err.Error())
+		return nil, false
+	}
+
 	// Get job details from storage API
 	id := int(id64)
-	api := n.publicAPI.NewAuthorizedAPI(keboolaJob.Token, 1*time.Minute)
+	api := n.publicAPI.NewAuthorizedAPI(token.TokenString(), 1*time.Minute)
 	var jobStatus *keboola.StorageJob
 	if jobStatus, err = api.GetStorageJobRequest(keboola.StorageJobKey{ID: keboola.StorageJobID(id)}).Send(ctx); err != nil {
 		n.logger.Warnf(ctx, "cannot get information about storage job, probably already deleted: %s", err.Error())
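To summarize the fragmented hunks above: after this change the cleanup node no longer reads a token persisted on the job entity; it parses the storage job ID from the job key, asks the bridge for the sink's token, and only then builds a short-lived authorized Storage API client, skipping cleanup when the token or the storage job is already gone. The sketch below restates that flow with hypothetical minimal types; bridge, storageAPI, and their method names are stand-ins, and only the overall control flow comes from the diff.

package main

import (
	"context"
	"fmt"
	"strconv"
)

// bridge and storageAPI are hypothetical interfaces standing in for the
// keboola bridge and the authorized Storage API client used in metacleanup.
type bridge interface {
	SinkToken(ctx context.Context, sinkKey string) (string, error)
}

type storageAPI interface {
	StorageJobFinished(ctx context.Context, token string, jobID int) (bool, error)
}

// cleanJob mirrors the new flow: parse the ID, resolve the token on demand,
// and treat a missing sink token or storage job as "nothing to clean".
func cleanJob(ctx context.Context, b bridge, api storageAPI, sinkKey, jobID string) (bool, error) {
	id, err := strconv.Atoi(jobID)
	if err != nil {
		return false, fmt.Errorf("cannot parse storage job ID %q: %w", jobID, err)
	}

	token, err := b.SinkToken(ctx, sinkKey)
	if err != nil {
		// The sink (and its token) may already be deleted; skip silently.
		return false, nil
	}

	finished, err := api.StorageJobFinished(ctx, token, id)
	if err != nil {
		// The storage job itself is probably already deleted; skip as well.
		return false, nil
	}
	return finished, nil
}

func main() {
	fmt.Println("cleanJob is a sketch; wire it to real implementations to use it")
}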