Implement columnar encoders and TSM compaction process #10124

Closed
17 tasks done
stuartcarnie opened this issue Jul 25, 2018 · 1 comment
stuartcarnie commented Jul 25, 2018

Compactions

There are two distinct compaction processes:

  1. TSM compactions, which aim to replace a set of TSM files with versions that are of increased density along with removing any tombstoned data.
  2. WAL Snapshots, which write time / value data from the in-memory cache to level-0 TSM files.

For reference, issue #9981 implements the columnar decoders and readers.

For the sake of simplicity, some examples will refer to the Float data type, however, any of the other supported data types (Integer, String, etc) could be substituted.

TSM Compactions

When merging blocks of data for a series + field, the existing compaction implementation decodes and encodes compressed data using the iterative decoder / encoder APIs. This process should be refactored to use batch-oriented APIs to improve performance.

Starting from the top and working down, the key component responsible for merging a set of TSM files is the tsmKeyIterator, which is:

  • an implementation of the LSM Tree merge operation
  • that merges a specific set of TSM files
  • in series-key order
  • producing compressed and merged blocks of time and value data for each series
  • in ascending time order
  • for all series keys across the set of TSM files

Under certain conditions, as an optimization, blocks may not require decoding and re-encoding. This behavior can be observed for Float values:

```go
for i < len(k.blocks) {
	// Skip this block if its values were already read.
	if k.blocks[i].read() {
		i++
		continue
	}

	// If this block is already full, just add it as is.
	if BlockCount(k.blocks[i].b) >= k.size {
		k.merged = append(k.merged, k.blocks[i])
	} else {
		break
	}
	i++
}
```

The KeyIterator, defined as:

```go
type KeyIterator interface {
	// Next returns true if there are any values remaining in the iterator.
	Next() bool

	// Read returns the key, time range, and raw data for the next block,
	// or any error that occurred.
	Read() (key []byte, minTime int64, maxTime int64, data []byte, err error)

	// Close closes the iterator.
	Close() error

	// Err returns any errors encountered during iteration.
	Err() error

	// EstimatedIndexSize returns the estimated size of the index that would
	// be required to store all the series and entries in the KeyIterator.
	EstimatedIndexSize() int
}
```
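For illustration, the Next/Read call pattern the Compactor relies on can be sketched with a trivial in-memory implementation of the interface (`sliceKeyIterator` and `block` are hypothetical names, not part of the codebase):

```go
package main

import "fmt"

// block is a toy stand-in for an encoded TSM block.
type block struct {
	key              []byte
	minTime, maxTime int64
	data             []byte
}

// sliceKeyIterator is a toy KeyIterator over pre-sorted in-memory blocks.
// Next advances to the next block; Read returns the current one.
type sliceKeyIterator struct {
	blocks []block
	i      int // index of the current block; -1 before the first Next
}

func newSliceKeyIterator(blocks []block) *sliceKeyIterator {
	return &sliceKeyIterator{blocks: blocks, i: -1}
}

func (it *sliceKeyIterator) Next() bool {
	it.i++
	return it.i < len(it.blocks)
}

func (it *sliceKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	b := it.blocks[it.i]
	return b.key, b.minTime, b.maxTime, b.data, nil
}

func (it *sliceKeyIterator) Close() error            { return nil }
func (it *sliceKeyIterator) Err() error              { return nil }
func (it *sliceKeyIterator) EstimatedIndexSize() int { return 0 }

func main() {
	it := newSliceKeyIterator([]block{
		{key: []byte("cpu#!~#value"), minTime: 0, maxTime: 9},
		{key: []byte("mem#!~#used"), minTime: 0, maxTime: 9},
	})
	for it.Next() {
		key, min, max, _, _ := it.Read()
		fmt.Printf("%s [%d,%d]\n", key, min, max)
	}
}
```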

is an abstraction used by the Compactor to read encoded TSM blocks for each series key. tsmKeyIterator conforms to this interface, such that it can be used by the write function:

```go
for iter.Next() {
	c.mu.RLock()
	enabled := c.snapshotsEnabled || c.compactionsEnabled
	c.mu.RUnlock()

	if !enabled {
		return errCompactionAborted{}
	}

	// Each call to read returns the next sorted key (or the prior one if there are
	// more values to write). The size of values will be less than or equal to our
	// chunk size (1000).
	key, minTime, maxTime, block, err := iter.Read()
	if err != nil {
		return err
	}

	// Write the key and value.
	if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
		if err := w.WriteIndex(); err != nil {
			return err
		}
		return err
	} else if err != nil {
		return err
	}

	// If we have a max file size configured and we're over it, close out the file
	// and return the error.
	if w.Size() > maxTSMFileSize {
		if err := w.WriteIndex(); err != nil {
			return err
		}
		return errMaxFileExceeded
	}
}
```

to produce a new set of TSM files. Note that iter.Next() is called for each iteration of the loop to move to the next merged block and iter.Read() is called to obtain the data to be encoded. The tsmKeyIterator provides two strategies for compacting data, per the fast option:

```go
// fast indicates whether the iterator should choose a faster merging strategy over
// a more optimally compressed one. If fast is true, multiple blocks will just be
// added as is and not combined. In some cases, a slower path will need to be
// utilized even when fast is true to prevent overlapping blocks of time for the
// same key. If false, the blocks will be decoded, de-duplicated (if needed) and
// then chunked into maximally sized blocks.
fast bool
```

The focus for the remainder of this document will be to define the requirements to implement a new KeyIterator, which merges a set of TSM files using the existing batch-oriented decoders and new batch-oriented encoders.

Cache Snapshots

The cache (WAL) is periodically snapshotted to generate a level-0 TSM file. A separate investigation is necessary to understand what would be required to utilize batch encoders for this process.

Tasks

Similar to moving the query / read path to batch-oriented APIs, it makes sense to start from the lowest layers and work up to defining a new KeyIterator.

Unit tests and benchmarks

In order to demonstrate correctness and show the scale of the performance improvements, unit tests should be replicated from the existing encoders. A diary of benchmarks was created to keep track of the improvements to the decoders. It would be useful to maintain a similar diary for the encoders.

Add batch encoders

Similarly to the array decoders, new implementations of the encoders should provide a single API for encoding a block of values. These encoders are responsible for encoding a single column of data, such as timestamps, floats or strings.

An example of a batch decoder in tsdb/engine/tsm1/batch_float.go:

```go
func FloatBatchDecodeAll(b []byte, dst []float64) ([]float64, error)
```

which decodes the byte slice b into dst (passed in to potentially avoid allocations), returning the decoded slice or an error if decoding fails. The Float decoder was rewritten from the iterative approach:

```go
type FloatDecoder struct {
	val      uint64
	leading  uint64
	trailing uint64
	br       BitReader
	b        []byte
	first    bool
	finished bool
	err      error
}
```

which required the client to call Next() bool to decode the next value and Values() float64 to fetch it. The rewrite reduced decoding to a single loop over an entire block of values, enabling a host of compiler optimizations.

A similar API for a batch encoder:

```go
func FloatArrayEncodeAll(src []float64, b []byte) ([]byte, error)
```

which encodes the slice src, using the byte slice b, and returns the encoded slice or an error if encoding fails. The definition of the FloatEncoder can be found here:

```go
type FloatEncoder struct {
	val      float64
	err      error
	leading  uint64
	trailing uint64
	buf      bytes.Buffer
	bw       *bitstream.BitWriter
	first    bool
	finished bool
}
```

Implement batch encoders

  • IntegerArrayEncodeAll
  • UnsignedArrayEncodeAll
  • FloatArrayEncodeAll
  • StringArrayEncodeAll
  • BooleanArrayEncodeAll
  • TimestampArrayEncodeAll

NOTE: FloatBatchDecodeAll should be renamed to FloatArrayDecodeAll, which follows the Array naming convention established throughout the remainder of the columnar work.

Add batch block encoders

The block encoders are responsible for TSM blocks, encoding columns of up to 1,000 timestamps and values.

An example of a Float block decoder:

```go
func DecodeFloatArrayBlock(block []byte, a *tsdb.FloatArray) error
```

decodes the block of time and value data into the FloatArray a using the block decoders. FloatArray, defined as:

```go
type FloatArray struct {
	Timestamps []int64
	Values     []float64
}
```

maintains the timestamps and values as separate slices. This is known as a Struct of Arrays layout. FloatArray replaces FloatValues:

```go
type FloatValue struct {
	unixnano int64
	value    float64
}

type FloatValues []FloatValue
```

which is an Array of Structs. A Struct of Arrays layout improves cache locality, is better suited to leveraging SIMD, and is also the layout specified by Apache Arrow.
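The layout difference can be shown directly using the type shapes from this issue (the `sumValues` helper and the conversion loop are illustrative only):

```go
package main

import "fmt"

// Array of Structs: timestamps and values interleaved in memory.
type FloatValue struct {
	unixnano int64
	value    float64
}
type FloatValues []FloatValue

// Struct of Arrays: each column contiguous, friendlier to the cache
// and to SIMD when only one column is scanned.
type FloatArray struct {
	Timestamps []int64
	Values     []float64
}

// sumValues touches only the Values column; with SoA this streams one
// contiguous slice instead of striding over interleaved pairs.
func sumValues(a *FloatArray) float64 {
	var s float64
	for _, v := range a.Values {
		s += v
	}
	return s
}

func main() {
	aos := FloatValues{{1, 1.0}, {2, 2.0}, {3, 3.0}}

	// Converting AoS to SoA, roughly what moving from FloatValues to
	// FloatArray entails.
	soa := &FloatArray{}
	for _, v := range aos {
		soa.Timestamps = append(soa.Timestamps, v.unixnano)
		soa.Values = append(soa.Values, v.value)
	}
	fmt.Println(sumValues(soa)) // 6
}
```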

The tsmKeyIterator uses the FloatValues#Encode API to encode a block of values:

```go
func (a FloatValues) Encode(buf []byte) ([]byte, error) {
	return encodeFloatValuesBlock(buf, a)
}
```

which forwards to encodeFloatValuesBlock:

```go
func encodeFloatValuesBlock(buf []byte, values []FloatValue) ([]byte, error) {
	if len(values) == 0 {
		return nil, nil
	}

	venc := getFloatEncoder(len(values))
	tsenc := getTimeEncoder(len(values))

	var b []byte
	err := func() error {
		for _, v := range values {
			tsenc.Write(v.unixnano)
			venc.Write(v.value)
		}
		venc.Flush()

		// Encoded timestamp values
		tb, err := tsenc.Bytes()
		if err != nil {
			return err
		}
		// Encoded values
		vb, err := venc.Bytes()
		if err != nil {
			return err
		}

		// Prepend the first timestamp of the block in the first 8 bytes and the
		// block type in the next byte, followed by the block
		b = packBlock(buf, BlockFloat64, tb, vb)
		return nil
	}()

	putTimeEncoder(tsenc)
	putFloatEncoder(venc)

	return b, err
}
```

The existing implementation uses the iterative encoders; therefore, a new implementation is required to encode a FloatArray, leveraging the batch encoders:

```go
func encodeFloatArrayBlock(a *tsdb.FloatArray, b []byte) ([]byte, error)
```

where a is encoded using b and the result returned or an error if there was a problem encoding the data.
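The batch version could collapse the per-value Write loop into two batch calls, roughly as follows (a sketch: `timeArrayEncodeAll` and `floatArrayEncodeAll` are toy stand-ins for the real batch column encoders, and the length-prefixed packing here is not the actual TSM block format):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

type FloatArray struct {
	Timestamps []int64
	Values     []float64
}

// Toy stand-ins for the batch column encoders; the real ones perform
// delta/simple8b and Gorilla compression respectively.
func timeArrayEncodeAll(src []int64, b []byte) ([]byte, error) {
	for _, ts := range src {
		b = binary.LittleEndian.AppendUint64(b, uint64(ts))
	}
	return b, nil
}

func floatArrayEncodeAll(src []float64, b []byte) ([]byte, error) {
	for _, v := range src {
		b = binary.LittleEndian.AppendUint64(b, math.Float64bits(v))
	}
	return b, nil
}

// encodeFloatArrayBlock shows the shape of a batch block encoder: two
// batch calls over whole columns replace the per-value Write loop of
// the iterative version.
func encodeFloatArrayBlock(a *FloatArray, b []byte) ([]byte, error) {
	if len(a.Timestamps) == 0 {
		return nil, nil
	}
	tb, err := timeArrayEncodeAll(a.Timestamps, nil)
	if err != nil {
		return nil, err
	}
	vb, err := floatArrayEncodeAll(a.Values, nil)
	if err != nil {
		return nil, err
	}
	// Toy packing: length-prefixed timestamp block, then value block.
	b = binary.LittleEndian.AppendUint64(b[:0], uint64(len(tb)))
	b = append(b, tb...)
	b = append(b, vb...)
	return b, nil
}

func main() {
	a := &FloatArray{Timestamps: []int64{1, 2}, Values: []float64{1.5, 2.5}}
	blk, err := encodeFloatArrayBlock(a, nil)
	fmt.Println(len(blk), err)
}
```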

Implement batch block encoders

  • encodeIntegerArrayBlock
  • encodeUnsignedArrayBlock
  • encodeFloatArrayBlock
  • encodeStringArrayBlock
  • encodeBooleanArrayBlock

Add Encode to FloatArray

Add a FloatArray#Encode method to the code generation template arrayvalues.gen.go.tmpl.

As noted above, the existing FloatValues#Encode method calls encodeFloatValuesBlock. This task is to add the equivalent FloatArray#Encode methods, calling the new encodeFloatArrayBlock API.

Implement Array#Encode methods

  • IntegerArray#Encode
  • UnsignedArray#Encode
  • FloatArray#Encode
  • StringArray#Encode
  • BooleanArray#Encode

Implement batch-oriented KeyIterator

This task must implement a new version of the tsmKeyIterator using the batch-oriented types, including FloatArray, DecodeFloatArrayBlock and FloatArray#Encode APIs.

  • tsmKeyIterator replacement

Possible optimizations

It has been observed that large compactions generate a considerable amount of garbage. This is likely due to the merging of blocks:

```go
k.mergedFloatValues = k.mergedFloatValues.Merge(v)
```

and the later encoding of blocks:

```go
cb, err := FloatValues(values).Encode(nil)
```

and

```go
cb, err := FloatValues(k.mergedFloatValues).Encode(nil)
```

It would be worth investigating ways of recycling memory, as this should also improve performance and reduce GC pressure.
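One common way to recycle such buffers in Go is a sync.Pool of encode buffers, so that Encode is fed a reused slice instead of nil on every block (a sketch only; `getBuf`, `putBuf`, and `encodeBlock` are hypothetical, and the issue leaves the actual recycling strategy open):

```go
package main

import (
	"fmt"
	"sync"
)

// encodeBufPool recycles the byte slices passed as the buf argument to
// Encode, rather than allocating fresh storage with Encode(nil) per block.
var encodeBufPool = sync.Pool{
	New: func() any { return make([]byte, 0, 4096) },
}

func getBuf() []byte  { return encodeBufPool.Get().([]byte)[:0] }
func putBuf(b []byte) { encodeBufPool.Put(b) }

// encodeBlock is a stand-in for FloatValues.Encode: it appends the
// encoded block to buf and returns the (possibly grown) slice.
func encodeBlock(buf []byte, n int) []byte {
	for i := 0; i < n; i++ {
		buf = append(buf, byte(i))
	}
	return buf
}

func main() {
	for i := 0; i < 3; i++ {
		buf := getBuf()
		blk := encodeBlock(buf, 16)
		fmt.Println(len(blk))
		// ... write blk to the TSM file, then return the buffer ...
		putBuf(blk)
	}
}
```

After the first iteration, subsequent blocks typically reuse the pooled backing array, reducing allocations and GC pressure during large compactions.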

e-dard commented Oct 23, 2018

Fixed via #10300.

@e-dard e-dard closed this as completed Oct 23, 2018