diff --git a/batch.go b/batch.go index f9f3e5227..19dcef8cd 100644 --- a/batch.go +++ b/batch.go @@ -79,10 +79,16 @@ func (batch *Batch) close() (err error) { batch.conn = nil batch.lock = nil + if batch.msgs != nil { batch.msgs.discard() } + if batch.msgs != nil && batch.msgs.decompressed != nil { + releaseBuffer(batch.msgs.decompressed) + batch.msgs.decompressed = nil + } + if err = batch.err; errors.Is(batch.err, io.EOF) { err = nil } diff --git a/message_reader.go b/message_reader.go index 35e1067f2..cf9c0c36d 100644 --- a/message_reader.go +++ b/message_reader.go @@ -22,7 +22,7 @@ type messageSetReader struct { // This is used to detect truncation of the response. lengthRemain int - decompressed bytes.Buffer + decompressed *bytes.Buffer } type readerStack struct { @@ -87,6 +87,7 @@ func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, e reader: reader, remain: remain, }, + decompressed: acquireBuffer(), } err := res.readHeader() return res, err @@ -199,7 +200,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB // Allocate a buffer of size 0, which gets capped at 16 bytes // by the bufio package. We are already reading buffered data // here, no need to reserve another 4KB buffer. - reader: bufio.NewReaderSize(&r.decompressed, 0), + reader: bufio.NewReaderSize(r.decompressed, 0), remain: r.decompressed.Len(), base: offset, parent: r.readerStack, @@ -278,7 +279,7 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt } r.remain -= batchRemain - int(limitReader.N) r.readerStack = &readerStack{ - reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer + reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer remain: r.decompressed.Len(), base: -1, // base is unused here parent: r.readerStack,