Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add toxiproxy stream test #2175

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (c *ClientConnection) Close(ctx context.Context) error {

func (c *ClientConnection) dialLoop(ctx context.Context, initDone chan error) {
b := newClientConnBackoff()
b.InitialInterval = 100 * time.Millisecond
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to toxiproxy we have now higher latency and this often failed locally.

var closeErr error
for {
if c.isClosed() || c.client.isClosed() || errors.Is(closeErr, yamux.ErrSessionShutdown) || errors.Is(closeErr, io.EOF) {
Expand Down
3 changes: 3 additions & 0 deletions provisioning/dev/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ ENV PATH="$PATH:$GOBIN"
# Install packages
RUN apt-get update && apt-get install --no-install-recommends --yes nano protobuf-compiler graphviz build-essential
ENV EDITOR=nano
# Download toxiproxy
RUN curl -L https://github.com/Shopify/toxiproxy/releases/download/v2.11.0/toxiproxy-server-linux-amd64 -o /usr/local/bin/toxiproxy-server
RUN chmod +x /usr/local/bin/toxiproxy-server

# Install tools
RUN mkdir -p /tmp/build
Expand Down
3 changes: 1 addition & 2 deletions test/stream/bridge/keboola/guest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ func TestGuestUserWorkflow(t *testing.T) {
}
ts := setup(
t,
ctx,
modifyConfig,
utilsproject.WithIsGuest(),
)
ts.startNodes(t, ctx, modifyConfig)
ts.setupSourceThroughAPI(t, ctx, http.StatusForbidden)
defer ts.teardown(t, ctx)
recreateStreamAPI(t, &ts, ctx, modifyConfig)
Expand Down
297 changes: 283 additions & 14 deletions test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/Shopify/toxiproxy/v2"
toxiproxyClient "github.com/Shopify/toxiproxy/v2/client"
"github.com/c2h5oh/datasize"
"github.com/keboola/go-client/pkg/keboola"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -25,9 +29,7 @@ import (
)

// To see details run: TEST_VERBOSE=true go test ./test/stream/bridge/... -v.
func TestKeboolaBridgeWorkflow(t *testing.T) {
t.Parallel()

func TestKeboolaBridgeWorkflow(t *testing.T) { // nolint: paralleltest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why no parallel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is issue in etcd connection when being in parallel. Looks like we have some issue in the test suite. I will create subsequent tasks to dig into the issue. Simply adding parallel will make the test fail in CI for sure. The result locally varies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

Expand Down Expand Up @@ -71,7 +73,8 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
cfg.Storage.MetadataCleanup.Interval = 10 * time.Second
}

ts := setup(t, ctx, configFn)
ts := setup(t)
ts.startNodes(t, ctx, configFn)
ts.setupSink(t, ctx)
defer ts.teardown(t, ctx)

Expand Down Expand Up @@ -302,6 +305,242 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
ts.checkKeboolaTable(t, ctx, 1, 129)
}

func TestNetworkIssuesKeboolaBridgeWorkflow(t *testing.T) { // nolint: paralleltest
metrics := toxiproxy.NewMetricsContainer(nil)
server := toxiproxy.NewServer(metrics, zerolog.New(os.Stderr))
go server.Listen("localhost:8474")

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

// Update configuration to make the cluster testable
configFn := func(cfg *config.Config) {
// Enable metadata cleanup for removing storage jobs
cfg.Storage.MetadataCleanup.Enabled = true
// Disable unrelated workers
cfg.Storage.DiskCleanup.Enabled = false
cfg.API.Task.CleanupEnabled = false

// Use deterministic load balancer
cfg.Storage.Level.Local.Writer.Network.PipelineBalancer = network.RoundRobinBalancerType

// In the test, we trigger the slice upload via the records count, the other values are intentionally high.
cfg.Storage.Level.Staging.Upload = stagingConfig.UploadConfig{
MinInterval: duration.From(1 * time.Second), // minimum
Trigger: stagingConfig.UploadTrigger{
Count: 10,
Size: 1000 * datasize.MB,
Interval: duration.From(30 * time.Minute),
},
}

// In the test, we trigger the file import only when sink limit is not reached.
cfg.Sink.Table.Keboola.JobLimit = 1

// In the test, we trigger the file import via the records count, the other values are intentionally high.
cfg.Storage.Level.Target.Import = targetConfig.ImportConfig{
MinInterval: duration.From(30 * time.Second), // minimum
Trigger: targetConfig.ImportTrigger{
Count: 30,
Size: 1000 * datasize.MB,
Interval: duration.From(30 * time.Minute),
SlicesCount: 100,
Expiration: duration.From(30 * time.Minute),
},
}

// Cleanup should be perfomed more frequently to remove already finished storage jobs
cfg.Storage.MetadataCleanup.Interval = 10 * time.Second
}

ts := setup(t)
client := toxiproxyClient.NewClient("localhost:8474")
proxy, err := client.CreateProxy("source1", ts.sourceURL1[7:len(ts.sourceURL1)-1], ts.sourceURL1[7:])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the source url is localhost:12345 i want to create new proxy on host:port localhost:1234.
It can be adjusted to randomized port

require.NoError(t, err)
proxy.AddToxic("latency_down", "latency", "downstream", 1.0, map[string]interface{}{
"latency": 1000,
})
t.Cleanup(func() {
proxy.Delete()
server.Shutdown()
})

ts.proxy = proxy
ts.startNodes(t, ctx, configFn)
defer ts.teardown(t, ctx)

ts.sourceURL1 = ts.sourceURL1[:len(ts.sourceURL1)-1]
ts.sourcePort1 /= 10
ts.logger.Infof(ctx, "proxyurl:%s, port:%d", ts.sourceURL1, ts.sourcePort1)
ts.setupSink(t, ctx)
// Check initial state
ts.checkState(t, ctx, []file{
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
})

// First upload
ts.logSection(t, "testing first upload")
ts.sendRecords(t, ctx, 1, 20)
if ts.proxy != nil {
ts.proxy.Disable()
time.Sleep(100 * time.Millisecond)
ts.proxy.Enable()
}

// First import
ts.logSection(t, "testing first import")
ts.testFileImport(t, ctx, fileImport{
sendRecords: records{
startID: 21,
count: 10,
},
expectedFileRecords: records{
startID: 1,
count: 30,
},
expectedTableRecords: records{
startID: 1,
count: 30,
},
expectedFiles: []file{
{
state: model.FileImported, // <<<<<
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
},
})

ts.logSection(t, "testing second upload")
ts.sendRecords(t, ctx, 31, 20)
if ts.proxy != nil {
ts.proxy.Disable()
time.Sleep(100 * time.Millisecond)
ts.proxy.Enable()
}

ts.logSection(t, "testing second import")
ts.testFileImport(t, ctx, fileImport{
sendRecords: records{
startID: 51,
count: 10,
},
expectedFileRecords: records{
startID: 31,
count: 30,
},
expectedTableRecords: records{
startID: 1,
count: 60,
},
expectedFiles: []file{
{
state: model.FileImported,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileImported, // <<<<<
volumes: []volume{
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
{
slices: []model.SliceState{
model.SliceImported,
model.SliceImported,
},
},
},
},
{
state: model.FileWriting,
volumes: []volume{
{
slices: []model.SliceState{
model.SliceWriting,
},
},
{
slices: []model.SliceState{
model.SliceWriting,
},
},
},
},
},
})

// Test simultaneous slice and file rotations
ts.logSection(t, "testing simultaneous file and slice rotations, both conditions are met")
ts.logger.Truncate()
ts.sendRecords(t, ctx, 61, 69)
assert.EventuallyWithT(t, func(c *assert.CollectT) {
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"}
{"level":"info","message":"importing file","component":"storage.node.operator.file.import"}
{"level":"info","message":"imported file","component":"storage.node.operator.file.import"}
`)
}, 60*time.Second, 100*time.Millisecond)
ts.checkKeboolaTable(t, ctx, 1, 129)
}

func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expectations sliceUpload) {
t.Helper()
ts.logger.Truncate()
Expand All @@ -326,7 +565,7 @@ func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expecta
{"level":"info","message":"closed slice","component":"storage.node.operator.slice.rotation"}
{"level":"info","message":"closed slice","component":"storage.node.operator.slice.rotation"}
`)
}, 10*time.Second, 10*time.Millisecond)
}, 20*time.Second, 10*time.Millisecond)

// Expect slices upload
ts.logSection(t, "expecting slices upload")
Expand All @@ -339,7 +578,7 @@ func (ts *testState) testSlicesUpload(t *testing.T, ctx context.Context, expecta
{"level":"info","message":"uploaded slice","component":"storage.node.operator.slice.upload"}
{"level":"info","message":"uploaded slice","component":"storage.node.operator.slice.upload"}
`)
}, 15*time.Second, 10*time.Millisecond)
}, 20*time.Second, 10*time.Millisecond)

// Check file/slices state after the upload
files := ts.checkState(t, ctx, expectations.expectedFiles)
Expand Down Expand Up @@ -409,6 +648,16 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
{"level":"info","message":"rotating file, import conditions met: count threshold met, records count: 30, threshold: 30","component":"storage.node.operator.file.rotation"}
{"level":"info","message":"rotated file","component":"storage.node.operator.file.rotation"}
`)

var files []model.File
files, err := ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(expectations.sendRecords.count) {
return
}
}
}, 60*time.Second, 100*time.Millisecond)

// Expect slices closing, upload and file closing
Expand All @@ -432,7 +681,7 @@ func (ts *testState) testFileImport(t *testing.T, ctx context.Context, expectati
ts.logger.AssertJSONMessages(c, `
{"level":"info","message":"closed file","component":"storage.node.operator.file.rotation"}
`)
}, 15*time.Second, 100*time.Millisecond)
}, 25*time.Second, 100*time.Millisecond)

// Expect file import
ts.logSection(t, "expecting file import")
Expand Down Expand Up @@ -487,13 +736,33 @@ func (ts *testState) sendRecords(t *testing.T, ctx context.Context, start, n int
sourceURL = ts.sourceURL2
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i)))
require.NoError(t, err)
resp, err := ts.httpClient.Do(req)
if assert.NoError(t, err) {
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.NoError(t, resp.Body.Close())
}
go func() {
var err error
for {
var files []model.File
files, err = ts.coordinatorScp1.StorageRepository().File().ListInState(ts.sinkKey, model.FileWriting).Do(ctx).All()
require.NoError(t, err)
for _, file := range files {
stats, err := ts.coordinatorScp1.StatisticsRepository().FileStats(ctx, file.FileKey)
if err == nil && stats.Local.RecordsCount == uint64(start+n-1) {
return
}
}

var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodPost, sourceURL, strings.NewReader(fmt.Sprintf("foo%d", start+i)))
require.NoError(t, err)
var resp *http.Response
resp, err = ts.httpClient.Do(req)
if err == nil {
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.NoError(t, resp.Body.Close())
return
}

time.Sleep(10 * time.Millisecond)
}
}()
}
}

Expand Down
Loading
Loading