Skip to content

Commit

Permalink
fix context cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
jkaflik committed Oct 11, 2023
1 parent 4b8830c commit cc83e07
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
11 changes: 8 additions & 3 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func (b *batch) Column(idx int) driver.BatchColumn {

func (b *batch) Send() (err error) {
stopCW := contextWatchdog(b.ctx, func() {
_ = b.conn.close()
// close TCP connection on context cancel. There is no other way simple way to interrupt underlying operations.
// as verified in the test, this is safe to do and cleanups resources later on
_ = b.conn.conn.Close()
})

defer func() {
Expand All @@ -197,8 +199,11 @@ func (b *batch) Send() (err error) {
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
// todo: return context.DeadlineExceeded if deadline is exceeded
// use b.err ?
// there might be an error caused by context cancellation
// in this case we should return context error instead of net.OpError
if ctxErr := b.ctx.Err(); ctxErr != nil {
return ctxErr
}

return err
}
Expand Down
21 changes: 16 additions & 5 deletions tests/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@ import (
)

func TestBatchContextCancellation(t *testing.T) {
conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
te, err := GetTestEnvironment(testSet)
require.NoError(t, err)
opts := ClientOptionsFromEnv(te, clickhouse.Settings{})
opts.MaxOpenConns = 1
conn, err := GetConnectionWithOptions(&opts)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

b, err := conn.PrepareBatch(ctx, "INSERT INTO test_batch_ctx_cancellation (x)")
require.NoError(t, conn.Exec(context.Background(), "create table if not exists test_batch_cancellation (x String) engine=Memory"))
defer conn.Exec(context.Background(), "drop table if exists test_batch_cancellation")

b, err := conn.PrepareBatch(ctx, "insert into test_batch_cancellation")
require.NoError(t, err)
for i := 0; i < 1_000_000; i++ {
require.NoError(t, b.Append("value"))
}

require.Equal(t, context.DeadlineExceeded, b.Send())

// assert if connection is properly released after context cancellation
require.NoError(t, conn.Exec(context.Background(), "SELECT 1"))
}

0 comments on commit cc83e07

Please sign in to comment.