Skip to content

Commit

Permalink
s2: Add support for custom stream encoder (#755)
Browse files Browse the repository at this point in the history
```Go
// WriterCustomEncoder allows overriding the encoder for blocks on the stream.
// The function must compress 'src' into 'dst' and return the number of bytes used in dst.
// The block size (initial varint) should not be added by the encoder.
// A return value of 0 indicates that the block could not be compressed.
// The function should expect to be called concurrently.
func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption
```
  • Loading branch information
klauspost authored Feb 24, 2023
1 parent fdc8ab0 commit 47158f2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
22 changes: 22 additions & 0 deletions internal/snapref/encode_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,28 @@ func hash(u, shift uint32) uint32 {
return (u * 0x1e35a7bd) >> shift
}

// EncodeBlockInto compresses src into dst and returns the number of bytes
// written to dst. It exposes encodeBlock but checks the size of dst first.
//
// It returns 0 without touching dst when dst is shorter than
// MaxEncodedLen(len(src)) or when that bound is negative. An empty src also
// returns 0, since there is nothing to encode.
func EncodeBlockInto(dst, src []byte) (d int) {
	// NOTE(review): upstream snappy documents MaxEncodedLen as returning a
	// negative value when srcLen is too large to encode; confirm this
	// package's copy matches. A negative bound would otherwise slip past a
	// plain "> len(dst)" comparison and leave dst unchecked.
	if n := MaxEncodedLen(len(src)); n < 0 || n > len(dst) {
		return 0
	}

	// encodeBlock breaks on too big blocks, so split src into chunks of at
	// most maxBlockSize and encode them back to back.
	for len(src) > 0 {
		p := src
		src = nil
		if len(p) > maxBlockSize {
			p, src = p[:maxBlockSize], p[maxBlockSize:]
		}
		// Chunks below minNonLiteralBlockSize are emitted as a single literal
		// instead of going through encodeBlock.
		if len(p) < minNonLiteralBlockSize {
			d += emitLiteral(dst[d:], p)
		} else {
			d += encodeBlock(dst[d:], p)
		}
	}
	return d
}

// encodeBlock encodes a non-empty src to a guaranteed-large-enough dst. It
// assumes that the varint-encoded length of the decompressed bytes has already
// been written.
Expand Down
24 changes: 20 additions & 4 deletions s2/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,11 @@ type Writer struct {
buffers sync.Pool
pad int

writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
index Index
writer io.Writer
randSrc io.Reader
writerWg sync.WaitGroup
index Index
customEnc func(dst, src []byte) int

// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
Expand Down Expand Up @@ -799,6 +800,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
}

func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
if w.customEnc != nil {
return w.customEnc(obuf, uncompressed)
}
if w.snappy {
switch w.level {
case levelFast:
Expand Down Expand Up @@ -1365,3 +1369,15 @@ func WriterFlushOnWrite() WriterOption {
return nil
}
}

// WriterCustomEncoder returns a WriterOption that overrides the encoder used
// for blocks on the stream.
// fn must compress 'src' into 'dst' and return the number of bytes used in dst.
// The block size (initial varint) should not be added by the encoder.
// A return value of 0 indicates that the block could not be compressed.
// fn should expect to be called concurrently.
func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
	// Setting the field cannot fail, so the option always reports success.
	install := func(target *Writer) error {
		target.customEnc = fn
		return nil
	}
	return install
}
1 change: 1 addition & 0 deletions s2/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func testOptions(t testing.TB) map[string][]WriterOption {
for name, opt := range testOptions {
x[name] = opt
x[name+"-snappy"] = cloneAdd(opt, WriterSnappyCompat())
x[name+"-custom"] = cloneAdd(opt, WriterCustomEncoder(snapref.EncodeBlockInto))
}
testOptions = x
return testOptions
Expand Down

0 comments on commit 47158f2

Please sign in to comment.