Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 59 support streaming insert #60

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
)

var (
ErrStreamingBufferClosed = errors.New("proton: streaming buffer has already been closed")
ErrBatchAlreadySent = errors.New("proton: batch has already been sent")
ErrAcquireConnTimeout = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
ErrUnsupportedServerRevision = errors.New("proton: unsupported server revision")
Expand Down Expand Up @@ -146,6 +147,14 @@ func (ch *proton) PrepareBatch(ctx context.Context, query string) (driver.Batch,
return conn.prepareBatch(ctx, query, ch.release)
}

func (ch *proton) PrepareStreamingBuffer(ctx context.Context, query string) (driver.StreamingBuffer, error) {
conn, err := ch.acquire(ctx)
if err != nil {
return nil, err
}
return conn.prepareStreamingBuffer(ctx, query, ch.release)
}

func (ch *proton) AsyncInsert(ctx context.Context, query string, wait bool) error {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
208 changes: 208 additions & 0 deletions conn_streaming_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package proton

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"github.com/timeplus-io/proton-go-driver/v2/lib/driver"
"github.com/timeplus-io/proton-go-driver/v2/lib/proto"
)

func (c *connect) prepareStreamingBuffer(ctx context.Context, query string, release func(*connect, error)) (*streamingBuffer, error) {
query = splitInsertRe.Split(query, -1)[0]
if !strings.HasSuffix(strings.TrimSpace(strings.ToUpper(query)), "VALUES") {
query += " VALUES"
}
options := queryOptions(ctx)
if deadline, ok := ctx.Deadline(); ok {
c.conn.SetDeadline(deadline)
defer c.conn.SetDeadline(time.Time{})
}
if err := c.sendQuery(query, &options); err != nil {
release(c, err)
return nil, err
}
var (
onProcess = options.onProcess()
block, err = c.firstBlock(ctx, onProcess)
)
if err != nil {
release(c, err)
return nil, err
}
return &streamingBuffer{
ctx: ctx,
conn: c,
block: block,
release: func(err error) {
release(c, err)
},
onProcess: onProcess,
done: make(chan struct{}),
}, nil
}

type streamingBuffer struct {
err error
ctx context.Context
conn *connect
sent bool
block *proto.Block
release func(error)
onProcess *onProcess
once sync.Once
done chan struct{}
}

func (b *streamingBuffer) Append(v ...interface{}) error {
if b.sent {
return ErrStreamingBufferClosed
}
if err := b.block.Append(v...); err != nil {
b.release(err)
return err
}
return nil
}

func (b *streamingBuffer) AppendStruct(v interface{}) error {
values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false)
if err != nil {
return err
}
return b.Append(values...)
}

func (b *streamingBuffer) Column(idx int) driver.StreamingBufferColumn {
if len(b.block.Columns) <= idx {
b.release(nil)
return &streamingBufferColumn{
err: &OpError{
Op: "streamingBuffer.Column",
Err: fmt.Errorf("invalid column index %d", idx),
},
}
}
return &streamingBufferColumn{
buffer: b,
column: b.block.Columns[idx],
release: func(err error) {
b.err = err
b.release(err)
},
}
}

func (b *streamingBuffer) Close() (err error) {
defer func() {
b.sent = true
b.release(err)
}()
if err = b.Send(); err != nil {
return err
}
if err = b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}
if err = b.conn.encoder.Flush(); err != nil {
return err
}
<-b.done
return nil
}

func (b *streamingBuffer) Clear() (err error) {
for i := range b.block.Columns {
b.block.Columns[i], err = b.block.Columns[i].Type().Column()
if err != nil {
return
}
}
return nil
}

func (b *streamingBuffer) Send() (err error) {
if b.sent {
return ErrStreamingBufferClosed
}
if b.err != nil {
return b.err
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
return err
}
}
if err = b.conn.encoder.Flush(); err != nil {
return err
}
b.once.Do(func() {
go func() {
if err = b.conn.process(b.ctx, b.onProcess); err != nil {
b.err = err
}
b.done <- struct{}{}
}()
})
if err = b.Clear(); err != nil {
return err
}
return nil
}

func (b *streamingBuffer) ReplaceBy(cols ...column.Interface) (err error) {
if b.sent {
return ErrStreamingBufferClosed
}
if b.err != nil {
return b.err
}
if len(b.block.Columns) != len(cols) {
return errors.New(fmt.Sprintf("colomn number is %d, not %d", len(b.block.Columns), len(cols)))
}
for i := 0; i < len(cols); i++ {
if b.block.Columns[i].Type() != cols[i].Type() {
return errors.New(fmt.Sprintf("type of colomn[%d] is %s, not %s", i, b.block.Columns[i].Type(), cols[i].Type()))
}
}
rows := cols[0].Rows()
for i := 1; i < len(cols); i++ {
if rows != cols[i].Rows() {
return errors.New("cols with different length")
}
}
b.block.Columns = cols
return nil
}

type streamingBufferColumn struct {
err error
buffer *streamingBuffer
column column.Interface
release func(error)
}

func (b *streamingBufferColumn) Append(v interface{}) (err error) {
if b.buffer.sent {
return ErrStreamingBufferClosed
}
if b.err != nil {
b.release(b.err)
return b.err
}
if _, err = b.column.Append(v); err != nil {
b.release(err)
return err
}
return nil
}

var (
_ (driver.StreamingBuffer) = (*streamingBuffer)(nil)
_ (driver.StreamingBufferColumn) = (*streamingBufferColumn)(nil)
)
76 changes: 76 additions & 0 deletions examples/streaming/buffer_replace/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"context"
"fmt"
"github.com/timeplus-io/proton-go-driver/v2"
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"log"
"time"
)

func BufferReplaceExample() {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
//Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
fmt.Println("progress: ", p)
}))
if err != nil {
log.Fatal(err)
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
log.Fatal(err)
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint64
, Col2 string
)
`)
if err != nil {
log.Fatal(err)
}
const rows = 200_000
var (
col1 column.UInt64 = make([]uint64, rows)
col2 column.String = make([]string, rows)
)
for i := 0; i < rows; i++ {
col1[i] = uint64(i)
col2[i] = fmt.Sprintf("num%03d", i)
}
buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
err = buffer.ReplaceBy(
&col1,
&col2,
)
if err != nil {
return
}
err = buffer.Send()
if err != nil {
return
}
err = buffer.Close()
if err != nil {
return
}
}

func main() {
BufferReplaceExample()
}
75 changes: 75 additions & 0 deletions examples/streaming/streaming_send/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/timeplus-io/proton-go-driver/v2"
)

func example() error {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
// Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
fmt.Println("progress: ", p)
}))
if err != nil {
return err
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
return err
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint64
, Col2 string
)
`)
if err != nil {
return err
}

buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
if err != nil {
return err
}
for i := 0; i < 100; i++ {
for j := 0; j < 100000; j++ {
err := buffer.Append(
uint64(i*100000+j),
fmt.Sprintf("num_%d_%d", j, i),
)
if err != nil {
return err
}
}
if err := buffer.Send(); err != nil {
return err
}
}
return buffer.Close()
}

func main() {
start := time.Now()
if err := example(); err != nil {
log.Fatal(err)
}
fmt.Println(time.Since(start))
}
Loading
Loading