s2: Add stream index (#462)
klauspost authored Jan 11, 2022
1 parent fd6816a commit 469ba13
Showing 10 changed files with 1,793 additions and 91 deletions.
256 changes: 238 additions & 18 deletions s2/README.md
@@ -20,6 +20,7 @@ This is important, so you don't have to worry about spending CPU cycles on alrea
* Concurrent stream compression
* Faster decompression, even for Snappy compatible content
* Ability to quickly skip forward in compressed stream
* Random seeking with indexes
* Compatible with reading Snappy compressed content
* Smaller block size overhead on incompressible blocks
* Block concatenation
@@ -29,8 +30,8 @@ This is important, so you don't have to worry about spending CPU cycles on alrea

## Drawbacks over Snappy

* Not optimized for 32 bit systems.
* Streams use slightly more memory due to larger blocks and concurrency (configurable).
* Not optimized for 32 bit systems
* Streams use slightly more memory due to larger blocks and concurrency (configurable)

# Usage

@@ -141,7 +142,7 @@ Binaries can be downloaded on the [Releases Page](https://github.com/klauspost/c

Installing then requires Go to be installed. To install them, use:

`go install github.com/klauspost/compress/s2/cmd/s2c && go install github.com/klauspost/compress/s2/cmd/s2d`
`go install github.com/klauspost/compress/s2/cmd/s2c@latest && go install github.com/klauspost/compress/s2/cmd/s2d@latest`

To build binaries to the current folder use:

@@ -176,6 +177,8 @@ Options:
Compress faster, but with a minor compression loss
-help
Display help
-index
Add seek index (default true)
-o string
Write output to another file. Single input file only
-pad string
@@ -217,11 +220,15 @@ Options:
Display help
-o string
Write output to another file. Single input file only
-offset string
Start at offset. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
-q Don't write any output to terminal, except errors
-rm
	Delete source file(s) after successful decompression
-safe
	Do not overwrite output files
-tail string
Return last of compressed file. Examples: 92, 64K, 256K, 1M, 4M. Requires Index
-verify
Verify files, but do not write output
```
Expand Down Expand Up @@ -633,12 +640,12 @@ Compression and speed is typically a bit better `MaxEncodedLen` is also smaller
Comparison of [`webdevdata.org-2015-01-07-subset`](https://files.klauspost.com/compress/webdevdata.org-2015-01-07-4GB-subset.7z),
53927 files, total input size: 4,014,735,833 bytes. amd64, single goroutine used:

| Encoder | Size | MB/s | Reduction |
|-----------------------|------------|--------|------------
| snappy.Encode | 1128706759 | 725.59 | 71.89% |
| s2.EncodeSnappy | 1093823291 | 899.16 | 72.75% |
| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
| s2.EncodeSnappyBest | 944507998 | 66.00 | 76.47% |
| Encoder | Size | MB/s | Reduction |
|-----------------------|------------|------------|------------
| snappy.Encode | 1128706759 | 725.59 | 71.89% |
| s2.EncodeSnappy | 1093823291 | **899.16** | 72.75% |
| s2.EncodeSnappyBetter | 1001158548 | 578.49 | 75.06% |
| s2.EncodeSnappyBest | 944507998 | 66.00 | **76.47%**|

## Streams

@@ -649,11 +656,11 @@ Comparison of different streams, AMD Ryzen 3950x, 16 cores. Size and throughput:

| File | snappy.NewWriter | S2 Snappy | S2 Snappy, Better | S2 Snappy, Best |
|-----------------------------|--------------------------|---------------------------|--------------------------|-------------------------|
| nyc-taxi-data-10M.csv | 1316042016 - 517.54MB/s | 1307003093 - 8406.29MB/s | 1174534014 - 4984.35MB/s | 1115904679 - 177.81MB/s |
| enwik10 | 5088294643 - 433.45MB/s | 5175840939 - 8454.52MB/s | 4560784526 - 4403.10MB/s | 4340299103 - 159.71MB/s |
| 10gb.tar | 6056946612 - 703.25MB/s | 6208571995 - 9035.75MB/s | 5741646126 - 2402.08MB/s | 5548973895 - 171.17MB/s |
| github-june-2days-2019.json | 1525176492 - 908.11MB/s | 1476519054 - 12625.93MB/s | 1400547532 - 6163.61MB/s | 1321887137 - 200.71MB/s |
| consensus.db.10gb | 5412897703 - 1054.38MB/s | 5354073487 - 12634.82MB/s | 5335069899 - 2472.23MB/s | 5201000954 - 166.32MB/s |
| nyc-taxi-data-10M.csv | 1316042016 - 539.47MB/s | 1307003093 - 10132.73MB/s | 1174534014 - 5002.44MB/s | 1115904679 - 177.97MB/s |
| enwik10 (xml) | 5088294643 - 451.13MB/s | 5175840939 - 9440.69MB/s | 4560784526 - 4487.21MB/s | 4340299103 - 158.92MB/s |
| 10gb.tar (mixed) | 6056946612 - 729.73MB/s | 6208571995 - 9978.05MB/s | 5741646126 - 4919.98MB/s | 5548973895 - 180.44MB/s |
| github-june-2days-2019.json | 1525176492 - 933.00MB/s | 1476519054 - 13150.12MB/s | 1400547532 - 5803.40MB/s | 1321887137 - 204.29MB/s |
| consensus.db.10gb (db) | 5412897703 - 1102.14MB/s | 5354073487 - 13562.91MB/s | 5335069899 - 5294.73MB/s | 5201000954 - 175.72MB/s |

# Decompression

@@ -679,7 +686,220 @@ The 10 byte 'stream identifier' of the second stream can optionally be stripped,

Blocks can be concatenated using the `ConcatBlocks` function.

Snappy blocks/streams can safely be concatenated with S2 blocks and streams.
Streams with indexes (see below) will currently not work on concatenated streams.

# Stream Seek Index

S2 and Snappy streams can have indexes. These indexes will allow random seeking within the compressed data.

The index can either be appended to the stream as a skippable block or returned for separate storage.

When the index is appended to a stream it will be skipped by regular decoders,
so the output remains compatible with other decoders.

## Creating an Index

To automatically add an index to a stream, add the `WriterAddIndex()` option to your writer.
The index will then be added to the stream when `Close()` is called.

```
// Add Index to stream...
enc := s2.NewWriter(w, s2.WriterAddIndex())
io.Copy(enc, r)
enc.Close()
```

If you want to store the index separately, you can use `CloseIndex()` instead of the regular `Close()`.
This will return the index. Note that `CloseIndex()` should only be called once, and you shouldn't call `Close()`.

```
// Get index for separate storage...
enc := s2.NewWriter(w)
io.Copy(enc, r)
index, err := enc.CloseIndex()
```

The `index` can then be used without needing to read it from the stream.
This means the index can be used without seeking to the end of the stream,
or for manually forwarding streams. See below.

Finally, an existing S2/Snappy stream can be indexed using the `s2.IndexStream(r io.Reader)` function.
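
As a minimal sketch of indexing after the fact (the file name is illustrative and error handling is omitted):

```
	// Open the finished stream and build an index for separate storage.
	// IndexStream reads the entire stream and returns the index bytes.
	f, _ := os.Open("stream.s2")
	defer f.Close()
	index, err := s2.IndexStream(f)
```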

## Using Indexes

To use indexes, the stream reader provides a `ReadSeeker(random bool, index []byte) (*ReadSeeker, error)` method.

Calling `ReadSeeker` will return an [io.ReadSeeker](https://pkg.go.dev/io#ReadSeeker) compatible version of the reader.

If `random` is true, the returned io.Seeker can be used for random seeking; otherwise only forward seeking is supported.
Enabling random seeking requires the original input to support the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, nil)
rs.Seek(wantOffset, io.SeekStart)
```

This returns a forward-only seeker. Since no index is provided, the index is read from the stream.
This requires that an index was added and that `r` supports the [io.Seeker](https://pkg.go.dev/io#Seeker) interface.

A custom index can be supplied, which will be used if given.
When using a custom index, the index will not be read from the input stream.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(false, index)
rs.Seek(wantOffset, io.SeekStart)
```

This will read the index from `index`. Since we specified non-random (forward-only) seeking, `r` does not have to be an io.Seeker.

```
dec := s2.NewReader(r)
rs, err := dec.ReadSeeker(true, index)
rs.Seek(wantOffset, io.SeekStart)
```

Finally, since we specified that we want to do random seeking, `r` must be an io.Seeker.

The returned [ReadSeeker](https://pkg.go.dev/github.com/klauspost/compress/s2#ReadSeeker) contains a shallow reference to the existing Reader,
meaning changes performed to one are reflected in the other.

To check if a stream contains an index at the end, the `(*Index).LoadStream(rs io.ReadSeeker) error` method can be used.
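
A minimal sketch, assuming `rs` is the compressed stream as an io.ReadSeeker (for example an *os.File), and that a missing index is reported as an error:

```
	// Try to load an index from the end of the stream.
	var index s2.Index
	if err := index.LoadStream(rs); err != nil {
		// No index found; fall back to forward-only reading.
	}
```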

## Manually Forwarding Streams

Indexes can also be read outside the decoder using the [Index](https://pkg.go.dev/github.com/klauspost/compress/s2#Index) type.
This can be used for parsing indexes, whether stored separately or embedded in streams.

In some cases it may not be possible to serve a seekable stream.
This can for instance be an HTTP stream, where the Range request
must be sent at the start of the stream.

With a little bit of extra code it is still possible to use indexes
to forward to a specific offset with a single forward skip.

It is possible to load the index manually like this:
```
var index s2.Index
_, err = index.Load(idxBytes)
```

This can be used to figure out how much to offset the compressed stream:

```
compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
```

The `compressedOffset` is the number of bytes that should be skipped
from the beginning of the compressed file.

The `uncompressedOffset` is then the offset of the uncompressed bytes returned
when decoding from that position. This will always be <= wantOffset.
For example, if index entries start at uncompressed offsets 0 and 1 MiB and `wantOffset` is 1.5 MiB,
`Find` returns the entry at 1 MiB, and the remaining 0.5 MiB must be skipped after decoding starts.

When creating a decoder it must be specified that it should *not* expect a stream identifier
at the beginning of the stream. Assuming the io.Reader `r` has been forwarded to `compressedOffset`,
we create the decoder like this:

```
dec := s2.NewReader(r, s2.ReaderIgnoreStreamIdentifier())
```

We are not completely done; we still need to skip the uncompressed bytes we didn't want.
This is done using the regular `Skip` function:

```
err = dec.Skip(wantOffset - uncompressedOffset)
```

This will ensure that we are at exactly the offset we want, and reading from `dec` will start at the requested offset.
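
Putting the pieces together, here is a sketch of seeking in a remotely served file with a single HTTP Range request. The URL is illustrative, `idxBytes` holds a separately stored index, and error handling is abbreviated:

```
	// Load the separately stored index.
	var index s2.Index
	if _, err := index.Load(idxBytes); err != nil {
		return err
	}

	// Find the index entry at or before the uncompressed offset we want.
	compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
	if err != nil {
		return err
	}

	// Request the compressed stream starting at compressedOffset.
	req, err := http.NewRequest("GET", "https://example.com/data.s2", nil)
	if err != nil {
		return err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-", compressedOffset))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// The stream identifier was skipped, so tell the decoder not to expect it.
	dec := s2.NewReader(resp.Body, s2.ReaderIgnoreStreamIdentifier())

	// Skip the uncompressed bytes between the block start and wantOffset.
	if err := dec.Skip(wantOffset - uncompressedOffset); err != nil {
		return err
	}
	// Reading from dec now starts at exactly wantOffset.
```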

## Index Format

Each block is structured as a snappy skippable block, with the chunk ID 0x99.

The block can be read from the front, but contains information so it can be read from the back as well.

Numbers are stored as fixed size little endian values or [zigzag encoded](https://developers.google.com/protocol-buffers/docs/encoding#signed_integers) [base 128 varints](https://developers.google.com/protocol-buffers/docs/encoding),
with un-encoded value length of 64 bits, unless other limits are specified.

| Content | Format |
|---------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| ID, `[1]byte` | Always 0x99. |
| Data Length, `[3]byte` | 3 byte little-endian length of the chunk in bytes, following this. |
| Header `[6]byte` | Header, must be `[115, 50, 105, 100, 120, 0]` or in text: "s2idx\x00". |
| UncompressedSize, Varint | Total Uncompressed size. |
| CompressedSize, Varint | Total Compressed size if known. Should be -1 if unknown. |
| EstBlockSize, Varint | Block Size, used for guessing uncompressed offsets. Must be >= 0. |
| Entries, Varint | Number of Entries in index, must be < 65536 and >=0. |
| HasUncompressedOffsets `byte` | 0 if no uncompressed offsets are present, 1 if present. Other values are invalid. |
| UncompressedOffsets, [Entries]VarInt | Uncompressed offsets. See below how to decode. |
| CompressedOffsets, [Entries]VarInt | Compressed offsets. See below how to decode. |
| Block Size, `[4]byte` | Little Endian total encoded size (including header and trailer). Can be used for searching backwards to start of block. |
| Trailer `[6]byte` | Trailer, must be `[0, 120, 100, 105, 50, 115]` or in text: "\x00xdi2s". Can be used for identifying block from end of stream. |

For regular streams the uncompressed offsets are fully predictable,
so `HasUncompressedOffsets` allows specifying that compressed blocks all have
exactly `EstBlockSize` bytes of uncompressed content.

Entries *must* be in order, starting with the lowest offset,
and there *must* be no uncompressed offset duplicates.
Entries *may* point to the start of a skippable block,
but it is then not allowed to also have an entry for the next block since
that would give an uncompressed offset duplicate.

There is no requirement for all blocks to be represented in the index.
In fact, there is a maximum of 65536 block entries in an index.

The writer can use any method to reduce the number of entries.
An implicit block start at 0,0 can be assumed.

### Decoding entries:

```
// Read Uncompressed entries.
// Each assumes EstBlockSize delta from previous.
for each entry {
	uOff = 0
	if HasUncompressedOffsets == 1 {
		uOff = ReadVarInt // Read value from stream
	}

	// Except for the first entry, use previous values.
	if entryNum == 0 {
		entry[entryNum].UncompressedOffset = uOff
		continue
	}

	// Uncompressed uses previous offset, EstBlockSize and the stored delta.
	entry[entryNum].UncompressedOffset = entry[entryNum-1].UncompressedOffset + EstBlockSize + uOff
}

// Guess that the first block will be 50% of uncompressed size.
// Integer truncating division must be used.
CompressGuess := EstBlockSize / 2

// Read Compressed entries.
// Each assumes CompressGuess delta from previous.
// CompressGuess is adjusted for each value.
for each entry {
	cOff = ReadVarInt // Read value from stream

	// Except for the first entry, use previous values.
	if entryNum == 0 {
		entry[entryNum].CompressedOffset = cOff
		continue
	}

	// Compressed uses previous offset, our estimate and the stored delta.
	entry[entryNum].CompressedOffset = entry[entryNum-1].CompressedOffset + CompressGuess + cOff

	// Adjust compressed offset for next loop, integer truncating division must be used.
	CompressGuess += cOff / 2
}
```
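
For reference, the same decoding can be expressed as a Go sketch. This assumes the reader is already positioned at the first offset varint, and uses `binary.ReadVarint` from `encoding/binary`, which performs the zigzag decoding:

```
	// decodeOffsets decodes the uncompressed and compressed offset tables.
	func decodeOffsets(br io.ByteReader, entries int, estBlockSize int64, hasUncompressed bool) (uOffs, cOffs []int64, err error) {
		uOffs = make([]int64, entries)
		for i := range uOffs {
			var uOff int64
			if hasUncompressed {
				if uOff, err = binary.ReadVarint(br); err != nil {
					return nil, nil, err
				}
			}
			if i == 0 {
				uOffs[i] = uOff
				continue
			}
			// Each entry is EstBlockSize plus a stored delta from the previous.
			uOffs[i] = uOffs[i-1] + estBlockSize + uOff
		}

		// Guess that the first block compresses to 50% of the uncompressed size.
		guess := estBlockSize / 2
		cOffs = make([]int64, entries)
		for i := range cOffs {
			cOff, err := binary.ReadVarint(br)
			if err != nil {
				return nil, nil, err
			}
			if i == 0 {
				cOffs[i] = cOff
				continue
			}
			cOffs[i] = cOffs[i-1] + guess + cOff
			// Adjust the estimate for the next entry (integer truncating division).
			guess += cOff / 2
		}
		return uOffs, cOffs, nil
	}
```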

# Format Extensions
