feat: use proto.Writer even if writev is not available
Even without writev, the `proto.Writer` API still reduces total memory consumption and prevents unnecessary copying of data.
tdakkota committed Sep 25, 2024
1 parent 09c3a4d commit c420f0b
Showing 8 changed files with 101 additions and 97 deletions.
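For context on the write path this commit standardizes on, here is a minimal sketch of the ChainBuffer/Flush pattern that appears throughout the diff below. The function name, the payload string, and the github.com/ClickHouse/ch-go import path are illustrative assumptions, not part of the commit.

package example

import (
	"net"

	"github.com/ClickHouse/ch-go/proto"
)

// writeExample shows the ChainBuffer + Flush pattern: encoding goes into the
// writer's internal proto.Buffer, and Flush writes the queued byte slices to
// the connection, returning the number of bytes written.
func writeExample(conn net.Conn) (int64, error) {
	w := proto.NewWriter(conn, new(proto.Buffer))
	w.ChainBuffer(func(b *proto.Buffer) {
		b.PutString("example payload")
	})
	return w.Flush()
}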
76 changes: 49 additions & 27 deletions client.go
@@ -29,7 +29,6 @@ import (
type Client struct {
lg *zap.Logger
conn net.Conn
buf *proto.Buffer
writer *proto.Writer
reader *proto.Reader
info proto.ClientHello
@@ -277,19 +276,38 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
if n != len(b.Buf) {
return errors.Wrap(io.ErrShortWrite, "wrote less than expected")
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
if ce := c.lg.Check(zap.DebugLevel, "Buffer flush"); ce != nil {
ce.Write(zap.Int("bytes", n))
}
b.Reset()
return nil
}

func (c *Client) flush(ctx context.Context) error {
return c.flushBuf(ctx, c.buf)
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
n, err := c.writer.Flush()
if err != nil {
return err
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
ce.Write(zap.Int64("bytes", n))
}
return nil
}

func (c *Client) encode(v proto.AwareEncoder) {
v.EncodeAware(c.buf, c.protocolVersion)
c.writer.ChainBuffer(func(b *proto.Buffer) {
v.EncodeAware(b, c.protocolVersion)
})
}

//go:generate go run github.com/dmarkham/enumer -transform upper -type Compression -trimprefix Compression -output compression_enum.go
@@ -452,9 +470,32 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
ctx = newCtx
defer span.End()
}

var (
compressor = compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel))
compression proto.Compression
compressionMethod compress.Method
)
switch opt.Compression {
case CompressionLZ4:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4
case CompressionLZ4HC:
compression = proto.CompressionEnabled
compressionMethod = compress.LZ4HC
case CompressionZSTD:
compression = proto.CompressionEnabled
compressionMethod = compress.ZSTD
case CompressionNone:
compression = proto.CompressionEnabled
compressionMethod = compress.None
default:
compression = proto.CompressionDisabled
}

c := &Client{
conn: conn,
buf: new(proto.Buffer),
writer: proto.NewWriter(conn, new(proto.Buffer)),
reader: proto.NewReader(conn),
settings: opt.Settings,
lg: opt.Logger,
@@ -465,7 +506,9 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {

readTimeout: opt.ReadTimeout,

compressor: compress.NewWriterWithLevel(compress.Level(opt.CompressionLevel)),
compression: compression,
compressionMethod: compressionMethod,
compressor: compressor,

version: ver,
protocolVersion: opt.ProtocolVersion,
@@ -481,27 +524,6 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
Password: opt.Password,
},
}
switch opt.Compression {
case CompressionLZ4:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4
case CompressionLZ4HC:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.LZ4HC
case CompressionZSTD:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.ZSTD
case CompressionNone:
c.compression = proto.CompressionEnabled
c.compressionMethod = compress.None
default:
c.compression = proto.CompressionDisabled
}

if _, ok := conn.(*net.TCPConn); writevAvailable && // writev available only on Unix platforms.
ok && c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression.
c.writer = proto.NewWriter(c.conn, c.buf)
}

handshakeCtx, cancel := context.WithTimeout(ctx, opt.HandshakeTimeout)
defer cancel()
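The new Client.flush above applies the context deadline to the connection and clears it once the write completes. A standalone sketch of that pattern, with all names illustrative:

package example

import (
	"context"
	"net"
	"time"
)

// flushWithDeadline mirrors the deadline handling of the new Client.flush:
// bail out if the context is already done, apply its deadline to the
// connection for the duration of the write, then clear the deadline.
func flushWithDeadline(ctx context.Context, conn net.Conn, write func() error) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if deadline, ok := ctx.Deadline(); ok {
		if err := conn.SetWriteDeadline(deadline); err != nil {
			return err
		}
		// Reset the deadline so later writes are not affected.
		defer func() { _ = conn.SetWriteDeadline(time.Time{}) }()
	}
	return write()
}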
9 changes: 6 additions & 3 deletions handshake.go
@@ -15,7 +15,9 @@ import (

func (c *Client) encodeAddendum() {
if proto.FeatureQuotaKey.In(c.protocolVersion) {
c.buf.PutString(c.quotaKey)
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.PutString(c.quotaKey)
})
}
}

@@ -45,8 +47,9 @@ func (c *Client) handshake(ctx context.Context) error {
wg.Go(func() error {
defer cancel()

c.buf.Reset()
c.info.Encode(c.buf)
c.writer.ChainBuffer(func(b *proto.Buffer) {
c.info.Encode(b)
})
if err := c.flush(wgCtx); err != nil {
return errors.Wrap(err, "flush")
}
4 changes: 3 additions & 1 deletion ping.go
@@ -36,7 +36,9 @@ func (c *Client) Ping(ctx context.Context) (err error) {
span.End()
}()
}
c.buf.Encode(proto.ClientCodePing)
c.writer.ChainBuffer(func(b *proto.Buffer) {
b.Encode(proto.ClientCodePing)
})
if err := c.flush(ctx); err != nil {
return errors.Wrap(err, "flush")
}
3 changes: 2 additions & 1 deletion proto/column_test.go
@@ -28,7 +28,8 @@ func checkWriteColumn(data ColInput) func(*testing.T) {
w = NewWriter(&got, new(Buffer))
)
data.WriteColumn(w)
require.NoError(t, w.Flush())
_, err := w.Flush()
require.NoError(t, err)

require.Equal(t, expect.Buf, got.Bytes())
}
19 changes: 5 additions & 14 deletions proto/writer.go
@@ -25,18 +25,14 @@ func NewWriter(conn io.Writer, buf *Buffer) *Writer {
buf: buf,
vec: make(net.Buffers, 0, 16),
}
// In case if passed buf is not empty.
w.cutBuffer()
return w
}

// ChainWrite adds buffer to the vector to write later.
//
// Passed byte slice may be captured until [Writer.Flush] is called.
func (w *Writer) ChainWrite(data []byte) {
if w.needCut {
w.cutBuffer()
}
w.cutBuffer()
w.vec = append(w.vec, data)
}

@@ -47,12 +43,9 @@ func (w *Writer) ChainWrite(data []byte) {
// NB: do not retain buffer.
func (w *Writer) ChainBuffer(cb func(*Buffer)) {
cb(w.buf)
w.needCut = true
}

func (w *Writer) cutBuffer() {
w.needCut = false

newOffset := len(w.buf.Buf)
data := w.buf.Buf[w.bufOffset:newOffset:newOffset]
if len(data) == 0 {
@@ -72,11 +65,9 @@ func (w *Writer) reset() {
}

// Flush flushes all data to writer.
func (w *Writer) Flush() (err error) {
if w.needCut {
w.cutBuffer()
}
_, err = w.vec.WriteTo(w.conn)
func (w *Writer) Flush() (n int64, err error) {
w.cutBuffer()
n, err = w.vec.WriteTo(w.conn)
w.reset()
return err
return n, err
}
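Flush now returns the number of bytes written in addition to the error, which is why the test above switches to the two-value form. An illustrative test in the same style (test name and import path are assumptions, not part of the commit):

package proto_test

import (
	"bytes"
	"testing"

	"github.com/ClickHouse/ch-go/proto"
)

// TestFlushReportsBytes checks that the new two-value Flush reports exactly
// the number of bytes it wrote to the destination.
func TestFlushReportsBytes(t *testing.T) {
	var out bytes.Buffer
	w := proto.NewWriter(&out, new(proto.Buffer))
	w.ChainBuffer(func(b *proto.Buffer) {
		b.PutString("abc")
	})
	n, err := w.Flush()
	if err != nil {
		t.Fatal(err)
	}
	if n != int64(out.Len()) {
		t.Fatalf("Flush reported %d bytes, %d written", n, out.Len())
	}
}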
77 changes: 36 additions & 41 deletions query.go
@@ -271,16 +271,16 @@ func (c *Client) decodeBlock(ctx context.Context, opt decodeOptions) error {
// If input length is zero, blank block will be encoded, which is special case
// for "end of data".
func (c *Client) encodeBlock(ctx context.Context, tableName string, input []proto.InputColumn) error {
proto.ClientCodeData.Encode(c.buf)
clientData := proto.ClientData{
// External data table name.
// https://clickhouse.com/docs/en/engines/table-engines/special/external-data/
TableName: tableName,
}
clientData.EncodeAware(c.buf, c.protocolVersion)
c.writer.ChainBuffer(func(buf *proto.Buffer) {
proto.ClientCodeData.Encode(buf)
clientData := proto.ClientData{
// External data table name.
// https://clickhouse.com/docs/en/engines/table-engines/special/external-data/
TableName: tableName,
}
clientData.EncodeAware(buf, c.protocolVersion)
})

// Saving offset of compressible data.
start := len(c.buf.Buf)
b := proto.Block{
Columns: len(input),
}
@@ -293,48 +293,43 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []proto.InputColumn) error {
}
}

if w := c.writer; w != nil {
if err := b.WriteBlock(w, c.protocolVersion, input); err != nil {
if c.compression == proto.CompressionDisabled {
if err := b.WriteBlock(c.writer, c.protocolVersion, input); err != nil {
return err
}
if err := c.flushWritev(ctx); err != nil {
return errors.Wrap(err, "write buffers")
}
} else {
if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "encode")
}
}
// TODO(tdakkota): find out if we can actually stream compressed blocks.

var rerr error
c.writer.ChainBuffer(func(buf *proto.Buffer) {
// Saving offset of compressible data.
start := len(buf.Buf)
if err := b.EncodeBlock(buf, c.protocolVersion, input); err != nil {
rerr = errors.Wrap(err, "encode")
return
}

// Performing compression.
//
// Note: only blocks are compressed.
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
data := c.buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
return errors.Wrap(err, "compress")
// Performing compression.
//
// Note: only blocks are compressed.
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
data := buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
rerr = errors.Wrap(err, "compress")
return
}
buf.Buf = append(buf.Buf[:start], c.compressor.Data...)
}
})
if rerr != nil {
return rerr
}
c.buf.Buf = append(c.buf.Buf[:start], c.compressor.Data...)
}

return nil
}

func (c *Client) flushWritev(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
return c.writer.Flush()
}

// encodeBlankBlock encodes block with zero columns and rows which is special
// case for "end of data".
func (c *Client) encodeBlankBlock(ctx context.Context) error {
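In encodeBlock above, only the block payload is compressed: bytes encoded after start are handed to the compressor and the compressed output replaces the uncompressed tail in place. A condensed sketch of that step, assuming the compressor is the *compress.Writer returned by compress.NewWriterWithLevel and that it exposes its output as Data (the type name and import paths are assumptions):

package example

import (
	"github.com/ClickHouse/ch-go/compress"
	"github.com/ClickHouse/ch-go/proto"
)

// compressTail compresses everything encoded after start and swaps it into
// the buffer in place, mirroring the block-compression step in encodeBlock.
func compressTail(buf *proto.Buffer, start int, w *compress.Writer, method compress.Method) error {
	data := buf.Buf[start:]
	if err := w.Compress(method, data); err != nil {
		return err
	}
	buf.Buf = append(buf.Buf[:start], w.Data...)
	return nil
}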
5 changes: 0 additions & 5 deletions writev_other.go

This file was deleted.

5 changes: 0 additions & 5 deletions writev_unix.go

This file was deleted.
