Skip to content

Commit

Permalink
feat: Start metadata cleanup to unthrottle the sink
Browse files Browse the repository at this point in the history
When not present, the test never ends due to set limit
  • Loading branch information
Matovidlo committed Nov 8, 2024
1 parent a90ce27 commit c1ac629
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {

// Update configuration to make the cluster testable
configFn := func(cfg *config.Config) {
// Enable metadata cleanup for removing storage jobs
cfg.Storage.MetadataCleanup.Enabled = true
// Disable unrelated workers
cfg.Storage.DiskCleanup.Enabled = false
cfg.Storage.MetadataCleanup.Enabled = false
cfg.API.Task.CleanupEnabled = false

// Use deterministic load balancer
Expand Down Expand Up @@ -66,6 +67,9 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
Expiration: duration.From(30 * time.Minute),
},
}

// Cleanup should be perfomed more frequently to remove already finished storage jobs
cfg.Storage.MetadataCleanup.Interval = 10 * time.Second
}

ts := setup(t, ctx, configFn)
Expand Down Expand Up @@ -440,6 +444,11 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
`)
}, 60*time.Second, 100*time.Millisecond)

assert.EventuallyWithT(t, func(c *assert.CollectT) {
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"deleted \"1\" jobs","deletedJobsCount":1,"component":"storage.metadata.cleanup"}
`)
}, 20*time.Second, 100*time.Millisecond)
// Check file/slices state after the upload
files := ts.checkState(t, ctx, expectations.expectedFiles)

Expand Down

0 comments on commit c1ac629

Please sign in to comment.