Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry for batch API #941

Merged
merged 7 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"sync/atomic"
"time"

_ "time/tzdata"

"github.com/ClickHouse/clickhouse-go/v2/contributors"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
_ "time/tzdata"
)

type Conn = driver.Conn
Expand All @@ -43,10 +44,12 @@ type (
var (
ErrBatchInvalid = errors.New("clickhouse: batch is invalid. check appended data is correct")
ErrBatchAlreadySent = errors.New("clickhouse: batch has already been sent")
ErrBatchNotSent = errors.New("clickhouse: invalid retry, batch not sent yet")
ErrAcquireConnTimeout = errors.New("clickhouse: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
ErrUnsupportedServerRevision = errors.New("clickhouse: unsupported server revision")
ErrBindMixedParamsFormats = errors.New("clickhouse [bind]: mixed named, numeric or positional parameters")
ErrAcquireConnNoAddress = errors.New("clickhouse: no valid address supplied")
ErrServerUnexpectedData = errors.New("code: 101, message: Unexpected packet Data received from client")
)

type OpError struct {
Expand Down Expand Up @@ -153,7 +156,7 @@ func (ch *clickhouse) PrepareBatch(ctx context.Context, query string) (driver.Ba
if err != nil {
return nil, err
}
batch, err := conn.prepareBatch(ctx, query, ch.release)
batch, err := conn.prepareBatch(ctx, query, ch.release, ch.acquire)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ type stdConnect interface {
query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error)
exec(ctx context.Context, query string, args ...any) error
ping(ctx context.Context) (err error)
prepareBatch(ctx context.Context, query string, release func(*connect, error)) (ldriver.Batch, error)
prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
asyncInsert(ctx context.Context, query string, wait bool) error
}

Expand Down Expand Up @@ -273,7 +273,7 @@ func (std *stdDriver) Prepare(query string) (driver.Stmt, error) {
}

func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {})
batch, err := std.conn.prepareBatch(ctx, query, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
if err != nil {
if isConnBrokenError(err) {
std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err)
Expand Down
24 changes: 21 additions & 3 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ package clickhouse
import (
"context"
"fmt"
"github.com/pkg/errors"
"os"
"regexp"
"strings"
"time"

"github.com/pkg/errors"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
Expand All @@ -34,7 +35,7 @@ import (
var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`)
var columnMatch = regexp.MustCompile(`.*\((?P<Columns>.+)\)$`)

func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error)) (driver.Batch, error) {
func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
//defer func() {
// if err := recover(); err != nil {
// fmt.Printf("panic occurred on %d:\n", c.num)
Expand Down Expand Up @@ -79,6 +80,7 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
block: block,
released: false,
connRelease: release,
connAcquire: acquire,
onProcess: onProcess,
}, nil
}
Expand All @@ -91,6 +93,7 @@ type batch struct {
released bool
block *proto.Block
connRelease func(*connect, error)
connAcquire func(context.Context) (*connect, error)
onProcess *onProcess
}

Expand Down Expand Up @@ -168,7 +171,7 @@ func (b *batch) Send() (err error) {
b.release(err)
}()
if b.sent {
return ErrBatchAlreadySent
return b.retry()
}
if b.err != nil {
return b.err
Expand All @@ -187,6 +190,21 @@ func (b *batch) Send() (err error) {
return nil
}

func (b *batch) retry() (err error) {
// exit early if Send() hasn't been attepted
if !b.sent {
return ErrBatchNotSent
}

// acquire a new conn
if b.conn, err = b.connAcquire(b.ctx); err != nil {
return err
}
b.sent = false
b.released = false
return b.Send()
}

func (b *batch) Flush() error {
if b.sent {
return ErrBatchAlreadySent
Expand Down
2 changes: 1 addition & 1 deletion conn_http_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
var httpInsertRe = regexp.MustCompile(`(?i)^INSERT INTO\s+\x60?([\w.^\(]+)\x60?\s*(\([^\)]*\))?`)

// release is ignored, because http used by std with empty release function
func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error)) (driver.Batch, error) {
func (h *httpConnect) prepareBatch(ctx context.Context, query string, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
matches := httpInsertRe.FindStringSubmatch(query)
if len(matches) < 3 {
return nil, errors.New("cannot get table name from query")
Expand Down
11 changes: 6 additions & 5 deletions tests/issues/798_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package issues

import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"sync"
"sync/atomic"
"testing"

"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
)

func Test798(t *testing.T) {
Expand Down Expand Up @@ -52,8 +53,8 @@ func Test798(t *testing.T) {
require.NoError(t, err)
require.NoError(t, batch.Append(true, false, []bool{true, false, true}))
require.NoError(t, batch.Send())
// test resend
require.ErrorIs(t, batch.Send(), clickhouse.ErrBatchAlreadySent)
// resend
require.ErrorAs(t, batch.Send(), &clickhouse.ErrServerUnexpectedData)
batch, err = conn.PrepareBatch(ctx, "INSERT INTO test_issue_798")
require.NoError(t, err)
// test empty batch
Expand Down