diff --git a/clickhouse.go b/clickhouse.go index 1872484..ae2c0ed 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -40,6 +40,7 @@ type ( ) var ( + ErrStreamingBufferClosed = errors.New("proton: streaming buffer has already been closed") ErrBatchAlreadySent = errors.New("proton: batch has already been sent") ErrAcquireConnTimeout = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout") ErrUnsupportedServerRevision = errors.New("proton: unsupported server revision") @@ -146,6 +147,14 @@ func (ch *proton) PrepareBatch(ctx context.Context, query string) (driver.Batch, return conn.prepareBatch(ctx, query, ch.release) } +func (ch *proton) PrepareStreamingBuffer(ctx context.Context, query string) (driver.StreamingBuffer, error) { + conn, err := ch.acquire(ctx) + if err != nil { + return nil, err + } + return conn.prepareStreamingBuffer(ctx, query, ch.release) +} + func (ch *proton) AsyncInsert(ctx context.Context, query string, wait bool) error { conn, err := ch.acquire(ctx) if err != nil { diff --git a/conn_streaming_buffer.go b/conn_streaming_buffer.go new file mode 100644 index 0000000..6e73162 --- /dev/null +++ b/conn_streaming_buffer.go @@ -0,0 +1,208 @@ +package proton + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/timeplus-io/proton-go-driver/v2/lib/column" + "github.com/timeplus-io/proton-go-driver/v2/lib/driver" + "github.com/timeplus-io/proton-go-driver/v2/lib/proto" +) + +func (c *connect) prepareStreamingBuffer(ctx context.Context, query string, release func(*connect, error)) (*streamingBuffer, error) { + query = splitInsertRe.Split(query, -1)[0] + if !strings.HasSuffix(strings.TrimSpace(strings.ToUpper(query)), "VALUES") { + query += " VALUES" + } + options := queryOptions(ctx) + if deadline, ok := ctx.Deadline(); ok { + c.conn.SetDeadline(deadline) + defer c.conn.SetDeadline(time.Time{}) + } + if err := c.sendQuery(query, &options); err != nil { + release(c, err) + return nil, err + } + var ( + onProcess = options.onProcess() + block, err = c.firstBlock(ctx, onProcess) + ) + if err != nil { + release(c, err) + return nil, err + } + return &streamingBuffer{ + ctx: ctx, + conn: c, + block: block, + release: func(err error) { + release(c, err) + }, + onProcess: onProcess, + done: make(chan struct{}), + }, nil +} + +type streamingBuffer struct { + err error + ctx context.Context + conn *connect + sent bool + block *proto.Block + release func(error) + onProcess *onProcess + once sync.Once + done chan struct{} +} + +func (b *streamingBuffer) Append(v ...interface{}) error { + if b.sent { + return ErrStreamingBufferClosed + } + if err := b.block.Append(v...); err != nil { + b.release(err) + return err + } + return nil +} + +func (b *streamingBuffer) AppendStruct(v interface{}) error { + values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false) + if err != nil { + return err + } + return b.Append(values...) +} + +func (b *streamingBuffer) Column(idx int) driver.StreamingBufferColumn { + if len(b.block.Columns) <= idx { + b.release(nil) + return &streamingBufferColumn{ + err: &OpError{ + Op: "streamingBuffer.Column", + Err: fmt.Errorf("invalid column index %d", idx), + }, + } + } + return &streamingBufferColumn{ + buffer: b, + column: b.block.Columns[idx], + release: func(err error) { + b.err = err + b.release(err) + }, + } +} + +func (b *streamingBuffer) Close() (err error) { + defer func() { + b.sent = true + b.release(err) + }() + if err = b.Send(); err != nil { + return err + } + if err = b.conn.sendData(&proto.Block{}, ""); err != nil { + return err + } + if err = b.conn.encoder.Flush(); err != nil { + return err + } + <-b.done + return nil +} + +func (b *streamingBuffer) Clear() (err error) { + for i := range b.block.Columns { + b.block.Columns[i], err = b.block.Columns[i].Type().Column() + if err != nil { + return + } + } + return nil +} + +func (b *streamingBuffer) Send() (err error) { + if b.sent { + return ErrStreamingBufferClosed + } + if b.err != nil { + return b.err + } + if b.block.Rows() != 0 { + if err = b.conn.sendData(b.block, ""); err != nil { + return err + } + } + if err = b.conn.encoder.Flush(); err != nil { + return err + } + b.once.Do(func() { + go func() { + if err = b.conn.process(b.ctx, b.onProcess); err != nil { + b.err = err + } + b.done <- struct{}{} + }() + }) + if err = b.Clear(); err != nil { + return err + } + return nil +} + +func (b *streamingBuffer) ReplaceBy(cols ...column.Interface) (err error) { + if b.sent { + return ErrStreamingBufferClosed + } + if b.err != nil { + return b.err + } + if len(b.block.Columns) != len(cols) { + return errors.New(fmt.Sprintf("colomn number is %d, not %d", len(b.block.Columns), len(cols))) + } + for i := 0; i < len(cols); i++ { + if b.block.Columns[i].Type() != cols[i].Type() { + return errors.New(fmt.Sprintf("type of colomn[%d] is %s, not %s", i, b.block.Columns[i].Type(), cols[i].Type())) + } + } + rows := cols[0].Rows() + for i := 1; i < len(cols); i++ { + if rows != cols[i].Rows() { + return errors.New("cols with different length") + } + } + b.block.Columns = cols + return nil +} + +type streamingBufferColumn struct { + err error + buffer *streamingBuffer + column column.Interface + release func(error) +} + +func (b *streamingBufferColumn) Append(v interface{}) (err error) { + if b.buffer.sent { + return ErrStreamingBufferClosed + } + if b.err != nil { + b.release(b.err) + return b.err + } + if _, err = b.column.Append(v); err != nil { + b.release(err) + return err + } + return nil +} + +var ( + _ (driver.StreamingBuffer) = (*streamingBuffer)(nil) + _ (driver.StreamingBufferColumn) = (*streamingBufferColumn)(nil) +) diff --git a/examples/streaming/buffer_replace/main.go b/examples/streaming/buffer_replace/main.go new file mode 100644 index 0000000..a2565df --- /dev/null +++ b/examples/streaming/buffer_replace/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "fmt" + "github.com/timeplus-io/proton-go-driver/v2" + "github.com/timeplus-io/proton-go-driver/v2/lib/column" + "log" + "time" +) + +func BufferReplaceExample() { + var ( + ctx = context.Background() + conn, err = proton.Open(&proton.Options{ + Addr: []string{"127.0.0.1:8463"}, + Auth: proton.Auth{ + Database: "default", + Username: "default", + Password: "", + }, + //Debug: true, + DialTimeout: time.Second, + MaxOpenConns: 10, + MaxIdleConns: 5, + ConnMaxLifetime: time.Hour, + }) + ) + ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) { + fmt.Println("progress: ", p) + })) + if err != nil { + log.Fatal(err) + } + if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil { + log.Fatal(err) + } + err = conn.Exec(ctx, ` + CREATE STREAM IF NOT EXISTS example ( + Col1 uint64 + , Col2 string + ) + `) + if err != nil { + log.Fatal(err) + } + const rows = 200_000 + var ( + col1 column.UInt64 = make([]uint64, rows) + col2 column.String = make([]string, rows) + ) + for i := 0; i < rows; i++ { + col1[i] = uint64(i) + col2[i] = fmt.Sprintf("num%03d", i) + } + buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)") + err = buffer.ReplaceBy( + &col1, + &col2, + ) + if err != nil { + return + } + err = buffer.Send() + if err != nil { + return + } + err = buffer.Close() + if err != nil { + return + } +} + +func main() { + BufferReplaceExample() +} diff --git a/examples/streaming/streaming_send/main.go b/examples/streaming/streaming_send/main.go new file mode 100644 index 0000000..2855a5a --- /dev/null +++ b/examples/streaming/streaming_send/main.go @@ -0,0 +1,75 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/timeplus-io/proton-go-driver/v2" +) + +func example() error { + var ( + ctx = context.Background() + conn, err = proton.Open(&proton.Options{ + Addr: []string{"127.0.0.1:8463"}, + Auth: proton.Auth{ + Database: "default", + Username: "default", + Password: "", + }, + // Debug: true, + DialTimeout: time.Second, + MaxOpenConns: 10, + MaxIdleConns: 5, + ConnMaxLifetime: time.Hour, + }) + ) + ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) { + fmt.Println("progress: ", p) + })) + if err != nil { + return err + } + if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil { + return err + } + err = conn.Exec(ctx, ` + CREATE STREAM IF NOT EXISTS example ( + Col1 uint64 + , Col2 string + ) + `) + if err != nil { + return err + } + + buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)") + if err != nil { + return err + } + for i := 0; i < 100; i++ { + for j := 0; j < 100000; j++ { + err := buffer.Append( + uint64(i*100000+j), + fmt.Sprintf("num_%d_%d", j, i), + ) + if err != nil { + return err + } + } + if err := buffer.Send(); err != nil { + return err + } + } + return buffer.Close() +} + +func main() { + start := time.Now() + if err := example(); err != nil { + log.Fatal(err) + } + fmt.Println(time.Since(start)) +} diff --git a/lib/driver/driver.go b/lib/driver/driver.go index adbca2c..f0ff400 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -19,6 +19,7 @@ package driver import ( "context" + "github.com/timeplus-io/proton-go-driver/v2/lib/column" "reflect" "github.com/timeplus-io/proton-go-driver/v2/lib/proto" @@ -47,6 +48,7 @@ type ( Query(ctx context.Context, query string, args ...interface{}) (Rows, error) QueryRow(ctx context.Context, query string, args ...interface{}) Row PrepareBatch(ctx context.Context, query string) (Batch, error) + PrepareStreamingBuffer(ctx context.Context, query string) (StreamingBuffer, error) Exec(ctx context.Context, query string, args ...interface{}) error AsyncInsert(ctx context.Context, query string, wait bool) error Ping(context.Context) error @@ -75,6 +77,18 @@ type ( Column(int) BatchColumn Send() error } + StreamingBuffer interface { + Append(v ...interface{}) error + AppendStruct(v interface{}) error + Column(int) StreamingBufferColumn + Clear() error + Send() error + Close() error + ReplaceBy(cols ...column.Interface) error + } + StreamingBufferColumn interface { + Append(interface{}) error + } BatchColumn interface { Append(interface{}) error }