From e8f1d7ec28a2b35060274338d278be1cda78f63b Mon Sep 17 00:00:00 2001 From: Robert Gettys Date: Mon, 7 Oct 2024 16:41:02 -0400 Subject: [PATCH 1/3] Release connections on error during Flush() --- conn_batch.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/conn_batch.go b/conn_batch.go index a729b2d238..d65300621c 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -23,6 +23,7 @@ import ( "os" "regexp" "slices" + "syscall" "time" "github.com/pkg/errors" @@ -286,6 +287,10 @@ func (b *batch) Flush() error { } if b.block.Rows() != 0 { if err := b.conn.sendData(b.block, ""); err != nil { + // broken pipe/conn reset aren't generally recoverable on retry + if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) { + b.release(err) + } return err } if b.closeOnFlush { From 22589fad016a633fd8ef21c81c00cbd9a7d34037 Mon Sep 17 00:00:00 2001 From: Robert Gettys Date: Thu, 10 Oct 2024 11:43:34 -0400 Subject: [PATCH 2/3] Add test to illustrate broken batch flushes --- tests/broken_connection_test.go | 63 +++++++++++++++++++++++++++++++++ tests/utils.go | 12 ++++--- 2 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 tests/broken_connection_test.go diff --git a/tests/broken_connection_test.go b/tests/broken_connection_test.go new file mode 100644 index 0000000000..20db042c43 --- /dev/null +++ b/tests/broken_connection_test.go @@ -0,0 +1,63 @@ +package tests + +import ( + "context" + "errors" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/docker/docker/api/types/container" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "os" + "syscall" + "testing" +) + +//goland:noinspection ALL +const insertQry = "INSERT INTO test (foo, foo2)" + +func TestBatchFlushBrokenConn(t *testing.T) { + env, err := GetNativeTestEnvironment() + require.NoError(t, err) + require.NotNil(t, env) + ctx := context.Background() + client, err := testcontainers.NewDockerClientWithOpts(ctx) + require.NoError(t, err) + chClient, err := getConnection(env, env.Database, clickhouse.Settings{}, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + + err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)") + require.NoError(t, err) + batch, err := chClient.PrepareBatch(ctx, insertQry, driver.WithCloseOnFlush()) + require.NoError(t, err) + err = batch.Append("bar", "bar") + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + err = batch.Append("bar2", "bar2") + require.NoError(t, err) + err = batch.Flush() + require.NoError(t, err) + + err = batch.Append(RandAsciiString(200000000), RandAsciiString(20000000)) + + require.NoError(t, err) + ch := make(chan struct{}) + go func() { + err = batch.Flush() + close(ch) + }() + //timeout := 0 + err2 := client.ContainerKill(ctx, env.ContainerID, "KILL") + <-ch + require.NoError(t, err2) + require.True(t, errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)) + err = client.ContainerStart(ctx, env.ContainerID, container.StartOptions{}) + require.NoError(t, err) + err = batch.Flush() + // retry after server is up should have either no error, or a reconnect error (for example because the mapped port + // changed on container startup) + require.True(t, err == nil || errors.Is(err, syscall.ECONNREFUSED) || os.IsTimeout(err), err) + +} diff --git a/tests/utils.go b/tests/utils.go index b33aa2c918..2eda3fa62f 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -61,6 +61,7 @@ func GetClickHouseTestVersion() string { } type ClickHouseTestEnvironment struct { + ContainerID string Port int HttpPort int SslPort int @@ -203,11 +204,12 @@ func CreateClickHouseTestEnvironment(testSet string) (ClickHouseTestEnvironment, hps, _ := clickhouseContainer.MappedPort(ctx, "8443") ip, _ := clickhouseContainer.ContainerIP(ctx) testEnv := ClickHouseTestEnvironment{ - Port: p.Int(), - HttpPort: hp.Int(), - SslPort: sslPort.Int(), - HttpsPort: hps.Int(), - Host: "127.0.0.1", + ContainerID: clickhouseContainer.GetContainerID(), + Port: p.Int(), + HttpPort: hp.Int(), + SslPort: sslPort.Int(), + HttpsPort: hps.Int(), + Host: "127.0.0.1", // we set this explicitly - note its also set in the /etc/clickhouse-server/users.d/admin.xml Username: "default", Password: "ClickHouse", From 957b9f6bdf9e737f1352c136170a6188adbe0cf1 Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Tue, 15 Oct 2024 10:44:47 +0200 Subject: [PATCH 3/3] Use a dedicated test environment for broken connection test recover #1421 --- .../1421_test.go} | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) rename tests/{broken_connection_test.go => issues/1421_test.go} (78%) diff --git a/tests/broken_connection_test.go b/tests/issues/1421_test.go similarity index 78% rename from tests/broken_connection_test.go rename to tests/issues/1421_test.go index 20db042c43..904ee4fde7 100644 --- a/tests/broken_connection_test.go +++ b/tests/issues/1421_test.go @@ -1,31 +1,35 @@ -package tests +package issues import ( "context" "errors" - "github.com/ClickHouse/clickhouse-go/v2" + "os" + "syscall" + "testing" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/ClickHouse/clickhouse-go/v2/tests" "github.com/docker/docker/api/types/container" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" - "os" - "syscall" - "testing" ) //goland:noinspection ALL const insertQry = "INSERT INTO test (foo, foo2)" -func TestBatchFlushBrokenConn(t *testing.T) { - env, err := GetNativeTestEnvironment() +func Test1421BatchFlushBrokenConn(t *testing.T) { + // create a dedicated test environment for this test + // note: test environment management is a bit messy, consider refactoring + env, err := tests.CreateClickHouseTestEnvironment(t.Name()) + tests.SetTestEnvironment(t.Name(), env) + require.NoError(t, tests.CreateDatabase(t.Name())) + require.NoError(t, err) require.NotNil(t, env) ctx := context.Background() client, err := testcontainers.NewDockerClientWithOpts(ctx) require.NoError(t, err) - chClient, err := getConnection(env, env.Database, clickhouse.Settings{}, nil, &clickhouse.Compression{ - Method: clickhouse.CompressionLZ4, - }) + chClient, err := tests.TestClientWithDefaultSettings(env) err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)") require.NoError(t, err)