From 469ba13d50f3d52dafab0a1c693d9f4dce8d3628 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 11 Jan 2022 07:40:02 -0800 Subject: [PATCH] s2: Add stream index (#462) # Stream Seek Index S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data. The index can either be appended to the stream as a skippable block or returned for separate storage. When the index is appended to a stream it will be skipped by regular decoders, so the output remains compatible with other decoders. ## Creating an Index To automatically add an index to a stream, add `WriterAddIndex()` option to your writer. Then the index will be added to the stream when `Close()` is called. ``` // Add Index to stream... enc := s2.NewWriter(w, s2.WriterAddIndex()) io.Copy(enc, r) enc.Close() ``` If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`. This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`. ``` // Get index for separate storage... enc := s2.NewWriter(w) io.Copy(enc, r) index, err := enc.CloseIndex() ``` The `index` can then be used needing to read from the stream. This means the index can be used without needing to seek to the end of the stream or for manually forwarding streams. See below. Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function. ## Using Indexes To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available. Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader. If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported. Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. 
``` dec := s2.NewReader(r) rs, err := dec.ReadSeeker(false, nil) rs.Seek(wantOffset, io.SeekStart) ``` Get a seeker to seek forward. Since no index is provided, the index is read from the stream. This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. A custom index can be specified which will be used if supplied. When using a custom index, it will not be read from the input stream. ``` dec := s2.NewReader(r) rs, err := dec.ReadSeeker(false, index) rs.Seek(wantOffset, io.SeekStart) ``` This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker ``` dec := s2.NewReader(r) rs, err := dec.ReadSeeker(true, index) rs.Seek(wantOffset, io.SeekStart) ``` Finally, since we specify that we want to do random seeking `r` must be an io.Seeker. The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader, meaning changes performed to one are reflected in the other. To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` can be used. ## Manually Forwarding Streams Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type. This can be used for parsing indexes, either separate or in streams. In some cases it may not be possible to serve a seekable stream. This can for instance be an HTTP stream, where the Range request is sent at the start of the stream. With a little bit of extra code it is still possible to use indexes to forward to a specific offset with a single forward skip. 
It is possible to load the index manually like this: ``` var index s2.Index _, err = index.Load(idxBytes) ``` This can be used to figure out how much to offset the compressed stream: ``` compressedOffset, uncompressedOffset, err := index.Find(wantOffset) ``` The `compressedOffset` is the number of bytes that should be skipped from the beginning of the compressed file. The `uncompressedOffset` will then be the offset of the uncompressed bytes returned when decoding from that position. This will always be <= wantOffset. When creating a decoder it must be specified that it should *not* expect a stream identifier at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset` we create the decoder like this: ``` dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier()) ``` We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want. This is done using the regular "Skip" function: ``` err = dec.Skip(wantOffset - uncompressedOffset) ``` This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset. 
--- s2/README.md | 256 +++++++++++++- s2/cmd/internal/readahead/reader.go | 286 ++++++++++++++- s2/cmd/s2c/main.go | 4 + s2/cmd/s2d/main.go | 108 +++++- s2/decode.go | 229 +++++++++++- s2/encode.go | 241 +++++++++++-- s2/encode_test.go | 130 ++++++- s2/index.go | 525 ++++++++++++++++++++++++++++ s2/index_test.go | 101 ++++++ s2/s2.go | 4 + 10 files changed, 1793 insertions(+), 91 deletions(-) create mode 100644 s2/index.go create mode 100644 s2/index_test.go diff --git a/s2/README.md b/s2/README.md index 81fad65243..e6716aeaee 100644 --- a/s2/README.md +++ b/s2/README.md @@ -20,6 +20,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea * Concurrent stream compression * Faster decompression, even for Snappy compatible content * Ability to quickly skip forward in compressed stream +* Random seeking with indexes * Compatible with reading Snappy compressed content * Smaller block size overhead on incompressible blocks * Block concatenation @@ -29,8 +30,8 @@ This is important, so you don't have to worry about spending CPU cycles on alrea ## Drawbacks over Snappy -* Not optimized for 32 bit systems. -* Streams use slightly more memory due to larger blocks and concurrency (configurable). +* Not optimized for 32 bit systems +* Streams use slightly more memory due to larger blocks and concurrency (configurable) # Usage @@ -141,7 +142,7 @@ Binaries can be downloaded on the [Releases Page](https://github.com/klauspost/c Installing then requires Go to be installed. To install them, use: -`go install github.com/klauspost/compress/s2/cmd/s2c && go install github.com/klauspost/compress/s2/cmd/s2d` +`go install github.com/klauspost/compress/s2/cmd/s2c@latest && go install github.com/klauspost/compress/s2/cmd/s2d@latest` To build binaries to the current folder use: @@ -176,6 +177,8 @@ Options: Compress faster, but with a minor compression loss -help Display help + -index + Add seek index (default true) -o string Write output to another file. 
Single input file only -pad string @@ -217,11 +220,15 @@ Options: Display help -o string Write output to another file. Single input file only - -q Don't write any output to terminal, except errors + -offset string + Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index + -q Don't write any output to terminal, except errors -rm - Delete source file(s) after successful decompression + Delete source file(s) after successful decompression -safe - Do not overwrite output files + Do not overwrite output files + -tail string + Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index -verify Verify files, but do not write output ``` @@ -633,12 +640,12 @@ Compression and speed is typically a bit better `MaxEncodedLen` is also smaller Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z), 53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used: -| Encoder | Size | MB/s | Reduction | -|-----------------------|------------|--------|------------ -| snappy.Encode | 1128706759 | 725.59 | 71.89% | -| s2.EncodeSnappy | 1093823291 | 899.16 | 72.75% | -| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% | -| s2.EncodeSnappyBest | 944507998 | 66.00 | 76.47% | +| Encoder | Size | MB/s | Reduction | +|-----------------------|------------|------------|------------ +| snappy.Encode | 1128706759 | 725.59 | 71.89% | +| s2.EncodeSnappy | 1093823291 | **899.16** | 72.75% | +| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% | +| s2.EncodeSnappyBest | 944507998 | 66.00 | **76.47%**| ## Streams @@ -649,11 +656,11 @@ Comparison of different streams, AMD Ryzen 3950x, 16 cores. 
Size and throughput: | File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best | |-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------| -| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s | -| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s | -| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s | -| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s | -| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s | +| nyc-taxi-data-10M.csv | 1316042016 - 539.47MB/s | 1307003093 - 10132.73MB/s | 1174534014 - 5002.44MB/s | 1115904679 - 177.97MB/s | +| enwik10 (xml) | 5088294643 - 451.13MB/s | 5175840939 - 9440.69MB/s | 4560784526 - 4487.21MB/s | 4340299103 - 158.92MB/s | +| 10gb.tar (mixed) | 6056946612 - 729.73MB/s | 6208571995 - 9978.05MB/s | 5741646126 - 4919.98MB/s | 5548973895 - 180.44MB/s | +| github-june-2days-2019.json | 1525176492 - 933.00MB/s | 1476519054 - 13150.12MB/s | 1400547532 - 5803.40MB/s | 1321887137 - 204.29MB/s | +| consensus.db.10gb (db) | 5412897703 - 1102.14MB/s | 5354073487 - 13562.91MB/s | 5335069899 - 5294.73MB/s | 5201000954 - 175.72MB/s | # Decompression @@ -679,7 +686,220 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped, Blocks can be concatenated using the `ConcatBlocks` function. -Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Snappy blocks/streams can safely be concatenated with S2 blocks and streams. +Streams with indexes (see below) will currently not work on concatenated streams. 
+ +# Stream Seek Index + +S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data. + +The index can either be appended to the stream as a skippable block or returned for separate storage. + +When the index is appended to a stream it will be skipped by regular decoders, +so the output remains compatible with other decoders. + +## Creating an Index + +To automatically add an index to a stream, add `WriterAddIndex()` option to your writer. +Then the index will be added to the stream when `Close()` is called. + +``` + // Add Index to stream... + enc := s2.NewWriter(w, s2.WriterAddIndex()) + io.Copy(enc, r) + enc.Close() +``` + +If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`. +This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`. + +``` + // Get index for separate storage... + enc := s2.NewWriter(w) + io.Copy(enc, r) + index, err := enc.CloseIndex() +``` + +The `index` can then be used needing to read from the stream. +This means the index can be used without needing to seek to the end of the stream +or for manually forwarding streams. See below. + +Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function. + +## Using Indexes + +To use indexes there is a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` function available. + +Calling ReadSeeker will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader. + +If 'random' is specified the returned io.Seeker can be used for random seeking, otherwise only forward seeking is supported. +Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, nil) + rs.Seek(wantOffset, io.SeekStart) +``` + +Get a seeker to seek forward. 
Since no index is provided, the index is read from the stream. +This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface. + +A custom index can be specified which will be used if supplied. +When using a custom index, it will not be read from the input stream. + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(false, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +This will read the index from `index`. Since we specify non-random (forward only) seeking `r` does not have to be an io.Seeker + +``` + dec := s2.NewReader(r) + rs, err := dec.ReadSeeker(true, index) + rs.Seek(wantOffset, io.SeekStart) +``` + +Finally, since we specify that we want to do random seeking `r` must be an io.Seeker. + +The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader, +meaning changes performed to one is reflected in the other. + +To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` can be used. + +## Manually Forwarding Streams + +Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type. +This can be used for parsing indexes, either separate or in streams. + +In some cases it may not be possible to serve a seekable stream. +This can for instance be an HTTP stream, where the Range request +is sent at the start of the stream. + +With a little bit of extra code it is still possible to use indexes +to forward to specific offset with a single forward skip. 
+ +It is possible to load the index manually like this: +``` + var index s2.Index + _, err = index.Load(idxBytes) +``` + +This can be used to figure out how much to offset the compressed stream: + +``` + compressedOffset, uncompressedOffset, err := index.Find(wantOffset) +``` + +The `compressedOffset` is the number of bytes that should be skipped +from the beginning of the compressed file. + +The `uncompressedOffset` will then be offset of the uncompressed bytes returned +when decoding from that position. This will always be <= wantOffset. + +When creating a decoder it must be specified that it should *not* expect a stream identifier +at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset` +we create the decoder like this: + +``` + dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier()) +``` + +We are not completely done. We still need to forward the stream the uncompressed bytes we didn't want. +This is done using the regular "Skip" function: + +``` + err = dec.Skip(wantOffset - uncompressedOffset) +``` + +This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset. + +## Index Format: + +Each block is structured as a snappy skippable block, with the chunk ID 0x99. + +The block can be read from the front, but contains information so it can be read from the back as well. + +Numbers are stored as fixed size little endian values or [zigzag encoded](https://developers.google.com/protocol-buffers/docs/encoding#signed_integers) [base 128 varints](https://developers.google.com/protocol-buffers/docs/encoding), +with un-encoded value length of 64 bits, unless other limits are specified. + +| Content | Format | +|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------| +| ID, `[1]byte` | Always 0x99. 
| +| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. | +| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". | +| UncompressedSize, Varint | Total Uncompressed size. | +| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. | +| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. | +| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. | +| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. Other values are invalid. | +| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. | +| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. | +| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. | +| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. | + +For regular streams the uncompressed offsets are fully predictable, +so `HasUncompressedOffsets` allows to specify that compressed blocks all have +exactly `EstBlockSize` bytes of uncompressed content. + +Entries *must* be in order, starting with the lowest offset, +and there *must* be no uncompressed offset duplicates. +Entries *may* point to the start of a skippable block, +but it is then not allowed to also have an entry for the next block since +that would give an uncompressed offset duplicate. + +There is no requirement for all blocks to be represented in the index. +In fact there is a maximum of 65536 block entries in an index. + +The writer can use any method to reduce the number of entries. +An implicit block start at 0,0 can be assumed. + +### Decoding entries: + +``` +// Read Uncompressed entries. +// Each assumes EstBlockSize delta from previous. 
+for each entry { + uOff = 0 + if HasUncompressedOffsets == 1 { + uOff = ReadVarInt // Read value from stream + } + + // Except for the first entry, use previous values. + if entryNum == 0 { + entry[entryNum].UncompressedOffset = uOff + continue + } + + // Uncompressed uses previous offset and adds EstBlockSize + entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize +} + + +// Guess that the first block will be 50% of uncompressed size. +// Integer truncating division must be used. +CompressGuess := EstBlockSize / 2 + +// Read Compressed entries. +// Each assumes CompressGuess delta from previous. +// CompressGuess is adjusted for each value. +for each entry { + cOff = ReadVarInt // Read value from stream + + // Except for the first entry, use previous values. + if entryNum == 0 { + entry[entryNum].CompressedOffset = cOff + continue + } + + // Compressed uses previous and our estimate. + entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess + + // Adjust compressed offset for next loop, integer truncating division must be used. + CompressGuess += cOff/2 +} +``` # Format Extensions diff --git a/s2/cmd/internal/readahead/reader.go b/s2/cmd/internal/readahead/reader.go index 706908f099..7f1e3811d0 100644 --- a/s2/cmd/internal/readahead/reader.go +++ b/s2/cmd/internal/readahead/reader.go @@ -1,6 +1,6 @@ // Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file. -// The readahead package will do asynchronous read-ahead from an input io.Reader +// Package readahead will do asynchronous read-ahead from an input io.Reader // and make the data available as an io.Reader. // // This should be fully transparent, except that once an error @@ -19,6 +19,14 @@ import ( "io" ) +const ( + // DefaultBuffers is the default number of buffers used. + DefaultBuffers = 4 + + // DefaultBufferSize is the default buffer size, 1 MB. 
+ DefaultBufferSize = 1 << 20 +) + type seekable struct { *reader } @@ -39,6 +47,85 @@ type reader struct { err error // If an error has occurred it is here cur *buffer // Current buffer being served exited chan struct{} // Channel is closed been the async reader shuts down + bufs [][]byte +} + +// NewReader returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. +// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read from the returned reader. +// When done use Close() to release the buffers. +// If a reader supporting the io.Seeker is given, +// the returned reader will also support it. +func NewReader(rd io.Reader) io.ReadCloser { + if rd == nil { + return nil + } + + ret, err := NewReaderSize(rd, DefaultBuffers, DefaultBufferSize) + + // Should not be possible to trigger from other packages. + if err != nil { + panic("unexpected error:" + err.Error()) + } + return ret +} + +// NewReadCloser returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. +// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read from the returned reader. +// When done use Close() to release the buffers, +// which will also close the supplied closer. +// If a reader supporting the io.Seeker is given, +// the returned reader will also support it. +func NewReadCloser(rd io.ReadCloser) io.ReadCloser { + if rd == nil { + return nil + } + + ret, err := NewReadCloserSize(rd, DefaultBuffers, DefaultBufferSize) + + // Should not be possible to trigger from other packages. + if err != nil { + panic("unexpected error:" + err.Error()) + } + return ret +} + +// NewReadSeeker returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. 
+// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read and seeked from the returned reader. +// When done use Close() to release the buffers. +func NewReadSeeker(rd io.ReadSeeker) ReadSeekCloser { + //Not checking for result as the input interface guarantees it's seekable + res, _ := NewReader(rd).(ReadSeekCloser) + return res +} + +// NewReadSeekCloser returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. +// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read and seeked from the returned reader. +// When done use Close() to release the buffers, +// which will also close the supplied closer. +func NewReadSeekCloser(rd ReadSeekCloser) ReadSeekCloser { + // Not checking for result as the input interface guarantees it's seekable + res, _ := NewReadCloser(rd).(ReadSeekCloser) + return res } // NewReaderSize returns a reader with a custom number of buffers and size. @@ -64,21 +151,160 @@ func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err erro return } +// NewReaderBuffer returns a reader with a custom number of buffers and size. +// All buffers must be the same size. +// Buffers can be reused after Close has been called. 
+func NewReaderBuffer(rd io.Reader, buffers [][]byte) (res io.ReadCloser, err error) { + if len(buffers) == 0 { + return nil, fmt.Errorf("number of buffers too small") + } + sz := 0 + for _, buf := range buffers { + if len(buf) == 0 { + return nil, fmt.Errorf("zero size buffer sent") + } + if sz == 0 { + sz = len(buf) + } + if sz != len(buf) { + return nil, fmt.Errorf("buffers should have similar size") + } + } + if rd == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{} + if _, ok := rd.(io.Seeker); ok { + res = &seekable{a} + } else { + res = a + } + a.initBuffers(rd, buffers, sz) + + return +} + +// NewReadCloserSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. +func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (res io.ReadCloser, err error) { + if size <= 0 { + return nil, fmt.Errorf("buffer size too small") + } + if buffers <= 0 { + return nil, fmt.Errorf("number of buffers too small") + } + if rc == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{closer: rc} + if _, ok := rc.(io.Seeker); ok { + res = &seekable{a} + } else { + res = a + } + a.init(rc, buffers, size) + return +} + +// NewReadCloserBuffer returns a reader with a custom number of buffers and size. +// All buffers must be the same size. +// Buffers can be reused after Close has been called. 
+func NewReadCloserBuffer(rc io.ReadCloser, buffers [][]byte) (res io.ReadCloser, err error) { + if len(buffers) == 0 { + return nil, fmt.Errorf("number of buffers too small") + } + sz := 0 + for _, buf := range buffers { + if len(buf) == 0 { + return nil, fmt.Errorf("zero size buffer sent") + } + if sz == 0 { + sz = len(buf) + } + if sz != len(buf) { + return nil, fmt.Errorf("buffers should have similar size") + } + } + + if rc == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{closer: rc} + if _, ok := rc.(io.Seeker); ok { + res = &seekable{a} + } else { + res = a + } + a.initBuffers(rc, buffers, sz) + return +} + +// NewReadSeekerSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. +func NewReadSeekerSize(rd io.ReadSeeker, buffers, size int) (res ReadSeekCloser, err error) { + reader, err := NewReaderSize(rd, buffers, size) + if err != nil { + return nil, err + } + //Not checking for result as the input interface guarantees it's seekable + res, _ = reader.(ReadSeekCloser) + return +} + +// NewReadSeekCloserSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. +func NewReadSeekCloserSize(rd ReadSeekCloser, buffers, size int) (res ReadSeekCloser, err error) { + reader, err := NewReadCloserSize(rd, buffers, size) + if err != nil { + return nil, err + } + //Not checking for result as the input interface guarantees it's seekable + res, _ = reader.(ReadSeekCloser) + return +} + +// NewReadSeekCloserBuffer returns a reader with a custom number of buffers and size. +// All buffers must be the same size. +// Buffers can be reused after Close has been called. 
+func NewReadSeekCloserBuffer(rd ReadSeekCloser, buffers [][]byte) (res ReadSeekCloser, err error) { + reader, err := NewReadCloserBuffer(rd, buffers) + if err != nil { + return nil, err + } + //Not checking for result as the input interface guarantees it's seekable + res, _ = reader.(ReadSeekCloser) + return +} + // initialize the reader func (a *reader) init(rd io.Reader, buffers, size int) { + x := make([]byte, buffers*size) + bufs := make([][]byte, buffers) + for i := range bufs { + bufs[i] = x[i*size : (i+1)*size : (i+1)*size] + } + a.initBuffers(rd, bufs, size) +} + +// initialize the reader +func (a *reader) initBuffers(rd io.Reader, buffers [][]byte, size int) { a.in = rd - a.ready = make(chan *buffer, buffers) - a.reuse = make(chan *buffer, buffers) - a.exit = make(chan struct{}) - a.exited = make(chan struct{}) - a.buffers = buffers + a.ready = make(chan *buffer, len(buffers)) + a.reuse = make(chan *buffer, len(buffers)) + a.exit = make(chan struct{}, 0) + a.exited = make(chan struct{}, 0) + a.buffers = len(buffers) a.size = size a.cur = nil a.err = nil + a.bufs = buffers // Create buffers - for i := 0; i < buffers; i++ { - a.reuse <- newBuffer(size) + for _, buf := range buffers { + a.reuse <- newBuffer(buf) } // Start async reader @@ -86,10 +312,25 @@ func (a *reader) init(rd io.Reader, buffers, size int) { // Ensure that when we exit this is signalled. defer close(a.exited) defer close(a.ready) + var atEOF bool for { select { case b := <-a.reuse: + if atEOF { + // Return delay + b.err = io.EOF + b.buf = b.buf[:0] + b.offset = 0 + a.ready <- b + return + } err := b.read(a.in) + // Delay EOF if we have content. 
+ if err == io.EOF && len(b.buf) > 0 { + atEOF = true + err = nil + b.err = nil + } a.ready <- b if err != nil { return @@ -136,9 +377,14 @@ func (a *reader) Read(p []byte) (n int, err error) { n = copy(p, a.cur.buffer()) a.cur.inc(n) - // If at end of buffer, return any error, if present if a.cur.isEmpty() { - a.err = a.cur.err + // Return current, so a fetch can start. + if a.cur != nil { + // If at end of buffer, return any error, if present + a.err = a.cur.err + a.reuse <- a.cur + a.cur = nil + } return n, a.err } return n, nil @@ -166,8 +412,8 @@ func (a *seekable) Seek(offset int64, whence int) (res int64, err error) { } //Seek the actual Seeker if res, err = seeker.Seek(offset, whence); err == nil { - //If the seek was successful, reinitialize ourselves (with the new position). - a.init(a.in, a.buffers, a.size) + //If the seek was successful, reinitalize ourselves (with the new position). + a.initBuffers(a.in, a.bufs, a.size) } return } @@ -231,8 +477,8 @@ type buffer struct { size int } -func newBuffer(size int) *buffer { - return &buffer{buf: make([]byte, size), err: nil, size: size} +func newBuffer(buf []byte) *buffer { + return &buffer{buf: buf, err: nil, size: len(buf)} } // isEmpty returns true is offset is at end of @@ -257,8 +503,18 @@ func (b *buffer) read(rd io.Reader) (err error) { b.err = err } }() + var n int - n, b.err = rd.Read(b.buf[0:b.size]) + buf := b.buf[0:b.size] + for n < b.size { + n2, err := rd.Read(buf) + n += n2 + if err != nil { + b.err = err + break + } + buf = buf[n2:] + } b.buf = b.buf[0:n] b.offset = 0 return b.err diff --git a/s2/cmd/s2c/main.go b/s2/cmd/s2c/main.go index 6875139c7b..7c43f11e18 100644 --- a/s2/cmd/s2c/main.go +++ b/s2/cmd/s2c/main.go @@ -35,6 +35,7 @@ var ( blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB") block = flag.Bool("block", false, "Compress as a single block. 
Will load content into memory.") safe = flag.Bool("safe", false, "Do not overwrite output files") + index = flag.Bool("index", true, "Add seek index") padding = flag.String("pad", "1", "Pad size to a multiple of this value, Examples: 500, 64K, 256K, 1M, 4M, etc") stdout = flag.Bool("c", false, "Write all output to stdout. Multiple input files will be concatenated") out = flag.String("o", "", "Write output to another file. Single input file only") @@ -85,6 +86,9 @@ Options:`) os.Exit(0) } opts := []s2.WriterOption{s2.WriterBlockSize(int(sz)), s2.WriterConcurrency(*cpu), s2.WriterPadding(int(pad))} + if *index { + opts = append(opts, s2.WriterAddIndex()) + } if !*faster { opts = append(opts, s2.WriterBetterCompression()) } diff --git a/s2/cmd/s2d/main.go b/s2/cmd/s2d/main.go index 5f973c5100..3d152f00f6 100644 --- a/s2/cmd/s2d/main.go +++ b/s2/cmd/s2d/main.go @@ -11,9 +11,11 @@ import ( "net/http" "os" "runtime/debug" + "strconv" "strings" "sync" "time" + "unicode" "github.com/klauspost/compress/s2" "github.com/klauspost/compress/s2/cmd/internal/filepathx" @@ -27,6 +29,8 @@ var ( remove = flag.Bool("rm", false, "Delete source file(s) after successful decompression") quiet = flag.Bool("q", false, "Don't write any output to terminal, except errors") bench = flag.Int("bench", 0, "Run benchmark n times. No output will be written") + tail = flag.String("tail", "", "Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index") + offset = flag.String("offset", "", "Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index") help = flag.Bool("help", false, "Display help") out = flag.String("o", "", "Write output to another file. Single input file only") block = flag.Bool("block", false, "Decompress as a single block. 
Will load content into memory.") @@ -61,6 +65,13 @@ Options:`) flag.PrintDefaults() os.Exit(0) } + tailBytes, err := toSize(*tail) + exitErr(err) + offset, err := toSize(*offset) + exitErr(err) + if tailBytes > 0 && offset > 0 { + exitErr(errors.New("--offset and --tail cannot be used together")) + } if len(args) == 1 && args[0] == "-" { r.Reset(os.Stdin) if *verify { @@ -204,15 +215,31 @@ Options:`) // Input file. file, _, mode := openFile(filename) defer closeOnce.Do(func() { file.Close() }) - rc := rCounter{in: file} + var rc interface { + io.Reader + BytesRead() int64 + } + if tailBytes > 0 || offset > 0 { + rs, ok := file.(io.ReadSeeker) + if !ok && tailBytes > 0 { + exitErr(errors.New("cannot tail with non-seekable input")) + } + if ok { + rc = &rCountSeeker{in: rs} + } else { + rc = &rCounter{in: file} + } + } else { + rc = &rCounter{in: file} + } var src io.Reader - if !block { - ra, err := readahead.NewReaderSize(&rc, 2, 4<<20) + if !block && tailBytes == 0 && offset == 0 { + ra, err := readahead.NewReaderSize(rc, 2, 4<<20) exitErr(err) defer ra.Close() src = ra } else { - src = &rc + src = rc } if *safe { _, err := os.Stat(dstFilename) @@ -247,6 +274,16 @@ Options:`) decoded = bytes.NewReader(b) } else { r.Reset(src) + if tailBytes > 0 || offset > 0 { + rs, err := r.ReadSeeker(tailBytes > 0, nil) + exitErr(err) + if tailBytes > 0 { + _, err = rs.Seek(int64(tailBytes), io.SeekEnd) + } else { + _, err = rs.Seek(int64(offset), io.SeekStart) + } + exitErr(err) + } decoded = r } output, err := io.Copy(out, decoded) @@ -254,8 +291,8 @@ Options:`) if !*quiet { elapsed := time.Since(start) mbPerSec := (float64(output) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second))) - pct := float64(output) * 100 / float64(rc.n) - fmt.Printf(" %d -> %d [%.02f%%]; %.01fMB/s\n", rc.n, output, pct, mbPerSec) + pct := float64(output) * 100 / float64(rc.BytesRead()) + fmt.Printf(" %d -> %d [%.02f%%]; %.01fMB/s\n", rc.BytesRead(), output, pct, mbPerSec) } if *remove && 
!*verify { closeOnce.Do(func() { @@ -317,13 +354,68 @@ func exitErr(err error) { } type rCounter struct { - n int + n int64 in io.Reader } func (w *rCounter) Read(p []byte) (n int, err error) { n, err = w.in.Read(p) - w.n += n + w.n += int64(n) return n, err +} + +func (w *rCounter) BytesRead() int64 { + return w.n +} +type rCountSeeker struct { + n int64 + in io.ReadSeeker +} + +func (w *rCountSeeker) Read(p []byte) (n int, err error) { + n, err = w.in.Read(p) + w.n += int64(n) + return n, err +} + +func (w *rCountSeeker) Seek(offset int64, whence int) (int64, error) { + return w.in.Seek(offset, whence) +} + +func (w *rCountSeeker) BytesRead() int64 { + return w.n +} + +// toSize converts a size indication to bytes. +func toSize(size string) (uint64, error) { + if len(size) == 0 { + return 0, nil + } + size = strings.ToUpper(strings.TrimSpace(size)) + firstLetter := strings.IndexFunc(size, unicode.IsLetter) + if firstLetter == -1 { + firstLetter = len(size) + } + + bytesString, multiple := size[:firstLetter], size[firstLetter:] + bytes, err := strconv.ParseUint(bytesString, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse size: %v", err) + } + + switch multiple { + case "T", "TB", "TIB": + return bytes * 1 << 40, nil + case "G", "GB", "GIB": + return bytes * 1 << 30, nil + case "M", "MB", "MIB": + return bytes * 1 << 20, nil + case "K", "KB", "KIB": + return bytes * 1 << 10, nil + case "B", "": + return bytes, nil + default: + return 0, fmt.Errorf("unknown size suffix: %v", multiple) + } } diff --git a/s2/decode.go b/s2/decode.go index d0ae5304ef..9e7fce8856 100644 --- a/s2/decode.go +++ b/s2/decode.go @@ -8,7 +8,9 @@ package s2 import ( "encoding/binary" "errors" + "fmt" "io" + "io/ioutil" ) var ( @@ -22,6 +24,16 @@ var ( ErrUnsupported = errors.New("s2: unsupported input") ) +// ErrCantSeek is returned if the stream cannot be seeked. +type ErrCantSeek struct { + Reason string +} + +// Error returns the error as string. 
+func (e ErrCantSeek) Error() string { + return fmt.Sprintf("s2: Can't seek because %s", e.Reason) +} + // DecodedLen returns the length of the decoded block. func DecodedLen(src []byte) (int, error) { v, _, err := decodedLen(src) @@ -88,6 +100,7 @@ func NewReader(r io.Reader, opts ...ReaderOption) *Reader { } else { nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize) } + nr.readHeader = nr.ignoreStreamID nr.paramsOK = true return &nr } @@ -131,12 +144,41 @@ func ReaderAllocBlock(blockSize int) ReaderOption { } } +// ReaderIgnoreStreamIdentifier will make the reader skip the expected +// stream identifier at the beginning of the stream. +// This can be used when serving a stream that has been forwarded to a specific point. +func ReaderIgnoreStreamIdentifier() ReaderOption { + return func(r *Reader) error { + r.ignoreStreamID = true + return nil + } +} + +// ReaderSkippableCB will register a callback for chunks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. +func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption { + return func(r *Reader) error { + if id < 0x80 || id > 0xfd { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)") + } + r.skippableCB[id] = fn + return nil + } +} + // Reader is an io.Reader that can read Snappy-compressed bytes. type Reader struct { - r io.Reader - err error - decoded []byte - buf []byte + r io.Reader + err error + decoded []byte + buf []byte + skippableCB [0x80]func(r io.Reader) error + blockStart int64 // Uncompressed offset at start of current. + index *Index + // decoded[i:j] contains decoded bytes that have not yet been passed on. i, j int // maximum block size allowed.
@@ -144,10 +186,11 @@ type Reader struct { // maximum expected buffer size. maxBufSize int // alloc a buffer this size if > 0. - lazyBuf int - readHeader bool - paramsOK bool - snappyFrame bool + lazyBuf int + readHeader bool + paramsOK bool + snappyFrame bool + ignoreStreamID bool } // ensureBufferSize will ensure that the buffer can take at least n bytes. @@ -172,11 +215,12 @@ func (r *Reader) Reset(reader io.Reader) { if !r.paramsOK { return } + r.index = nil r.r = reader r.err = nil r.i = 0 r.j = 0 - r.readHeader = false + r.readHeader = r.ignoreStreamID } func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { @@ -189,11 +233,24 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) { return true } -// skipN will skip n bytes. +// skippable will skip n bytes. // If the supplied reader supports seeking that is used. // tmp is used as a temporary buffer for reading. // The supplied slice does not need to be the size of the read. -func (r *Reader) skipN(tmp []byte, n int, allowEOF bool) (ok bool) { +func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) { + if id < 0x80 { + r.err = fmt.Errorf("internal error: skippable id < 0x80") + return false + } + if fn := r.skippableCB[id-0x80]; fn != nil { + rd := io.LimitReader(r.r, int64(n)) + r.err = fn(rd) + if r.err != nil { + return false + } + _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp) + return r.err == nil + } if rs, ok := r.r.(io.ReadSeeker); ok { _, err := rs.Seek(int64(n), io.SeekCurrent) if err == nil { @@ -247,6 +304,7 @@ func (r *Reader) Read(p []byte) (int, error) { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -294,6 +352,7 @@ func (r *Reader) Read(p []byte) (int, error) { continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3.
Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -357,17 +416,20 @@ func (r *Reader) Read(p []byte) (int, error) { if chunkType <= 0x7f { // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + // fmt.Printf("ERR chunktype: 0x%x\n", chunkType) r.err = ErrUnsupported return 0, r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { + // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen) r.err = ErrUnsupported return 0, r.err } - if !r.skipN(r.buf, chunkLen, false) { + // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen) + if !r.skippable(r.buf, chunkLen, false, chunkType) { return 0, r.err } } @@ -396,7 +458,7 @@ func (r *Reader) Skip(n int64) error { return nil } n -= int64(r.j - r.i) - r.i, r.j = 0, 0 + r.i = r.j } // Buffer empty; read blocks until we have content. @@ -420,6 +482,7 @@ func (r *Reader) Skip(n int64) error { // https://github.com/google/snappy/blob/master/framing_format.txt switch chunkType { case chunkTypeCompressedData: + r.blockStart += int64(r.j) // Section 4.2. Compressed data (chunk type 0x00). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -468,6 +531,7 @@ func (r *Reader) Skip(n int64) error { r.i, r.j = 0, dLen continue case chunkTypeUncompressedData: + r.blockStart += int64(r.j) // Section 4.3. Uncompressed data (chunk type 0x01). if chunkLen < checksumSize { r.err = ErrCorrupt @@ -528,19 +592,138 @@ func (r *Reader) Skip(n int64) error { r.err = ErrUnsupported return r.err } - if chunkLen > maxBlockSize { + if chunkLen > maxChunkSize { r.err = ErrUnsupported return r.err } // Section 4.4 Padding (chunk type 0xfe). // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). 
- if !r.skipN(r.buf, chunkLen, false) { + if !r.skippable(r.buf, chunkLen, false, chunkType) { return r.err } } return nil } +// ReadSeeker provides random or forward seeking in compressed content. +// See Reader.ReadSeeker +type ReadSeeker struct { + *Reader +} + +// ReadSeeker will return an io.ReadSeeker compatible version of the reader. +// If 'random' is specified the returned io.Seeker can be used for +// random seeking, otherwise only forward seeking is supported. +// Enabling random seeking requires the original input to support +// the io.Seeker interface. +// A custom index can be specified which will be used if supplied. +// When using a custom index, it will not be read from the input stream. +// The returned ReadSeeker contains a shallow reference to the existing Reader, +// meaning changes performed to one is reflected in the other. +func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) { + // Read index if provided. + if len(index) != 0 { + if r.index == nil { + r.index = &Index{} + } + if _, err := r.index.Load(index); err != nil { + return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()} + } + } + + // Check if input is seekable + rs, ok := r.r.(io.ReadSeeker) + if !ok { + if !random { + return &ReadSeeker{Reader: r}, nil + } + return nil, ErrCantSeek{Reason: "input stream isn't seekable"} + } + + if r.index != nil { + // Seekable and index, ok... + return &ReadSeeker{Reader: r}, nil + } + + // Load from stream. + r.index = &Index{} + + // Read current position. + pos, err := rs.Seek(0, io.SeekCurrent) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + err = r.index.LoadStream(rs) + if err != nil { + if err == ErrUnsupported { + return nil, ErrCantSeek{Reason: "input stream does not contain an index"} + } + return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()} + } + + // reset position. 
+ _, err = rs.Seek(pos, io.SeekStart) + if err != nil { + return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()} + } + return &ReadSeeker{Reader: r}, nil +} + +// Seek allows seeking in compressed data. +func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) { + if r.err != nil { + return 0, r.err + } + if offset == 0 && whence == io.SeekCurrent { + return r.blockStart + int64(r.i), nil + } + if !r.readHeader { + // Make sure we read the header. + _, r.err = r.Read([]byte{}) + } + rs, ok := r.r.(io.ReadSeeker) + if r.index == nil || !ok { + if whence == io.SeekCurrent && offset >= 0 { + err := r.Skip(offset) + return r.blockStart + int64(r.i), err + } + if whence == io.SeekStart && offset >= r.blockStart+int64(r.i) { + err := r.Skip(offset - r.blockStart - int64(r.i)) + return r.blockStart + int64(r.i), err + } + return 0, ErrUnsupported + + } + + switch whence { + case io.SeekCurrent: + offset += r.blockStart + int64(r.i) + case io.SeekEnd: + offset = -offset + } + c, u, err := r.index.Find(offset) + if err != nil { + return r.blockStart + int64(r.i), err + } + + // Seek to next block + _, err = rs.Seek(c, io.SeekStart) + if err != nil { + return 0, err + } + + if offset < 0 { + offset = r.index.TotalUncompressed + offset + } + + r.i = r.j // Remove rest of current block. + if u < offset { + // Forward inside block + return offset, r.Skip(offset - u) + } + return offset, nil +} + // ReadByte satisfies the io.ByteReader interface. func (r *Reader) ReadByte() (byte, error) { if r.err != nil { @@ -563,3 +746,17 @@ func (r *Reader) ReadByte() (byte, error) { } return 0, io.ErrNoProgress } + +// SkippableCB will register a callback for chunks with the specified ID. +// ID must be a Reserved skippable chunks ID, 0x80-0xfe (inclusive). +// For each chunk with the ID, the callback is called with the content. +// Any returned non-nil error will abort decompression. +// Only one callback per ID is supported, latest sent will be used. 
+// Sending a nil function will disable previous callbacks. +func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error { + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)") + } + r.skippableCB[id] = fn + return nil +} diff --git a/s2/encode.go b/s2/encode.go index aa8b108d01..59f992ca6e 100644 --- a/s2/encode.go +++ b/s2/encode.go @@ -395,23 +395,26 @@ type Writer struct { // ibuf is a buffer for the incoming (uncompressed) bytes. ibuf []byte - blockSize int - obufLen int - concurrency int - written int64 - output chan chan result - buffers sync.Pool - pad int + blockSize int + obufLen int + concurrency int + written int64 + uncompWritten int64 // Bytes sent to compression + output chan chan result + buffers sync.Pool + pad int writer io.Writer randSrc io.Reader writerWg sync.WaitGroup + index Index // wroteStreamHeader is whether we have written the stream header. wroteStreamHeader bool paramsOK bool snappy bool flushOnWrite bool + appendIndex bool level uint8 } @@ -422,7 +425,11 @@ const ( levelBest ) -type result []byte +type result struct { + b []byte + // Uncompressed start offset + startOffset int64 +} // err returns the previously set error. // If no error has been set it is set to err if not nil. @@ -454,6 +461,9 @@ func (w *Writer) Reset(writer io.Writer) { w.wroteStreamHeader = false w.written = 0 w.writer = writer + w.uncompWritten = 0 + w.index.reset(w.blockSize) + // If we didn't get a writer, stop here. if writer == nil { return @@ -474,7 +484,8 @@ func (w *Writer) Reset(writer io.Writer) { // Get a queued write. for write := range toWrite { // Wait for the data to be available. - in := <-write + input := <-write + in := input.b if len(in) > 0 { if w.err(nil) == nil { // Don't expose data from previous buffers. 
@@ -485,11 +496,12 @@ func (w *Writer) Reset(writer io.Writer) { err = io.ErrShortBuffer } _ = w.err(err) + w.err(w.index.add(w.written, input.startOffset)) w.written += int64(n) } } if cap(in) >= w.obufLen { - w.buffers.Put([]byte(in)) + w.buffers.Put(in) } // close the incoming write request. // This can be used for synchronizing flushes. @@ -500,6 +512,9 @@ func (w *Writer) Reset(writer io.Writer) { // Write satisfies the io.Writer interface. func (w *Writer) Write(p []byte) (nRet int, errRet error) { + if err := w.err(nil); err != nil { + return 0, err + } if w.flushOnWrite { return w.write(p) } @@ -535,6 +550,9 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) { // The return value n is the number of bytes read. // Any error except io.EOF encountered during the read is also returned. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { + if err := w.err(nil); err != nil { + return 0, err + } if len(w.ibuf) > 0 { err := w.Flush() if err != nil { @@ -577,6 +595,85 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return n, w.err(nil) } +// AddSkippableBlock will add a skippable block to the stream. +// The ID must be 0x80-0xfe (inclusive). +// Length of the skippable block must be <= 16777215 bytes. 
+func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) { + if err := w.err(nil); err != nil { + return err + } + if len(data) == 0 { + return nil + } + if id < 0x80 || id > chunkTypePadding { + return fmt.Errorf("invalid skippable block id %x", id) + } + if len(data) > maxChunkSize { + return fmt.Errorf("skippable block exceeded maximum size") + } + var header [4]byte + chunkLen := 4 + len(data) + header[0] = id + header[1] = uint8(chunkLen >> 0) + header[2] = uint8(chunkLen >> 8) + header[3] = uint8(chunkLen >> 16) + if w.concurrency == 1 { + write := func(b []byte) error { + n, err := w.writer.Write(b) + if err = w.err(err); err != nil { + return err + } + if n != len(data) { + return w.err(io.ErrShortWrite) + } + w.written += int64(n) + return w.err(nil) + } + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + if w.snappy { + if err := write([]byte(magicChunkSnappy)); err != nil { + return err + } + } else { + if err := write([]byte(magicChunk)); err != nil { + return err + } + } + } + if err := write(header[:]); err != nil { + return err + } + if err := write(data); err != nil { + return err + } + } + + // Create output... + if !w.wroteStreamHeader { + w.wroteStreamHeader = true + hWriter := make(chan result) + w.output <- hWriter + if w.snappy { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} + } else { + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} + } + } + + // Copy input. + inbuf := w.buffers.Get().([]byte)[:4] + copy(inbuf, header[:]) + inbuf = append(inbuf, data...) + + output := make(chan result, 1) + // Queue output. + w.output <- output + output <- result{startOffset: w.uncompWritten, b: inbuf} + + return nil +} + // EncodeBuffer will add a buffer to the stream. 
// This is the fastest way to encode a stream, // but the input buffer cannot be written to by the caller @@ -614,9 +711,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -632,6 +729,10 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) go func() { checksum := crc(uncompressed) @@ -664,7 +765,8 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res }() } return nil @@ -708,9 +810,9 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -731,6 +833,11 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -763,7 +870,8 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. 
w.buffers.Put(inbuf) @@ -793,9 +901,9 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { hWriter := make(chan result) w.output <- hWriter if w.snappy { - hWriter <- []byte(magicChunkSnappy) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)} } else { - hWriter <- []byte(magicChunk) + hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)} } } @@ -806,6 +914,11 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { output := make(chan result) // Queue output now, so we keep order. w.output <- output + res := result{ + startOffset: w.uncompWritten, + } + w.uncompWritten += int64(len(uncompressed)) + go func() { checksum := crc(uncompressed) @@ -838,7 +951,8 @@ func (w *Writer) writeFull(inbuf []byte) (errRet error) { obuf[7] = uint8(checksum >> 24) // Queue final output. - output <- obuf + res.b = obuf + output <- res // Put unused buffer back in pool. w.buffers.Put(inbuf) @@ -912,7 +1026,10 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) { if n != len(obuf) { return 0, w.err(io.ErrShortWrite) } + w.err(w.index.add(w.written, w.uncompWritten)) w.written += int64(n) + w.uncompWritten += int64(len(uncompressed)) + if chunkType == chunkTypeUncompressedData { // Write uncompressed data. n, err := w.writer.Write(uncompressed) @@ -961,39 +1078,88 @@ func (w *Writer) Flush() error { res := make(chan result) w.output <- res // Block until this has been picked up. - res <- nil + res <- result{b: nil, startOffset: w.uncompWritten} // When it is closed, we have flushed. <-res return w.err(nil) } // Close calls Flush and then closes the Writer. -// Calling Close multiple times is ok. +// Calling Close multiple times is ok, +// but calling CloseIndex after this will make it not return the index. func (w *Writer) Close() error { + _, err := w.closeIndex(w.appendIndex) + return err +} + +// CloseIndex calls Close and returns an index on first call. 
+// This is not required if you are only adding index to a stream. +func (w *Writer) CloseIndex() ([]byte, error) { + return w.closeIndex(true) +} + +func (w *Writer) closeIndex(idx bool) ([]byte, error) { err := w.Flush() if w.output != nil { close(w.output) w.writerWg.Wait() w.output = nil } - if w.err(nil) == nil && w.writer != nil && w.pad > 0 { - add := calcSkippableFrame(w.written, int64(w.pad)) - frame, err := skippableFrame(w.ibuf[:0], add, w.randSrc) - if err = w.err(err); err != nil { - return err + + var index []byte + if w.err(nil) == nil && w.writer != nil { + // Create index. + if idx { + compSize := int64(-1) + if w.pad <= 1 { + compSize = w.written + } + index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize) + // Count as written for padding. + if w.appendIndex { + w.written += int64(len(index)) + } + if true { + _, err := w.index.Load(index) + if err != nil { + panic(err) + } + } + } + + if w.pad > 1 { + tmp := w.ibuf[:0] + if len(index) > 0 { + // Allocate another buffer. + tmp = w.buffers.Get().([]byte)[:0] + defer w.buffers.Put(tmp) + } + add := calcSkippableFrame(w.written, int64(w.pad)) + frame, err := skippableFrame(tmp, add, w.randSrc) + if err = w.err(err); err != nil { + return nil, err + } + n, err2 := w.writer.Write(frame) + if err2 == nil && n != len(frame) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) + } + if len(index) > 0 && w.appendIndex { + n, err2 := w.writer.Write(index) + if err2 == nil && n != len(index) { + err2 = io.ErrShortWrite + } + _ = w.err(err2) } - _, err2 := w.writer.Write(frame) - _ = w.err(err2) } - _ = w.err(errClosed) + err = w.err(errClosed) if err == errClosed { - return nil + return index, nil } - return err + return nil, err } -const skippableFrameHeader = 4 - // calcSkippableFrame will return a total size to be added for written // to be divisible by multiple. // The value will always be > skippableFrameHeader. 
@@ -1057,6 +1223,15 @@ func WriterConcurrency(n int) WriterOption { } } +// WriterAddIndex will append an index to the end of a stream +// when it is closed. +func WriterAddIndex() WriterOption { + return func(w *Writer) error { + w.appendIndex = true + return nil + } +} + // WriterBetterCompression will enable better compression. // EncodeBetter compresses better than Encode but typically with a // 10-40% speed decrease on both compression and decompression. diff --git a/s2/encode_test.go b/s2/encode_test.go index db72385c49..fcf04191fa 100644 --- a/s2/encode_test.go +++ b/s2/encode_test.go @@ -10,6 +10,7 @@ import ( "io" "io/ioutil" "math/rand" + "runtime" "strings" "testing" @@ -19,7 +20,7 @@ import ( func testOptions(t testing.TB) map[string][]WriterOption { var testOptions = map[string][]WriterOption{ - "default": {}, + "default": {WriterAddIndex()}, "better": {WriterBetterCompression()}, "best": {WriterBestCompression()}, "none": {WriterUncompressed()}, @@ -223,6 +224,133 @@ func TestEncoderRegression(t *testing.T) { } } +func TestIndex(t *testing.T) { + fatalErr := func(t testing.TB, err error) { + if err != nil { + t.Fatal(err) + } + } + + // Create a test corpus + var input []byte + if !testing.Short() { + input = make([]byte, 10<<20) + } else { + input = make([]byte, 500<<10) + } + rng := rand.New(rand.NewSource(0xabeefcafe)) + rng.Read(input) + // Make it compressible... + for i, v := range input { + input[i] = '0' + v&3 + } + // Compress it... + var buf bytes.Buffer + // We use smaller blocks just for the example... + enc := NewWriter(&buf, WriterBlockSize(100<<10), WriterAddIndex(), WriterBetterCompression(), WriterConcurrency(runtime.GOMAXPROCS(0))) + todo := input + for len(todo) > 0 { + // Write random sized inputs.. 
+ x := todo[:rng.Intn(1+len(todo)&65535)] + if len(x) == 0 { + x = todo[:1] + } + _, err := enc.Write(x) + fatalErr(t, err) + // Flush once in a while + if rng.Intn(8) == 0 { + err = enc.Flush() + fatalErr(t, err) + } + todo = todo[len(x):] + } + + // Close and also get index... + idxBytes, err := enc.CloseIndex() + fatalErr(t, err) + if false { + // Load the index. + var index Index + _, err = index.Load(idxBytes) + fatalErr(t, err) + t.Log(string(index.JSON())) + } + // This is our compressed stream... + compressed := buf.Bytes() + for wantOffset := int64(0); wantOffset < int64(len(input)); wantOffset += 65531 { + t.Run(fmt.Sprintf("offset-%d", wantOffset), func(t *testing.T) { + // Let's assume we want to read from uncompressed offset 'i' + // and we cannot seek in input, but we have the index. + want := input[wantOffset:] + + // Load the index. + var index Index + _, err = index.Load(idxBytes) + fatalErr(t, err) + + // Find offset in file: + compressedOffset, uncompressedOffset, err := index.Find(wantOffset) + fatalErr(t, err) + + // Offset the input to the compressed offset. + // Notice how we do not provide any bytes before the offset. + in := io.Reader(bytes.NewBuffer(compressed[compressedOffset:])) + + // When creating the decoder we must specify that it should not + // expect a stream identifier at the beginning of the frame. + dec := NewReader(in, ReaderIgnoreStreamIdentifier()) + + // We now have a reader, but it will start outputting at uncompressedOffset, + // and not the actual offset we want, so skip forward to that. + toSkip := wantOffset - uncompressedOffset + err = dec.Skip(toSkip) + fatalErr(t, err) + + // Read the rest of the stream... + got, err := ioutil.ReadAll(dec) + fatalErr(t, err) + if !bytes.Equal(got, want) { + t.Error("Result mismatch", wantOffset) + } + + // Test with stream index... 
+ for i := io.SeekStart; i <= io.SeekEnd; i++ { + t.Run(fmt.Sprintf("seek-%d", i), func(t *testing.T) { + // Read it from a seekable stream + dec = NewReader(bytes.NewReader(compressed)) + + rs, err := dec.ReadSeeker(true, nil) + fatalErr(t, err) + + // Read a little... + var tmp = make([]byte, len(input)/2) + _, err = io.ReadFull(rs, tmp[:]) + fatalErr(t, err) + + toSkip := wantOffset + switch i { + case io.SeekStart: + case io.SeekCurrent: + toSkip = wantOffset - int64(len(input)/2) + case io.SeekEnd: + toSkip = int64(len(input)) - wantOffset + } + gotOffset, err := rs.Seek(toSkip, i) + if gotOffset != wantOffset { + t.Errorf("got offset %d, want %d", gotOffset, wantOffset) + } + // Read the rest of the stream... + got, err := ioutil.ReadAll(dec) + fatalErr(t, err) + if !bytes.Equal(got, want) { + t.Error("Result mismatch", wantOffset) + } + }) + } + }) + } +} + func TestWriterPadding(t *testing.T) { n := 100 if testing.Short() { diff --git a/s2/index.go b/s2/index.go new file mode 100644 index 0000000000..fd857682e4 --- /dev/null +++ b/s2/index.go @@ -0,0 +1,525 @@ +// Copyright (c) 2022+ Klaus Post. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package s2 + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "io" +) + +const ( + S2IndexHeader = "s2idx\x00" + S2IndexTrailer = "\x00xdi2s" + maxIndexEntries = 1 << 16 +) + +// Index represents an S2/Snappy index. +type Index struct { + TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown. + TotalCompressed int64 // Total Compressed size if known. Will be -1 if unknown. 
+ info []struct { + compressedOffset int64 + uncompressedOffset int64 + } + estBlockUncomp int64 +} + +func (i *Index) reset(maxBlock int) { + i.estBlockUncomp = int64(maxBlock) + i.TotalCompressed = -1 + i.TotalUncompressed = -1 + if len(i.info) > 0 { + i.info = i.info[:0] + } +} + +// allocInfos will allocate an empty slice of infos. +func (i *Index) allocInfos(n int) { + if n > maxIndexEntries { + panic("n > maxIndexEntries") + } + i.info = make([]struct { + compressedOffset int64 + uncompressedOffset int64 + }, 0, n) +} + +// add an uncompressed and compressed pair. +// Entries must be sent in order. +func (i *Index) add(compressedOffset, uncompressedOffset int64) error { + if i == nil { + return nil + } + lastIdx := len(i.info) - 1 + if lastIdx >= 0 { + latest := i.info[lastIdx] + if latest.uncompressedOffset == uncompressedOffset { + // Uncompressed didn't change, don't add entry, + // but update start index. + latest.compressedOffset = compressedOffset + i.info[lastIdx] = latest + return nil + } + if latest.uncompressedOffset > uncompressedOffset { + return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) + } + if latest.compressedOffset > compressedOffset { + return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset) + } + } + i.info = append(i.info, struct { + compressedOffset int64 + uncompressedOffset int64 + }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset}) + return nil +} + +// Find the offset at or before the wanted (uncompressed) offset. +// If offset is 0 or positive it is the offset from the beginning of the file. +// If the uncompressed size is known, the offset must be within the file. +// If an offset outside the file is requested io.ErrUnexpectedEOF is returned. 
+// If the offset is negative, it is interpreted as the distance from the end of the file, +// where -1 represents the last byte. +// If offset from the end of the file is requested, but size is unknown, +// ErrUnsupported will be returned. +func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) { + if i.TotalUncompressed < 0 { + return 0, 0, ErrCorrupt + } + if offset < 0 { + offset = i.TotalUncompressed + offset + if offset < 0 { + return 0, 0, io.ErrUnexpectedEOF + } + } + if offset > i.TotalUncompressed { + return 0, 0, io.ErrUnexpectedEOF + } + for _, info := range i.info { + if info.uncompressedOffset > offset { + break + } + compressedOff = info.compressedOffset + uncompressedOff = info.uncompressedOffset + } + return compressedOff, uncompressedOff, nil +} + +// reduce to stay below maxIndexEntries +func (i *Index) reduce() { + if len(i.info) < maxIndexEntries && i.estBlockUncomp >= 1<<20 { + return + } + + // Algorithm, keep 1, remove removeN entries... + removeN := (len(i.info) + 1) / maxIndexEntries + src := i.info + j := 0 + + // Each block should be at least 1MB, but don't reduce below 1000 entries. + for i.estBlockUncomp*(int64(removeN)+1) < 1<<20 && len(i.info)/(removeN+1) > 1000 { + removeN++ + } + for idx := 0; idx < len(src); idx++ { + i.info[j] = src[idx] + j++ + idx += removeN + } + i.info = i.info[:j] + // Update maxblock estimate. + i.estBlockUncomp += i.estBlockUncomp * int64(removeN) +} + +func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte { + i.reduce() + var tmp [binary.MaxVarintLen64]byte + + initSize := len(b) + // We make the start a skippable header+size. + b = append(b, ChunkTypeIndex, 0, 0, 0) + b = append(b, []byte(S2IndexHeader)...) + // Total Uncompressed size + n := binary.PutVarint(tmp[:], uncompTotal) + b = append(b, tmp[:n]...) + // Total Compressed size + n = binary.PutVarint(tmp[:], compTotal) + b = append(b, tmp[:n]...) 
+ // Put EstBlockUncomp size + n = binary.PutVarint(tmp[:], i.estBlockUncomp) + b = append(b, tmp[:n]...) + // Put length + n = binary.PutVarint(tmp[:], int64(len(i.info))) + b = append(b, tmp[:n]...) + + // Check if we should add uncompressed offsets + var hasUncompressed byte + for idx, info := range i.info { + if idx == 0 { + if info.uncompressedOffset != 0 { + hasUncompressed = 1 + break + } + continue + } + if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp { + hasUncompressed = 1 + break + } + } + b = append(b, hasUncompressed) + + // Add each entry + if hasUncompressed == 1 { + for idx, info := range i.info { + uOff := info.uncompressedOffset + if idx > 0 { + prev := i.info[idx-1] + uOff -= prev.uncompressedOffset + (i.estBlockUncomp) + } + n = binary.PutVarint(tmp[:], uOff) + b = append(b, tmp[:n]...) + } + } + + // Initial compressed size estimate. + cPredict := i.estBlockUncomp / 2 + + for idx, info := range i.info { + cOff := info.compressedOffset + if idx > 0 { + prev := i.info[idx-1] + cOff -= prev.compressedOffset + cPredict + // Update compressed size prediction, with half the error. + cPredict += cOff / 2 + } + n = binary.PutVarint(tmp[:], cOff) + b = append(b, tmp[:n]...) + } + + // Add Total Size. + // Stored as fixed size for easier reading. + binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer))) + b = append(b, tmp[:4]...) + // Trailer + b = append(b, []byte(S2IndexTrailer)...) + + // Update size + chunkLen := len(b) - initSize - skippableFrameHeader + b[initSize+1] = uint8(chunkLen >> 0) + b[initSize+2] = uint8(chunkLen >> 8) + b[initSize+3] = uint8(chunkLen >> 16) + //fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal) + return b +} + +// Load a binary index. +// A zero value Index can be used or a previous one can be reused. 
+func (i *Index) Load(b []byte) ([]byte, error) { + if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) { + return b, io.ErrUnexpectedEOF + } + if b[0] != ChunkTypeIndex { + return b, ErrCorrupt + } + chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16 + b = b[4:] + + // Validate we have enough... + if len(b) < chunkLen { + return b, io.ErrUnexpectedEOF + } + if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) { + return b, ErrUnsupported + } + b = b[len(S2IndexHeader):] + + // Total Uncompressed + if v, n := binary.Varint(b); n <= 0 || v < 0 { + return b, ErrCorrupt + } else { + i.TotalUncompressed = v + b = b[n:] + } + + // Total Compressed + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + i.TotalCompressed = v + b = b[n:] + } + + // Read EstBlockUncomp + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + if v < 0 { + return b, ErrCorrupt + } + i.estBlockUncomp = v + b = b[n:] + } + + var entries int + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + if v < 0 || v > maxIndexEntries { + return b, ErrCorrupt + } + entries = int(v) + b = b[n:] + } + if cap(i.info) < entries { + i.allocInfos(entries) + } + i.info = i.info[:entries] + + if len(b) < 1 { + return b, io.ErrUnexpectedEOF + } + hasUncompressed := b[0] + b = b[1:] + if hasUncompressed&1 != hasUncompressed { + return b, ErrCorrupt + } + + // Add each uncompressed entry + for idx := range i.info { + var uOff int64 + if hasUncompressed != 0 { + // Load delta + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + uOff = v + b = b[n:] + } + } + + if idx > 0 { + prev := i.info[idx-1].uncompressedOffset + uOff += prev + (i.estBlockUncomp) + if uOff <= prev { + return b, ErrCorrupt + } + } + if uOff < 0 { + return b, ErrCorrupt + } + i.info[idx].uncompressedOffset = uOff + } + + // Initial compressed size estimate. 
+ cPredict := i.estBlockUncomp / 2 + + // Add each compressed entry + for idx := range i.info { + var cOff int64 + if v, n := binary.Varint(b); n <= 0 { + return b, ErrCorrupt + } else { + cOff = v + b = b[n:] + } + + if idx > 0 { + // Update compressed size prediction, with half the error. + cPredictNew := cPredict + cOff/2 + + prev := i.info[idx-1].compressedOffset + cOff += prev + cPredict + if cOff <= prev { + return b, ErrCorrupt + } + cPredict = cPredictNew + } + if cOff < 0 { + return b, ErrCorrupt + } + i.info[idx].compressedOffset = cOff + } + if len(b) < 4+len(S2IndexTrailer) { + return b, io.ErrUnexpectedEOF + } + // Skip size... + b = b[4:] + + // Check trailer... + if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) { + return b, ErrCorrupt + } + return b[len(S2IndexTrailer):], nil +} + +// LoadStream will load an index from the end of the supplied stream. +// ErrUnsupported will be returned if the signature cannot be found. +// ErrCorrupt will be returned if unexpected values are found. +// io.ErrUnexpectedEOF is returned if there are too few bytes. +// IO errors are returned as-is. +func (i *Index) LoadStream(rs io.ReadSeeker) error { + // Go to end. + _, err := rs.Seek(-10, io.SeekEnd) + if err != nil { + return err + } + var tmp [10]byte + _, err = io.ReadFull(rs, tmp[:]) + if err != nil { + return err + } + // Check trailer... + if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) { + return ErrUnsupported + } + sz := binary.LittleEndian.Uint32(tmp[:4]) + if sz > maxChunkSize+skippableFrameHeader { + return ErrCorrupt + } + _, err = rs.Seek(-int64(sz), io.SeekEnd) + if err != nil { + return err + } + + // Read index. + buf := make([]byte, sz) + _, err = io.ReadFull(rs, buf) + if err != nil { + return err + } + _, err = i.Load(buf) + return err +} + +// IndexStream will return an index for a stream. +// The stream structure will be checked, but +// data within blocks is not verified. 
+// The returned index can either be appended to the end of the stream +// or stored separately. +func IndexStream(r io.Reader) ([]byte, error) { + var i Index + var buf [maxChunkSize]byte + var readHeader bool + for { + _, err := io.ReadFull(r, buf[:4]) + if err != nil { + if err == io.EOF { + return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil + } + return nil, err + } + // Start of this chunk. + startChunk := i.TotalCompressed + i.TotalCompressed += 4 + + chunkType := buf[0] + if !readHeader { + if chunkType != chunkTypeStreamIdentifier { + return nil, ErrCorrupt + } + readHeader = true + } + chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16 + if chunkLen < checksumSize { + return nil, ErrCorrupt + } + + i.TotalCompressed += int64(chunkLen) + _, err = io.ReadFull(r, buf[:chunkLen]) + if err != nil { + return nil, io.ErrUnexpectedEOF + } + // The chunk types are specified at + // https://github.com/google/snappy/blob/master/framing_format.txt + switch chunkType { + case chunkTypeCompressedData: + // Section 4.2. Compressed data (chunk type 0x00). + // Skip checksum. + dLen, err := DecodedLen(buf[checksumSize:]) + if err != nil { + return nil, err + } + if dLen > maxBlockSize { + return nil, ErrCorrupt + } + if i.estBlockUncomp == 0 { + // Use first block for estimate... + i.estBlockUncomp = int64(dLen) + } + err = i.add(startChunk, i.TotalUncompressed) + if err != nil { + return nil, err + } + i.TotalUncompressed += int64(dLen) + continue + case chunkTypeUncompressedData: + n2 := chunkLen - checksumSize + if n2 > maxBlockSize { + return nil, ErrCorrupt + } + if i.estBlockUncomp == 0 { + // Use first block for estimate... + i.estBlockUncomp = int64(n2) + } + err = i.add(startChunk, i.TotalUncompressed) + if err != nil { + return nil, err + } + i.TotalUncompressed += int64(n2) + continue + case chunkTypeStreamIdentifier: + // Section 4.1. Stream identifier (chunk type 0xff). 
+ if chunkLen != len(magicBody) { + return nil, ErrCorrupt + } + + if string(buf[:len(magicBody)]) != magicBody { + if string(buf[:len(magicBody)]) != magicBodySnappy { + return nil, ErrCorrupt + } + } + + continue + } + + if chunkType <= 0x7f { + // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f). + return nil, ErrUnsupported + } + if chunkLen > maxChunkSize { + return nil, ErrUnsupported + } + // Section 4.4 Padding (chunk type 0xfe). + // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd). + } +} + +// JSON returns the index as JSON text. +func (i *Index) JSON() []byte { + x := struct { + TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown. + TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown. + Offsets []struct { + CompressedOffset int64 `json:"compressed"` + UncompressedOffset int64 `json:"uncompressed"` + } `json:"offsets"` + EstBlockUncomp int64 `json:"est_block_uncompressed"` + }{ + TotalUncompressed: i.TotalUncompressed, + TotalCompressed: i.TotalCompressed, + EstBlockUncomp: i.estBlockUncomp, + } + for _, v := range i.info { + x.Offsets = append(x.Offsets, struct { + CompressedOffset int64 `json:"compressed"` + UncompressedOffset int64 `json:"uncompressed"` + }{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset}) + } + b, _ := json.MarshalIndent(x, "", " ") + return b +} diff --git a/s2/index_test.go b/s2/index_test.go new file mode 100644 index 0000000000..38975df21b --- /dev/null +++ b/s2/index_test.go @@ -0,0 +1,101 @@ +package s2_test + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "sync" + + "github.com/klauspost/compress/s2" +) + +func ExampleIndex_Load() { + fatalErr := func(err error) { + if err != nil { + panic(err) + } + } + + // Create a test corpus + tmp := make([]byte, 5<<20) + rng := rand.New(rand.NewSource(0xbeefcafe)) + rng.Read(tmp) + // Make it 
compressible...
+	for i, v := range tmp {
+		tmp[i] = '0' + v&3
+	}
+	// Compress it...
+	var buf bytes.Buffer
+	// We use smaller blocks just for the example...
+	enc := s2.NewWriter(&buf, s2.WriterBlockSize(100<<10))
+	err := enc.EncodeBuffer(tmp)
+	fatalErr(err)
+
+	// Close and get index...
+	idxBytes, err := enc.CloseIndex()
+	fatalErr(err)
+
+	// This is our compressed stream...
+	compressed := buf.Bytes()
+
+	var once sync.Once
+	for wantOffset := int64(0); wantOffset < int64(len(tmp)); wantOffset += 555555 {
+		// Let's assume we want to read from uncompressed offset 'wantOffset'
+		// and we cannot seek in input, but we have the index.
+		want := tmp[wantOffset:]
+
+		// Load the index.
+		var index s2.Index
+		_, err = index.Load(idxBytes)
+		fatalErr(err)
+
+		// Find offset in file:
+		compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
+		fatalErr(err)
+
+		// Offset the input to the compressed offset.
+		// Notice how we do not provide any bytes before the offset.
+		input := io.Reader(bytes.NewBuffer(compressed[compressedOffset:]))
+		if _, ok := input.(io.Seeker); !ok {
+			// Notice how the input cannot be seeked...
+			once.Do(func() {
+				fmt.Println("Input does not support seeking...")
+			})
+		} else {
+			panic("did you implement seeking on bytes.Buffer?")
+		}
+
+		// When creating the decoder we must specify that it should not
+		// expect a stream identifier at the beginning of the frame.
+		dec := s2.NewReader(input, s2.ReaderIgnoreStreamIdentifier())
+
+		// We now have a reader, but it will start outputting at uncompressedOffset,
+		// and not the actual offset we want, so skip forward to that.
+		toSkip := wantOffset - uncompressedOffset
+		err = dec.Skip(toSkip)
+		fatalErr(err)
+
+		// Read the rest of the stream...
+ got, err := ioutil.ReadAll(dec) + fatalErr(err) + if bytes.Equal(got, want) { + fmt.Println("Successfully skipped forward to", wantOffset) + } else { + fmt.Println("Failed to skip forward to", wantOffset) + } + } + // OUTPUT: + //Input does not support seeking... + //Successfully skipped forward to 0 + //Successfully skipped forward to 555555 + //Successfully skipped forward to 1111110 + //Successfully skipped forward to 1666665 + //Successfully skipped forward to 2222220 + //Successfully skipped forward to 2777775 + //Successfully skipped forward to 3333330 + //Successfully skipped forward to 3888885 + //Successfully skipped forward to 4444440 + //Successfully skipped forward to 4999995 +} diff --git a/s2/s2.go b/s2/s2.go index 89d69e965b..dae3f731fa 100644 --- a/s2/s2.go +++ b/s2/s2.go @@ -87,6 +87,9 @@ const ( // minBlockSize is the minimum size of block setting when creating a writer. minBlockSize = 4 << 10 + skippableFrameHeader = 4 + maxChunkSize = 1<<24 - 1 // 16777215 + // Default block size defaultBlockSize = 1 << 20 @@ -99,6 +102,7 @@ const ( const ( chunkTypeCompressedData = 0x00 chunkTypeUncompressedData = 0x01 + ChunkTypeIndex = 0x99 chunkTypePadding = 0xfe chunkTypeStreamIdentifier = 0xff )