Skip to content

Commit

Permalink
fix: Change sinkLimit -> jobLimit
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Nov 11, 2024
1 parent 248d454 commit 1b280ed
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 12 deletions.
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ storage:
fileImportTimeout: 15m0s
import:
# Specifies limit of sink as number of jobs that it cannot exceed. In case it is reached, the sink is throttled and import cannot be performed unless Trigger conditions were met
sinkLimit: 2
jobLimit: 2
# Min duration from the last import to trigger the next, takes precedence over other settings. Validation rules: required,minDuration=30s,maxDuration=24h
minInterval: 1m0s
trigger:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (b *Bridge) setupOnFileOpen() {

// Update file entity
file.Mapping = sink.Table.Mapping
file.StagingStorage.Provider = stagingFileProvider // staging file is provided by the Keboola
file.TargetStorage.Provider = targetProvider // destination is a Keboola table
file.TargetStorage.Import.SinkLimit = b.config.JobLimit // specific sink condition for keboola provider
file.StagingStorage.Provider = stagingFileProvider // staging file is provided by the Keboola
file.TargetStorage.Provider = targetProvider // destination is a Keboola table
file.TargetStorage.Import.JobLimit = b.config.JobLimit // specific job condition for keboola provider
file.StagingStorage.Expiration = utctime.From(keboolaFile.UploadCredentials.CredentialsExpiration())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewConfig() Config {
FileImportTimeout: duration.From(15 * time.Minute),
},
Import: ImportConfig{
SinkLimit: 2,
JobLimit: 2,
MinInterval: duration.From(60 * time.Second),
Trigger: ImportTrigger{
Count: 50000,
Expand All @@ -53,7 +53,7 @@ type OperatorConfig struct {

// ImportConfig configures the file import.
type ImportConfig struct {
SinkLimit int `json:"sinkLimit" configKey:"sinkLimit" configUsage:"Specifies limit of sink that is representable by number"`
JobLimit int `json:"jobLimit" configKey:"jobLimit" configUsage:"Specifies limit of sink as number of jobs that it cannot exceed. In case it is reached, the sink is throttled and import cannot be performed unless Trigger conditions were met"`
MinInterval duration.Duration `json:"minInterval" configKey:"minInterval" configUsage:"Min duration from the last import to trigger the next, takes precedence over other settings." validate:"required,minDuration=30s,maxDuration=24h"`
Trigger ImportTrigger `json:"trigger" configKey:"trigger"`
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func (c fileRotationConditionResult) String() string {
}
}

func shouldImport(cfg targetConfig.ImportConfig, now, openedAt, expiration time.Time, stats statistics.Value, sinkLimit int) fileRotationConditionResult {
func shouldImport(cfg targetConfig.ImportConfig, now, openedAt, expiration time.Time, stats statistics.Value, jobLimit int) fileRotationConditionResult {
sinceOpened := now.Sub(openedAt).Truncate(time.Second)
if threshold := cfg.MinInterval.Duration(); sinceOpened < threshold {
// Min interval settings take precedence over other settings.
return newConditionResult(noConditionMet, "min interval between imports is not met")
}

if threshold := cfg.SinkLimit; sinkLimit >= threshold {
if threshold := cfg.JobLimit; jobLimit >= threshold {
	// Sink throttling takes precedence over the remaining trigger settings.
return newConditionResult(sinkThrottled, "sink is throttled, waiting for next import check")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestShouldImport(t *testing.T) {

// Defaults
cfg := targetConfig.ImportConfig{
SinkLimit: 2,
JobLimit: 2,
MinInterval: duration.From(60 * time.Second),
Trigger: targetConfig.ImportTrigger{
Count: 10000,
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestShouldImport(t *testing.T) {
assert.Equal(t, "no condition met", result.Cause())

	// Job limit configured and reached, the sink should be throttled
cfg.SinkLimit = 1
cfg.JobLimit = 1
result = shouldImport(cfg, now, openedBefore01Min, expirationIn60min, statistics.Value{
RecordsCount: 100,
CompressedSize: 1 * datasize.KB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func setup(t *testing.T, ctx context.Context, sinkLimit int) *testState {
clk.Set(utctime.MustParse("2000-01-01T00:00:00.000Z").Time())
d, mock := dependencies.NewMockedCoordinatorScopeWithConfig(t, ctx, func(cfg *config.Config) {
cfg.Storage.Level.Target.Import = targetConfig.ImportConfig{
SinkLimit: sinkLimit,
JobLimit: sinkLimit,
MinInterval: duration.From(minImportInterval),
Trigger: importTrigger,
}
Expand Down
2 changes: 1 addition & 1 deletion test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
},
}

// In the test, we trigger the file import only when sink limit is not reached. We do import to Keboola
// In the test, we trigger the file import only when sink limit is not reached.
cfg.Sink.Table.Keboola.JobLimit = 1

// In the test, we trigger the file import via the records count, the other values are intentionally high.
Expand Down

0 comments on commit 1b280ed

Please sign in to comment.