From 47158f2625031ec47adfc85c98c86e6d7a0598a1 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 24 Feb 2023 14:41:46 +0100 Subject: [PATCH] s2: Add support for custom stream encoder (#755) ```Go // WriterCustomEncoder allows to override the encoder for blocks on the stream. // The function must compress 'src' into 'dst' and return the bytes used in dst as an integer. // Block size (initial varint) should not be added by the encoder. // Returning value 0 indicates the block could not be compressed. // The function should expect to be called concurrently. func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption ``` --- internal/snapref/encode_other.go | 22 ++++++++++++++++++++++ s2/encode.go | 24 ++++++++++++++++++++---- s2/encode_test.go | 1 + 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/internal/snapref/encode_other.go b/internal/snapref/encode_other.go index 298c4f8e97..05db94d39a 100644 --- a/internal/snapref/encode_other.go +++ b/internal/snapref/encode_other.go @@ -103,6 +103,28 @@ func hash(u, shift uint32) uint32 { return (u * 0x1e35a7bd) >> shift } +// EncodeBlockInto exposes encodeBlock but checks dst size. +func EncodeBlockInto(dst, src []byte) (d int) { + if MaxEncodedLen(len(src)) > len(dst) { + return 0 + } + + // encodeBlock breaks on too big blocks, so split. + for len(src) > 0 { + p := src + src = nil + if len(p) > maxBlockSize { + p, src = p[:maxBlockSize], p[maxBlockSize:] + } + if len(p) < minNonLiteralBlockSize { + d += emitLiteral(dst[d:], p) + } else { + d += encodeBlock(dst[d:], p) + } + } + return d +} + // encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It // assumes that the varint-encoded length of the decompressed bytes has already // been written. diff --git a/s2/encode.go b/s2/encode.go index 8e688dfbb1..59edf1a576 100644 --- a/s2/encode.go +++ b/s2/encode.go @@ -430,10 +430,11 @@ type Writer struct { buffers sync.Pool pad int - writer io.Writer - randSrc io.Reader - writerWg sync.WaitGroup - index Index + writer io.Writer + randSrc io.Reader + writerWg sync.WaitGroup + index Index + customEnc func(dst, src []byte) int // wroteStreamHeader is whether we have written the stream header. wroteStreamHeader bool @@ -799,6 +800,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { } func (w *Writer) encodeBlock(obuf, uncompressed []byte) int { + if w.customEnc != nil { + return w.customEnc(obuf, uncompressed) + } if w.snappy { switch w.level { case levelFast: @@ -1365,3 +1369,15 @@ func WriterFlushOnWrite() WriterOption { return nil } } + +// WriterCustomEncoder allows to override the encoder for blocks on the stream. +// The function must compress 'src' into 'dst' and return the bytes used in dst as an integer. +// Block size (initial varint) should not be added by the encoder. +// Returning value 0 indicates the block could not be compressed. +// The function should expect to be called concurrently. +func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption { + return func(w *Writer) error { + w.customEnc = fn + return nil + } +} diff --git a/s2/encode_test.go b/s2/encode_test.go index 2787563700..c9901eea6d 100644 --- a/s2/encode_test.go +++ b/s2/encode_test.go @@ -59,6 +59,7 @@ func testOptions(t testing.TB) map[string][]WriterOption { for name, opt := range testOptions { x[name] = opt x[name+"-snappy"] = cloneAdd(opt, WriterSnappyCompat()) + x[name+"-custom"] = cloneAdd(opt, WriterCustomEncoder(snapref.EncodeBlockInto)) } testOptions = x return testOptions