Skip to content

Commit

Permalink
feat: use proto.Writer even if writev is not available
Browse files Browse the repository at this point in the history
`proto.Writer` API still reduces total memory consumption and prevents unnecessary data copying
  • Loading branch information
tdakkota committed Sep 25, 2024
1 parent 09c3a4d commit 333a54b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 40 deletions.
31 changes: 28 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,17 +277,43 @@ func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
if n != len(b.Buf) {
return errors.Wrap(io.ErrShortWrite, "wrote less than expected")
}
if ce := c.lg.Check(zap.DebugLevel, "Flush"); ce != nil {
if ce := c.lg.Check(zap.DebugLevel, "Buffer flush"); ce != nil {
ce.Write(zap.Int("bytes", n))
}
b.Reset()
return nil
}

func (c *Client) flush(ctx context.Context) error {
if w := c.writer; w != nil {
if err := c.flushWritev(ctx); err != nil {
return err
}
}
return c.flushBuf(ctx, c.buf)
}

func (c *Client) flushWritev(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
n, err := c.writer.Flush()
if err != nil {
return err
}
if ce := c.lg.Check(zap.DebugLevel, "Writer flush"); ce != nil {
ce.Write(zap.Int64("bytes", n))
}
return nil
}

func (c *Client) encode(v proto.AwareEncoder) {
v.EncodeAware(c.buf, c.protocolVersion)
}
Expand Down Expand Up @@ -498,8 +524,7 @@ func Connect(ctx context.Context, conn net.Conn, opt Options) (*Client, error) {
c.compression = proto.CompressionDisabled
}

if _, ok := conn.(*net.TCPConn); writevAvailable && // writev available only on Unix platforms.
ok && c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression.
if c.compression == proto.CompressionDisabled { // Could not be used with TLS and compression.
c.writer = proto.NewWriter(c.conn, c.buf)
}

Expand Down
6 changes: 3 additions & 3 deletions proto/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ func (w *Writer) reset() {
}

// Flush flushes all data to writer.
func (w *Writer) Flush() (err error) {
func (w *Writer) Flush() (n int64, err error) {
if w.needCut {
w.cutBuffer()
}
_, err = w.vec.WriteTo(w.conn)
n, err = w.vec.WriteTo(w.conn)
w.reset()
return err
return n, err
}
34 changes: 10 additions & 24 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,37 +304,23 @@ func (c *Client) encodeBlock(ctx context.Context, tableName string, input []prot
if err := b.EncodeBlock(c.buf, c.protocolVersion, input); err != nil {
return errors.Wrap(err, "encode")
}
}

// Performing compression.
//
// Note: only blocks are compressed.
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
data := c.buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
return errors.Wrap(err, "compress")
// Performing compression.
//
// Note: only blocks are compressed.
// See "Compressible" method of server or client code for reference.
if c.compression == proto.CompressionEnabled {
data := c.buf.Buf[start:]
if err := c.compressor.Compress(c.compressionMethod, data); err != nil {
return errors.Wrap(err, "compress")
}
c.buf.Buf = append(c.buf.Buf[:start], c.compressor.Data...)
}
c.buf.Buf = append(c.buf.Buf[:start], c.compressor.Data...)
}

return nil
}

func (c *Client) flushWritev(ctx context.Context) error {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if deadline, ok := ctx.Deadline(); ok {
if err := c.conn.SetWriteDeadline(deadline); err != nil {
return errors.Wrap(err, "set write deadline")
}
// Reset deadline.
defer func() { _ = c.conn.SetWriteDeadline(time.Time{}) }()
}
return c.writer.Flush()
}

// encodeBlankBlock encodes block with zero columns and rows which is special
// case for "end of data".
func (c *Client) encodeBlankBlock(ctx context.Context) error {
Expand Down
5 changes: 0 additions & 5 deletions writev_other.go

This file was deleted.

5 changes: 0 additions & 5 deletions writev_unix.go

This file was deleted.

0 comments on commit 333a54b

Please sign in to comment.