Skip to content

Commit

Permalink
Merge pull request #413 from tdakkota/perf/use-writev
Browse files Browse the repository at this point in the history
feat(ch): use `writev` when possible
  • Loading branch information
ernado authored Sep 25, 2024
2 parents e8fbf58 + c420f0b commit e4a2d07
Show file tree
Hide file tree
Showing 135 changed files with 1,147 additions and 48 deletions.
72 changes: 50 additions & 22 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
type Client struct {
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
writer *proto.Writer
reader *proto.Reader
info proto.ClientHello
server proto.ServerHello
Expand Down Expand Up @@ -276,19 +276,38 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
if n != len(b.Buf) {
return errors.Wrap(io.ErrShortWrite, "wrote less than expected")
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
if ce := c.lg.Check(zap.DebugLevel, "Buffer flush"); ce != nil {
ce.Write(zap.Int("bytes", n))
}
b.Reset()
return nil
}

// flush writes all pending chained buffers to the connection, honoring the
// context's deadline if one is set.
func (c *Client) flush(ctx context.Context) error {
return c.flushBuf(ctx, c.buf)
// NOTE(review): the line above is the pre-writev implementation shown as
// removed in this diff view; the lines below are its replacement.
// Fail fast if the context is already canceled or expired.
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
// Writer.Flush performs the actual (possibly vectored) write.
n, err := c.writer.Flush()
if err != nil {
return err
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
ce.Write(zap.Int64("bytes", n))
}
return nil
}

// encode appends v's wire representation to the writer's chained buffer,
// using the client's negotiated protocol version.
func (c *Client) encode(v proto.AwareEncoder) {
v.EncodeAware(c.buf, c.protocolVersion)
// NOTE(review): the line above is the pre-writev path shown as removed in
// this diff view; the ChainBuffer call below replaces it.
c.writer.ChainBuffer(func(b *proto.Buffer) {
v.EncodeAware(b, c.protocolVersion)
})
}

//go:generate go run github.com/dmarkham/enumer -transform upper -type Compression -trimprefix Compression -output compression_enum.go
Expand Down Expand Up @@ -451,9 +470,32 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
ctx = newCtx
defer span.End()
}

var (
compressor = compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel))
compression proto.Compression
compressionMethod compress.Method
)
switch opt.Compression {
case CompressionLZ4:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4
case CompressionLZ4HC:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4HC
case CompressionZSTD:
compression = proto.CompressionEnabled
compressionMethod = compress.ZSTD
case CompressionNone:
compression = proto.CompressionEnabled
compressionMethod = compress.None
default:
compression = proto.CompressionDisabled
}

c := &Client{
conn: conn,
buf: new(proto.Buffer),
writer: proto.NewWriter(conn, new(proto.Buffer)),
reader: proto.NewReader(conn),
settings: opt.Settings,
lg: opt.Logger,
Expand All @@ -464,7 +506,9 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {

readTimeout: opt.ReadTimeout,

compressor: compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel)),
compression: compression,
compressionMethod: compressionMethod,
compressor: compressor,

version: ver,
protocolVersion: opt.ProtocolVersion,
Expand All @@ -480,22 +524,6 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
Password: opt.Password,
},
}
switch opt.Compression {
case CompressionLZ4:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4
case CompressionLZ4HC:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4HC
case CompressionZSTD:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.ZSTD
case CompressionNone:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.None
default:
c.compression = proto.CompressionDisabled
}

handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout)
defer cancel()
Expand Down
9 changes: 6 additions & 3 deletions handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

// encodeAddendum writes handshake addendum fields. Currently this is only
// the quota key, sent when the negotiated protocol version supports it.
func (c *Client) encodeAddendum() {
if proto.FeatureQuotaKey.In(c.protocolVersion) {
c.buf.PutString(c.quotaKey)
// NOTE(review): the line above is the pre-writev path shown as removed in
// this diff view; the chained-buffer call below replaces it.
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.PutString(c.quotaKey)
})
}
}

Expand Down Expand Up @@ -45,8 +47,9 @@ func (c *Client) handshake(ctx context.Context) error {
wg.Go(func() error {
defer cancel()

c.buf.Reset()
c.info.Encode(c.buf)
c.writer.ChainBuffer(func(b *proto.Buffer) {
c.info.Encode(b)
})
if err := c.flush(wgCtx); err != nil {
return errors.Wrap(err, "flush")
}
Expand Down
66 changes: 66 additions & 0 deletions insert_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package ch

import (
"context"
"fmt"
"testing"

"github.com/go-faster/errors"

"github.com/ClickHouse/ch-go/cht"
"github.com/ClickHouse/ch-go/proto"
)

// BenchmarkInsert measures INSERT throughput for varying row counts against
// a local ClickHouse server, writing into a Null-engine table so server-side
// storage cost does not dominate. cht.Skip skips the benchmark when no
// server binary is available.
func BenchmarkInsert(b *testing.B) {
	cht.Skip(b)
	srv := cht.New(b)

	bench := func(rows int) func(b *testing.B) {
		return func(b *testing.B) {
			ctx := context.Background()
			c, err := Dial(ctx, Options{
				Address:     srv.TCP,
				Compression: CompressionDisabled,
			})
			if err != nil {
				b.Fatal(errors.Wrap(err, "dial"))
			}
			defer func() { _ = c.Close() }()

			if err := c.Do(ctx, Query{
				Body: "CREATE TABLE IF NOT EXISTS test_table (id Int64) ENGINE = Null",
			}); err != nil {
				b.Fatal(err)
			}

			// Pre-size the column to avoid repeated growth copies in setup.
			id := make(proto.ColInt64, 0, rows)
			for i := 0; i < rows; i++ {
				id = append(id, 1)
			}

			b.SetBytes(int64(rows) * 8) // 8 bytes per Int64 row.
			b.ResetTimer()
			b.ReportAllocs()

			for i := 0; i < b.N; i++ {
				if err := c.Do(ctx, Query{
					Body: "INSERT INTO test_table VALUES",
					Input: []proto.InputColumn{
						{Name: "id", Data: id},
					},
				}); err != nil {
					// Bug fix: was b.Fatal() with no arguments, which
					// discarded the actual insert error.
					b.Fatal(err)
				}
			}
		}
	}
	for _, rows := range []int{
		10_000,
		100_000,
		1_000_000,
		10_000_000,
		100_000_000,
	} {
		b.Run(fmt.Sprintf("Rows%d", rows), bench(rows))
	}
}
4 changes: 3 additions & 1 deletion ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func (c *Client) Ping(ctx context.Context) (err error) {
span.End()
}()
}
c.buf.Encode(proto.ClientCodePing)
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.Encode(proto.ClientCodePing)
})
if err := c.flush(ctx); err != nil {
return errors.Wrap(err, "flush")
}
Expand Down
32 changes: 32 additions & 0 deletions proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,38 @@ func (b Block) EncodeRawBlock(buf *Buffer, version int, input []InputColumn) err
return nil
}

// WriteBlock writes the block header and each input column to w, chaining
// buffer callbacks and raw column writes so the transport can batch them
// (e.g. via vectored I/O). Returns an error if a column's row count does
// not match the block's, or if preparing a column fails.
func (b Block) WriteBlock(w *Writer, version int, input []InputColumn) error {
w.ChainBuffer(func(buf *Buffer) {
if FeatureBlockInfo.In(version) {
b.Info.Encode(buf)
}
buf.PutInt(b.Columns)
buf.PutInt(b.Rows)
})

for _, col := range input {
// Every column must carry exactly b.Rows rows.
if r := col.Data.Rows(); r != b.Rows {
return errors.Errorf("%q has %d rows, expected %d", col.Name, r, b.Rows)
}
w.ChainBuffer(func(buf *Buffer) {
col.EncodeStart(buf, version)
})
// NOTE(review): the closures passed to ChainBuffer capture the loop
// variables col/v. If ChainBuffer defers execution beyond this
// iteration and the module targets Go < 1.22, all callbacks would see
// the last value — confirm go.mod version or ChainBuffer's timing.
if v, ok := col.Data.(Preparable); ok {
if err := v.Prepare(); err != nil {
return errors.Wrapf(err, "prepare %q", col.Name)
}
}
// Columns with no rows contribute no data payload.
if col.Data.Rows() == 0 {
continue
}
if v, ok := col.Data.(StateEncoder); ok {
w.ChainBuffer(v.EncodeState)
}
col.Data.WriteColumn(w)
}
return nil
}

// These constraints can prevent accidental OOM and allow early detection
// of erroneous column or row count.
//
Expand Down
8 changes: 8 additions & 0 deletions proto/cmd/ch-gen-col/safe.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ func (c {{ .Type }}) EncodeColumn(b *Buffer) {
}
{{- end }}
}

// WriteColumn writes {{ .Type }} rows to w.
func (c {{ .Type }}) WriteColumn(w *Writer) {
{{- if .Byte }}
// Byte-sized elements: hand the backing bytes to the writer directly.
w.ChainWrite([]byte(c))
{{- else }}
// Fall back to per-element encoding into a chained buffer.
w.ChainBuffer(c.EncodeColumn)
{{- end }}
}
1 change: 1 addition & 0 deletions proto/cmd/ch-gen-col/test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func Test{{ .Type }}_DecodeColumn(t *testing.T) {
var v {{ .Type }}
v.EncodeColumn(nil) // should be no-op
})
t.Run("WriteColumn", checkWriteColumn(data))
}

{{- if not .Time }}
Expand Down
29 changes: 29 additions & 0 deletions proto/cmd/ch-gen-col/unsafe.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,32 @@ func (c {{ .Type }}) EncodeColumn(b *Buffer) {
dst := b.Buf[offset:]
copy(dst, src)
}

// WriteColumn writes {{ .Type }} rows to w zero-copy by reinterpreting the
// element slice as raw bytes.
func (c {{ .Type }}) WriteColumn(w *Writer) {
{{- if .DateTime }}
v := c.Data
{{- else }}
v := c
{{- end }}
if len(v) == 0 {
return
}
{{- if .SingleByte }}
// Single-byte elements: the backing array already is the wire format.
src := *(*[]byte)(unsafe.Pointer(&v))
{{- else }}
{{- if .FixedStr }}
const size = {{ .Bytes }}
{{- else }}
const size = {{ .Bits }} / 8
{{- end }}

// Rescale the slice header from element count to byte count.
s := *(*slice)(unsafe.Pointer(&v))
{{- if not .SingleByte }}
s.Len *= size
s.Cap *= size
{{- end }}

src := *(*[]byte)(unsafe.Pointer(&s))
{{- end }}
w.ChainWrite(src)
}
6 changes: 6 additions & 0 deletions proto/col_arr.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ func (c ColArr[T]) EncodeColumn(b *Buffer) {
c.Data.EncodeColumn(b)
}

// WriteColumn implements ColInput.
//
// Offsets are written before element data, mirroring EncodeColumn's order.
func (c ColArr[T]) WriteColumn(w *Writer) {
c.Offsets.WriteColumn(w)
c.Data.WriteColumn(w)
}

// Append appends new row to column.
func (c *ColArr[T]) Append(v []T) {
c.Data.AppendArr(v)
Expand Down
3 changes: 3 additions & 0 deletions proto/col_arr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func testColumn[T any](t *testing.T, name string, f func() ColumnOf[T], values .
t.Run("Golden", func(t *testing.T) {
gold.Bytes(t, buf.Buf, "column_of_"+name)
})
t.Run("WriteColumn", checkWriteColumn(data))
t.Run("Ok", func(t *testing.T) {
br := bytes.NewReader(buf.Buf)
r := NewReader(br)
Expand Down Expand Up @@ -84,6 +85,7 @@ func TestColArrOfStr(t *testing.T) {
dec := (&ColStr{}).Array()
requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows()))
})
t.Run("WriteColumn", checkWriteColumn(col))
}

func TestArrOfLowCordStr(t *testing.T) {
Expand Down Expand Up @@ -118,6 +120,7 @@ func TestArrOfLowCordStr(t *testing.T) {
dec := NewArray[string](new(ColStr).LowCardinality())
requireNoShortRead(t, buf.Buf, colAware(dec, col.Rows()))
})
t.Run("WriteColumn", checkWriteColumn(col))
}

func TestColArr_DecodeColumn(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions proto/col_auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ func (c ColAuto) Reset() {
func (c ColAuto) EncodeColumn(b *Buffer) {
c.Data.EncodeColumn(b)
}

// WriteColumn writes rows to w by delegating to the underlying column.
func (c ColAuto) WriteColumn(w *Writer) {
c.Data.WriteColumn(w)
}
8 changes: 8 additions & 0 deletions proto/col_bool_safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ func (c *ColBool) DecodeColumn(r *Reader, rows int) error {
*c = v
return nil
}

// WriteColumn encodes ColBool rows to *Writer.
//
// An empty column produces no output; otherwise encoding is chained onto
// the writer's pending buffer.
func (c ColBool) WriteColumn(w *Writer) {
	if len(c) > 0 {
		w.ChainBuffer(c.EncodeColumn)
	}
}
1 change: 1 addition & 0 deletions proto/col_bool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestColBool_DecodeColumn(t *testing.T) {
var dec ColBool
requireNoShortRead(t, buf.Buf, colAware(&dec, rows))
})
t.Run("WriteColumn", checkWriteColumn(data))
}

func BenchmarkColBool_DecodeColumn(b *testing.B) {
Expand Down
10 changes: 10 additions & 0 deletions proto/col_bool_unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ func (c *ColBool) DecodeColumn(r *Reader, rows int) error {
}
return nil
}

// WriteColumn writes Bool rows to *Writer.
//
// Zero-copy: the bool backing array is handed to the writer as raw bytes
// via a slice-header reinterpretation (assumes one-byte bools, as produced
// by the gc toolchain — hence the unsafe build tag split).
func (c ColBool) WriteColumn(w *Writer) {
if len(c) == 0 {
return
}
s := *(*slice)(unsafe.Pointer(&c)) // #nosec G103
src := *(*[]byte)(unsafe.Pointer(&s)) // #nosec G103
w.ChainWrite(src)
}
1 change: 1 addition & 0 deletions proto/col_date32_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e4a2d07

Please sign in to comment.