Skip to content

Commit

Permalink
influx_tsm: Properly limit blocks per key per TSM file
Browse files Browse the repository at this point in the history
This should fix #5865

This commit also removes the dependency on the influxql package constants
that were used to write b1 and bz1 files and have changed since the
release of 0.10.
  • Loading branch information
joelegasse committed Mar 3, 2016
1 parent 77513a5 commit acd2e1e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
- [#5854](https://github.com/influxdata/influxdb/issues/5854): failures of tests in tsdb/engine/tsm1 when compiled with go master
- [#5610](https://github.com/influxdata/influxdb/issues/5610): Write into fully-replicated cluster is not replicated across all shards
- [#5880](https://github.com/influxdata/influxdb/issues/5880): TCP connection closed after write (regression/change from 0.9.6)
- [#5865](https://github.com/influxdata/influxdb/issues/5865): Conversion to tsm fails with exceeds max index value

## v0.10.1 [2016-02-18]

Expand Down
10 changes: 9 additions & 1 deletion cmd/influx_tsm/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

// maxBlocksPerKey is the per-key block count at which Process stops writing
// to the current TSM file and starts a new one. The value is the largest
// number representable in 16 bits (65535).
// NOTE(review): presumably this mirrors the width of the per-key block count
// in the TSM index — confirm against the tsm1 writer.
const maxBlocksPerKey = 1<<16 - 1

// KeyIterator is used to iterate over b* keys for conversion to tsm keys
type KeyIterator interface {
Next() bool
Expand Down Expand Up @@ -41,6 +45,8 @@ func (c *Converter) Process(iter KeyIterator) error {

// Iterate until no more data remains.
var w tsm1.TSMWriter
var keyCount map[string]int

for iter.Next() {
k, v, err := iter.Read()
if err != nil {
Expand All @@ -52,16 +58,18 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
keyCount = map[string]int{}
}
if err := w.Write(k, v); err != nil {
return err
}
keyCount[k]++

c.stats.AddPointsRead(len(v))
c.stats.AddPointsWritten(len(v))

// If we have a max file size configured and we're over it, start a new TSM file.
if w.Size() > c.maxTSMFileSize {
if w.Size() > c.maxTSMFileSize || keyCount[k] == maxBlocksPerKey {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
Expand Down
18 changes: 11 additions & 7 deletions cmd/influx_tsm/tsdb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import (
"errors"
"fmt"
"math"

"github.com/influxdata/influxdb/influxql"
)

const maxStringLength = 64 * 1024

// Field type identifiers that DecodeByID switches on via field.Type.
// They are declared locally, with explicit values, so this package no longer
// depends on the influxql package constants that were used to write b1 and
// bz1 files and have since changed. Keep the literals as they are: they must
// match what is already stored in existing shards, not the current influxql
// values.
const (
// fieldFloat marks a field decoded as a big-endian float64.
fieldFloat = 1
// fieldInteger marks a field decoded as a big-endian int64.
fieldInteger = 2
// fieldBoolean marks a field decoded from a single byte (1 means true).
fieldBoolean = 3
// fieldString marks a field decoded as a uint16 length prefix followed by the bytes.
fieldString = 4
)

var (
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors.New("field not found")
Expand Down Expand Up @@ -71,14 +76,13 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
}

switch field.Type {
case influxql.Float:
// Move bytes forward.
case fieldFloat:
return math.Float64frombits(binary.BigEndian.Uint64(b[1:9])), nil
case influxql.Integer:
case fieldInteger:
return int64(binary.BigEndian.Uint64(b[1:9])), nil
case influxql.Boolean:
case fieldBoolean:
return b[1] == 1, nil
case influxql.String:
case fieldString:
return string(b[3 : 3+binary.BigEndian.Uint16(b[1:3])]), nil
default:
panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))
Expand Down

0 comments on commit acd2e1e

Please sign in to comment.