Skip to content

Commit

Permalink
fix: Add stats check for colision imports when running toxiproxy test
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Dec 12, 2024
1 parent b56b5fd commit f6710bb
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (c *ClientConnection) Close(ctx context.Context) error {

func (c *ClientConnection) dialLoop(ctx context.Context, initDone chan error) {
b := newClientConnBackoff()
b.InitialInterval = 100 * time.Millisecond
var closeErr error
for {
if c.isClosed() || c.client.isClosed() || errors.Is(closeErr, yamux.ErrSessionShutdown) || errors.Is(closeErr, io.EOF) {
Expand Down
22 changes: 21 additions & 1 deletion test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,16 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
// Expect file rotation
ts.logSection(t, "expecting file rotation (min 30s from the previous)")
assert.EventuallyWithT(t, func(c *assert.CollectT) {
var files []model.File
files, err := ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(expectations.sendRecords.count) {
return
}
}

ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"rotating file, import conditions met: count threshold met, records count: 30, threshold: 30","component":"storage.node.operator.file.rotation"}
{"level":"info","message":"rotated file","component":"storage.node.operator.file.rotation"}
Expand Down Expand Up @@ -680,7 +690,7 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"}
`)
}, 15*time.Second, 100*time.Millisecond)
}, 25*time.Second, 100*time.Millisecond)

// Expect file import
ts.logSection(t, "expecting file import")
Expand Down Expand Up @@ -738,6 +748,16 @@ func (ts *testState) sendRecords(t *testing.T, ctx context.Context, start, n int
go func() {
var err error
for {
var files []model.File
files, err = ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(start+n) {
return
}
}

var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i)))
require.NoError(t, err)
Expand Down

0 comments on commit f6710bb

Please sign in to comment.