diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/transport/client_conn.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/transport/client_conn.go index a73b305a29..cf28c715da 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/transport/client_conn.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/transport/client_conn.go @@ -151,6 +151,7 @@ func (c *ClientConnection) Close(ctx context.Context) error { func (c *ClientConnection) dialLoop(ctx context.Context, initDone chan error) { b := newClientConnBackoff() + b.InitialInterval = 100 * time.Millisecond var closeErr error for { if c.isClosed() || c.client.isClosed() || errors.Is(closeErr, yamux.ErrSessionShutdown) || errors.Is(closeErr, io.EOF) { diff --git a/test/stream/bridge/keboola/keboola_test.go b/test/stream/bridge/keboola/keboola_test.go index e575771829..ff95b18f85 100644 --- a/test/stream/bridge/keboola/keboola_test.go +++ b/test/stream/bridge/keboola/keboola_test.go @@ -653,6 +653,16 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati // Expect file rotation ts.logSection(t, "expecting file rotation (min 30s from the previous)") assert.EventuallyWithT(t, func(c *assert.CollectT) { + var files []model.File + files, err := ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All() + require.NoError(t, err) + for _, file := range files { + stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey) + if err == nil && stats.Local.RecordsCount == uint64(expectations.sendRecords.count) { + return + } + } + ts.logger.AssertJSONMessages(c, ` {"level":"info","message":"rotating file, import conditions met: count threshold met, records count: 30, threshold: 30","component":"storage.node.operator.file.rotation"} {"level":"info","message":"rotated file","component":"storage.node.operator.file.rotation"} @@ -680,7 +690,7 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati ts.logger.AssertJSONMessages(c, ` {"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"} `) - }, 15*time.Second, 100*time.Millisecond) + }, 25*time.Second, 100*time.Millisecond) // Expect file import ts.logSection(t, "expecting file import") @@ -738,6 +748,16 @@ func (ts *testState) sendRecords(t *testing.T, ctx context.Context, start, n int go func() { var err error for { + var files []model.File + files, err = ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All() + require.NoError(t, err) + for _, file := range files { + stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey) + if err == nil && stats.Local.RecordsCount == uint64(start+n) { + return + } + } + var req *http.Request req, err = http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i))) require.NoError(t, err)