Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement storage job limiter #2123

Merged
merged 27 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8bf131a
fix: Add correct datadog tag env. New teardown and setup timeouts
Matovidlo Nov 4, 2024
1db958c
feat: Add configuration options for job and sink limit
Matovidlo Nov 8, 2024
0f04f52
feat: Add new plugin for sink delete
Matovidlo Nov 8, 2024
31f143e
feat: Add support for storage job repository
Matovidlo Nov 8, 2024
ad8eb9b
feat: Add support for keboola storage job
Matovidlo Nov 8, 2024
e2759c8
feat: Add support for deleting newly created jobs entities
Matovidlo Nov 8, 2024
658dad9
feat: Add new condition and mirroring of jobs
Matovidlo Nov 8, 2024
284d3d8
feat: Start metadata cleanup to unthrottle the sink
Matovidlo Nov 8, 2024
38959a4
fix: Remove sink repository, as it is not needed and present in deps
Matovidlo Nov 8, 2024
c009b9b
fix: Linter gci
Matovidlo Nov 8, 2024
2703814
fix: Add missing attribute in keboolafile
Matovidlo Nov 11, 2024
9fd9a6b
fix: Add new context attributes instead of logging whole object
Matovidlo Nov 11, 2024
b72a0f7
fix: Comment was copied from keboolasink
Matovidlo Nov 11, 2024
1fc049a
fix: Remove active prefix from job schema
Matovidlo Nov 11, 2024
8b406cd
fix: Change sinkLimit -> jobLimit
Matovidlo Nov 11, 2024
bacd1f0
fix: Add ListAll test, fix lint and config test
Matovidlo Nov 11, 2024
6116042
feat: Move job repository under keboola bridge
Matovidlo Nov 18, 2024
798bada
fix: Make configurable keboola bridge mirroring using exported function
Matovidlo Nov 18, 2024
d3a711d
fix: Linter issue
Matovidlo Nov 18, 2024
defdd2c
fix: Typo in comment
Matovidlo Nov 19, 2024
6cd186a
fix: Use context from Start of coordinator in bridge
Matovidlo Nov 19, 2024
0ef0afe
fix: Change naming and move throttle under file package in plugin
Matovidlo Nov 19, 2024
da8707a
fix: Add smaller interval to metadata cleanup so the jobs are cleared…
Matovidlo Nov 19, 2024
41c0053
feat: Remove token from job and use exported function on top of bridge
Matovidlo Nov 19, 2024
a376096
fix: Change default behaviour of CanAcceptNewFile. Returns by default…
Matovidlo Nov 19, 2024
0412bce
fix: Move cleanup of job under bridge module
Matovidlo Nov 20, 2024
665e97a
fix: Race condition on span with defer in errgrp
Matovidlo Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@ package bridge
import (
"context"
"fmt"
"math"
"strconv"
"time"

"github.com/keboola/go-client/pkg/keboola"
"go.opentelemetry.io/otel/attribute"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

// SinkToken reads the token registered for the given sink through the bridge schema.
// The value and the error produced by the lookup are returned to the caller unchanged.
func (b *Bridge) SinkToken(ctx context.Context, sinkKey key.SinkKey) (keboolasink.Token, error) {
	lookup := b.schema.Token().ForSink(sinkKey).GetOrErr(b.client)
	return lookup.Do(ctx).ResultOrErr()
}

func (b *Bridge) createJob(ctx context.Context, file plugin.File, storageJob *keboola.StorageJob) error {
keboolaJob := model.Job{
JobKey: key.JobKey{SinkKey: file.SinkKey, JobID: key.JobID(storageJob.ID.String())},
Expand Down Expand Up @@ -58,3 +57,65 @@ func (b *Bridge) canAcceptNewFile(ctx context.Context, sinkKey key.SinkKey) bool

return runningJobs < b.config.JobLimit
}

// CleanJob purges the bridge record of a storage job once the Storage API no longer
// reports the job as processing or waiting.
//
// Steps: the job ID is parsed and range-checked, the sink token is loaded, the job
// status is fetched from the Storage API and, unless the job is still processing
// or waiting, the record is purged while holding the per-sink jobs lock.
//
// Results:
//   - (nil, true): the record has been purged.
//   - (nil, false): the token or the job status could not be loaded (logged as a
//     warning), or the job is still processing/waiting.
//   - (err, false): the job ID is invalid, the lock could not be acquired, or the
//     purge operation failed.
func (b *Bridge) CleanJob(ctx context.Context, job model.Job) (err error, deleted bool) {
	// The job ID is stored as a string, it must be a valid integer.
	parsedID, err := strconv.ParseInt(string(job.JobKey.JobID), 10, 64)
	if err != nil {
		wrapped := errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
		b.logger.Error(ctx, wrapped.Error())
		return wrapped, false
	}

	// The ID is converted to int below, so it must fit into the platform int.
	if fitsInt := parsedID >= math.MinInt && parsedID <= math.MaxInt; !fitsInt {
		rangeErr := errors.Errorf("parsed job ID %d is out of int range", parsedID)
		b.logger.Error(ctx, rangeErr.Error())
		return rangeErr, false
	}

	// Without the sink token the Storage API cannot be queried; this is not reported as an error.
	sinkToken, err := b.schema.Token().ForSink(job.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
	if err != nil {
		b.logger.Warnf(ctx, "cannot get token for sink, already deleted: %s", err.Error())
		return nil, false
	}

	// Ask the Storage API for the current state of the job.
	storageAPI := b.publicAPI.NewAuthorizedAPI(sinkToken.TokenString(), time.Minute)
	storageJobKey := keboola.StorageJobKey{ID: keboola.StorageJobID(int(parsedID))}
	storageJob, err := storageAPI.GetStorageJobRequest(storageJobKey).Send(ctx)
	if err != nil {
		b.logger.Warnf(ctx, "cannot get information about storage job, probably already deleted: %s", err.Error())
		return nil, false
	}

	// Attach the job state to the context, so it is part of the following log records.
	ctx = ctxattr.ContextWith(ctx, attribute.String("job.state", storageJob.Status))

	// A job that is still processing or waiting is kept.
	switch storageJob.Status {
	case keboola.StorageJobStatusProcessing, keboola.StorageJobStatusWaiting:
		b.logger.Debugf(ctx, "cannot remove storage job, job status: %s", storageJob.Status)
		return nil, false
	}

	// The purge requires the per-sink jobs lock.
	lockName := fmt.Sprintf("api.source.sink.jobs.%s", job.SinkKey)
	lock := b.locks.NewMutex(lockName)
	if lockErr := lock.TryLock(ctx); lockErr != nil {
		return lockErr, false
	}
	defer func() {
		if unlockErr := lock.Unlock(ctx); unlockErr != nil {
			b.logger.Errorf(ctx, "cannot unlock the lock: %s", unlockErr)
		}
	}()

	// Remove the job from the bridge repository.
	_, err = b.keboolaBridgeRepository.Job().Purge(&job).RequireLock(lock).Do(ctx).ResultOrErr()
	if err != nil {
		wrapped := errors.PrefixErrorf(err, `cannot delete finished storage job "%s"`, job.JobKey)
		b.logger.Error(ctx, wrapped.Error())
		return wrapped, false
	}

	b.logger.Infof(ctx, `deleted finished storage job`)

	return nil, true
}
94 changes: 15 additions & 79 deletions internal/pkg/service/stream/storage/metacleanup/metacleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ package metacleanup

import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -176,10 +173,24 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) {
ListAll().
ForEach(func(job keboolaBridgeModel.Job, _ *iterator.Header) error {
jachym-tousek-keboola marked this conversation as resolved.
Show resolved Hide resolved
grp.Go(func() error {
err, deleted := n.cleanJob(ctx, job)
// There can be several cleanup nodes; each node processes only its own part.
if !n.dist.MustCheckIsOwner(job.ProjectID.String()) {
return nil
}

// Log/trace job details
attrs := job.Telemetry()
ctx = ctxattr.ContextWith(ctx, attrs...)

// Trace each job
ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob")
defer span.End(&err)

err, deleted := n.bridge.CleanJob(ctx, job)
if deleted {
jobCounter.Add(1)
}

return err
})

Expand Down Expand Up @@ -251,78 +262,3 @@ func (n *Node) isFileExpired(file model.File, age time.Duration) bool {
// Other files have a longer expiration so there is time for retries.
return age >= n.config.ActiveFileExpiration
}

// cleanJob purges the bridge record of a storage job once the Storage API no longer
// reports the job as processing or waiting.
//
// Jobs of projects not owned by this node are skipped. Each processed job is traced
// by a span, which is ended with the returned error.
//
// Results:
//   - (nil, true): the record has been purged.
//   - (nil, false): the job belongs to another node, the token or the job status
//     could not be loaded (logged as a warning), or the job is still processing/waiting.
//   - (err, false): the job ID is invalid, the lock could not be acquired, or the
//     purge operation failed.
func (n *Node) cleanJob(ctx context.Context, job keboolaBridgeModel.Job) (err error, deleted bool) {
	// There can be several cleanup nodes; each node processes only its own part.
	if owned := n.dist.MustCheckIsOwner(job.ProjectID.String()); !owned {
		return nil, false
	}

	// Attach job details to the context, so they are part of logs and traces.
	telemetryAttrs := job.Telemetry()
	ctx = ctxattr.ContextWith(ctx, telemetryAttrs...)

	// Trace each job, the span is ended with the named error result.
	ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob")
	defer span.End(&err)

	// The job ID is stored as a string, it must be a valid integer.
	parsedID, err := strconv.ParseInt(string(job.JobKey.JobID), 10, 64)
	if err != nil {
		wrapped := errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
		n.logger.Error(ctx, wrapped.Error())
		return wrapped, false
	}

	// The ID is converted to int below, so it must fit into the platform int.
	if fitsInt := parsedID >= math.MinInt && parsedID <= math.MaxInt; !fitsInt {
		rangeErr := errors.Errorf("parsed job ID %d is out of int range", parsedID)
		n.logger.Error(ctx, rangeErr.Error())
		return rangeErr, false
	}

	// Without the sink token the Storage API cannot be queried; this is not reported as an error.
	sinkToken, err := n.bridge.SinkToken(ctx, job.SinkKey)
	if err != nil {
		n.logger.Warnf(ctx, "cannot get token for sink, already deleted: %s", err.Error())
		return nil, false
	}

	// Ask the Storage API for the current state of the job.
	storageAPI := n.publicAPI.NewAuthorizedAPI(sinkToken.TokenString(), time.Minute)
	storageJobKey := keboola.StorageJobKey{ID: keboola.StorageJobID(int(parsedID))}
	storageJob, err := storageAPI.GetStorageJobRequest(storageJobKey).Send(ctx)
	if err != nil {
		n.logger.Warnf(ctx, "cannot get information about storage job, probably already deleted: %s", err.Error())
		return nil, false
	}

	// Extend the context attributes with the job state.
	telemetryAttrs = append(telemetryAttrs, attribute.String("job.state", storageJob.Status))
	ctx = ctxattr.ContextWith(ctx, telemetryAttrs...)

	// A job that is still processing or waiting is kept.
	switch storageJob.Status {
	case keboola.StorageJobStatusProcessing, keboola.StorageJobStatusWaiting:
		n.logger.Debugf(ctx, "cannot remove storage job, job status: %s", storageJob.Status)
		return nil, false
	}

	// The purge requires the per-sink jobs lock.
	lockName := fmt.Sprintf("api.source.sink.jobs.%s", job.SinkKey)
	lock := n.locks.NewMutex(lockName)
	if lockErr := lock.TryLock(ctx); lockErr != nil {
		return lockErr, false
	}
	defer func() {
		if unlockErr := lock.Unlock(ctx); unlockErr != nil {
			n.logger.Errorf(ctx, "cannot unlock the lock: %s", unlockErr)
		}
	}()

	// Remove the job from the bridge repository.
	_, err = n.keboolaBridgeRepository.Job().Purge(&job).RequireLock(lock).Do(ctx).ResultOrErr()
	if err != nil {
		wrapped := errors.PrefixErrorf(err, `cannot delete finished storage job "%s"`, job.JobKey)
		n.logger.Error(ctx, wrapped.Error())
		return wrapped, false
	}

	n.logger.Infof(ctx, `deleted finished storage job`)

	return nil, true
}
Loading