s2: Add 'best' compression option (#310)
Add a compression mode that compresses the data very well, but at a significant cost in compression speed.

Mainly intended for offline compression, where decompression speed should still be high and the output remains compatible with other S2 compressed data.

Comparing on a 16 core CPU:

```
* enwik10:
Default... 10000000000 -> 4761467548 [47.61%]; 1.098s, 8685.6MB/s
Better... 10000000000 -> 4225922984 [42.26%]; 2.817s, 3385.4MB/s
Best... 10000000000 -> 3667646858 [36.68%]; 35.995s, 264.9MB/s

* github-june-2days-2019.json:
Default... 6273951764 -> 1043196283 [16.63%]; 431ms, 13882.3MB/s
Better... 6273951764 -> 950079555 [15.14%]; 736ms, 8129.5MB/s
Best... 6273951764 -> 846260870 [13.49%]; 8.125s, 736.4MB/s

* nyc-taxi-data-10M.csv:
Default... 3325605752 -> 1095998837 [32.96%]; 324ms, 9788.7MB/s
Better... 3325605752 -> 960330423 [28.88%]; 602ms, 5268.4MB/s
Best... 3325605752 -> 794873295 [23.90%]; 6.619s, 479.1MB/s

* 10gb.tar
Default... 10065157632 -> 5916578242 [58.78%]; 1.028s, 9337.4MB/s
Better... 10065157632 -> 5650133605 [56.14%]; 2.172s, 4419.4MB/s
Best... 10065157632 -> 5246578570 [52.13%]; 25.696s, 373.6MB/s

* consensus.db.10gb:
Default... 10737418240 -> 4562648848 [42.49%]; 882ms, 11610.0MB/s
Better... 10737418240 -> 4542443833 [42.30%]; 3.3s, 3103.5MB/s
Best... 10737418240 -> 4272335558 [39.79%]; 38.955s, 262.9MB/s
```
klauspost authored Jan 13, 2021
1 parent dedf03c commit fb371c6
Showing 6 changed files with 510 additions and 23 deletions.
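As a quick illustration of how the new mode is used from Go, here is a minimal sketch based on the `WriterBestCompression` option added in this commit; the file names are placeholders and not part of the change:

```
package main

import (
	"io"
	"log"
	"os"

	"github.com/klauspost/compress/s2"
)

func main() {
	// Placeholder file names; any reader/writer pair works.
	in, err := os.Open("backup.tar")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()

	out, err := os.Create("backup.tar.s2")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	// WriterBestCompression trades a large amount of CPU time at compression
	// time for smaller output; decompression stays fast and compatible.
	enc := s2.NewWriter(out, s2.WriterBestCompression())
	if _, err := io.Copy(enc, in); err != nil {
		enc.Close()
		log.Fatal(err)
	}
	// Close flushes buffered blocks and finishes the stream.
	if err := enc.Close(); err != nil {
		log.Fatal(err)
	}
}
```

On the command line, the same mode is exposed through the new `-slower` flag added to `s2/cmd/s2c` in this commit.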
42 changes: 41 additions & 1 deletion s2/README.md
@@ -442,11 +442,51 @@ The PDF sample shows a significant slowdown compared to Snappy, as this mode tri
to compress the data. Very small blocks are also not favorable for better compression, so throughput is way down.

This mode aims to provide better compression at the expense of performance and achieves that
without a huge performance penalty, except on very small blocks.

Decompression speed suffers a little compared to the regular S2 mode,
but still manages to be close to Snappy in spite of increased compression.

# Best compression mode

S2 offers a "best" compression mode.

This will compress as much as possible with little regard to CPU usage.

It is mainly intended for offline compression, where decompression speed should still
be high and the output remains compatible with other S2 compressed data.

Some examples, compared on a 16 core CPU:

```
* enwik10
Default... 10000000000 -> 4761467548 [47.61%]; 1.098s, 8685.6MB/s
Better... 10000000000 -> 4225922984 [42.26%]; 2.817s, 3385.4MB/s
Best... 10000000000 -> 3667646858 [36.68%]; 35.995s, 264.9MB/s
* github-june-2days-2019.json
Default... 6273951764 -> 1043196283 [16.63%]; 431ms, 13882.3MB/s
Better... 6273951764 -> 950079555 [15.14%]; 736ms, 8129.5MB/s
Best... 6273951764 -> 846260870 [13.49%]; 8.125s, 736.4MB/s
* nyc-taxi-data-10M.csv
Default... 3325605752 -> 1095998837 [32.96%]; 324ms, 9788.7MB/s
Better... 3325605752 -> 960330423 [28.88%]; 602ms, 5268.4MB/s
Best... 3325605752 -> 794873295 [23.90%]; 6.619s, 479.1MB/s
* 10gb.tar
Default... 10065157632 -> 5916578242 [58.78%]; 1.028s, 9337.4MB/s
Better... 10065157632 -> 5650133605 [56.14%]; 2.172s, 4419.4MB/s
Best... 10065157632 -> 5246578570 [52.13%]; 25.696s, 373.6MB/s
* consensus.db.10gb
Default... 10737418240 -> 4562648848 [42.49%]; 882ms, 11610.0MB/s
Better... 10737418240 -> 4542443833 [42.30%]; 3.3s, 3103.5MB/s
Best... 10737418240 -> 4272335558 [39.79%]; 38.955s, 262.9MB/s
```

Decompression speed should be around the same as using the 'better' compression mode.
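
For single blocks, the same trade-off is available through the new `EncodeBest` function. A minimal sketch, assuming only the block API shown in this commit plus the existing `s2.Decode` block decoder, illustrating that the output decodes with the regular decoder:

```
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/klauspost/compress/s2"
)

func main() {
	// Repetitive sample data so the block actually compresses.
	src := bytes.Repeat([]byte("some fairly repetitive sample data. "), 1000)

	// EncodeBest spends much more CPU than Encode/EncodeBetter to shrink the block.
	encoded := s2.EncodeBest(nil, src)

	// The result is a normal S2 block; the standard decoder handles it.
	decoded, err := s2.Decode(nil, encoded)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d -> %d bytes, roundtrip ok: %v\n",
		len(src), len(encoded), bytes.Equal(src, decoded))
}
```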

# Concatenating blocks and streams.

Concatenating streams will concatenate the output of both without recompressing them.
52 changes: 48 additions & 4 deletions s2/cmd/s2c/main.go
@@ -12,6 +12,7 @@ import (
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"runtime/pprof"
"runtime/trace"
"strconv"
@@ -26,6 +27,7 @@ import (

var (
faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
slower = flag.Bool("slower", false, "Compress more, but a lot slower")
cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
safe = flag.Bool("safe", false, "Do not overwrite output files")
@@ -55,7 +57,7 @@ func main() {
exitErr(err)

args := flag.Args()
if len(args) == 0 || *help {
if len(args) == 0 || *help || (*slower && *faster) {
_, _ = fmt.Fprintf(os.Stderr, "s2 compress v%v, built at %v.\n\n", version, date)
_, _ = fmt.Fprintf(os.Stderr, "Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.\n"+
"Copyright (c) 2019 Klaus Post. All rights reserved.\n\n")
@@ -77,6 +79,9 @@ Options:`)
if !*faster {
opts = append(opts, s2.WriterBetterCompression())
}
if *slower {
opts = append(opts, s2.WriterBestCompression())
}
wr := s2.NewWriter(nil, opts...)

// No args, use stdin/stdout
@@ -130,10 +135,49 @@
}

*quiet = *quiet || *stdout
allFiles := files
for i := 0; i < *bench; i++ {
files = append(files, allFiles...)
if *bench > 0 {
debug.SetGCPercent(10)
for _, filename := range files {
func() {
if !*quiet {
fmt.Print("Reading ", filename, "...")
}
// Input file.
file, err := os.Open(filename)
exitErr(err)
finfo, err := file.Stat()
exitErr(err)
b := make([]byte, finfo.Size())
_, err = io.ReadFull(file, b)
exitErr(err)
file.Close()
for i := 0; i < *bench; i++ {
wc := wCounter{out: ioutil.Discard}
if !*quiet {
fmt.Print("\nCompressing...")
}
wr.Reset(&wc)
start := time.Now()
err := wr.EncodeBuffer(b)
exitErr(err)
err = wr.Close()
exitErr(err)
if !*quiet {
input := len(b)
elapsed := time.Since(start)
mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
pct := float64(wc.n) * 100 / float64(input)
ms := elapsed.Round(time.Millisecond)
fmt.Printf(" %d -> %d [%.02f%%]; %v, %.01fMB/s", input, wc.n, pct, ms, mbpersec)
}
}
fmt.Println("")
wr.Close()
}()
}
os.Exit(0)
}

for _, filename := range files {
func() {
var closeOnce sync.Once
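The benchmark loop above writes into a `wCounter`, whose definition lives elsewhere in `s2/cmd/s2c/main.go` and is not part of this diff. A plausible minimal version (an assumption, not the committed code) simply counts the bytes passed through to the underlying writer; it is a fragment meant to sit in the same file, relying on its existing `io` import:

```
// Assumed sketch of a byte-counting writer like the wCounter used above;
// the actual definition in s2/cmd/s2c/main.go may differ.
type wCounter struct {
	n   int       // total bytes written
	out io.Writer // underlying writer, e.g. ioutil.Discard in the benchmark
}

func (w *wCounter) Write(p []byte) (n int, err error) {
	n, err = w.out.Write(p)
	w.n += n
	return n, err
}
```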
105 changes: 87 additions & 18 deletions s2/encode.go
@@ -100,6 +100,48 @@ func EncodeBetter(dst, src []byte) []byte {
return dst[:d]
}

// EncodeBest returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
//
// EncodeBest compresses as well as reasonably possible, but with a
// big speed decrease.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// The blocks will require the same amount of memory to decode as to encode,
// and do not allow for concurrent decoding.
// Also note that blocks do not contain CRC information, so corruption may be undetected.
//
// If you need to encode larger amounts of data, consider using
// the streaming interface which gives all of these features.
func EncodeBest(dst, src []byte) []byte {
if n := MaxEncodedLen(len(src)); n < 0 {
panic(ErrTooLarge)
} else if len(dst) < n {
dst = make([]byte, n)
}

// The block starts with the varint-encoded length of the decompressed bytes.
d := binary.PutUvarint(dst, uint64(len(src)))

if len(src) == 0 {
return dst[:d]
}
if len(src) < minNonLiteralBlockSize {
d += emitLiteral(dst[d:], src)
return dst[:d]
}
n := encodeBlockBest(dst[d:], src)
if n > 0 {
d += n
return dst[:d]
}
// Not compressible
d += emitLiteral(dst[d:], src)
return dst[:d]
}

// EncodeSnappy returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
@@ -239,6 +281,7 @@ func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
blockSize: defaultBlockSize,
concurrency: runtime.GOMAXPROCS(0),
randSrc: rand.Reader,
level: levelFast,
}
for _, opt := range opts {
if err := opt(&w2); err != nil {
@@ -279,10 +322,16 @@ type Writer struct {
// wroteStreamHeader is whether we have written the stream header.
wroteStreamHeader bool
paramsOK bool
better bool
uncompressed bool
level uint8
}

const (
levelUncompressed = iota + 1
levelFast
levelBetter
levelBest
)

type result []byte

// err returns the previously set error.
@@ -490,10 +539,13 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}

// Check if we should use this, or store as uncompressed instead.
@@ -567,10 +619,13 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}

// Check if we should use this, or store as uncompressed instead.
@@ -643,10 +698,13 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}

// Check if we should use this, or store as uncompressed instead.
@@ -712,10 +770,13 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
// Attempt compressing.
n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
var n2 int
if w.better {
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
} else if !w.uncompressed {
switch w.level {
case levelFast:
n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
case levelBetter:
n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
case levelBest:
n2 = encodeBlockBest(obuf[obufHeaderLen+n:], uncompressed)
}

if n2 > 0 {
@@ -887,8 +948,17 @@ func WriterConcurrency(n int) WriterOption {
// 10-40% speed decrease on both compression and decompression.
func WriterBetterCompression() WriterOption {
return func(w *Writer) error {
w.uncompressed = false
w.better = true
w.level = levelBetter
return nil
}
}

// WriterBestCompression will enable best compression.
// EncodeBest compresses better than EncodeBetter, but typically with a
// big speed decrease on compression.
func WriterBestCompression() WriterOption {
return func(w *Writer) error {
w.level = levelBest
return nil
}
}
@@ -898,8 +968,7 @@ func WriterBetterCompression() WriterOption {
// If concurrency is > 1 CRC and output will still be done async.
func WriterUncompressed() WriterOption {
return func(w *Writer) error {
w.better = false
w.uncompressed = true
w.level = levelUncompressed
return nil
}
}