diff --git a/CHANGELOG.md b/CHANGELOG.md index d1576c93296..166bbec60ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ - [#5854](https://github.com/influxdata/influxdb/issues/5854): failures of tests in tsdb/engine/tsm1 when compiled with go master - [#5610](https://github.com/influxdata/influxdb/issues/5610): Write into fully-replicated cluster is not replicated across all shards - [#5880](https://github.com/influxdata/influxdb/issues/5880): TCP connection closed after write (regression/change from 0.9.6) +- [#5865](https://github.com/influxdata/influxdb/issues/5865): Conversion to tsm fails with exceeds max index value ## v0.10.1 [2016-02-18] diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go index f2898272961..3469af62b14 100644 --- a/cmd/influx_tsm/converter.go +++ b/cmd/influx_tsm/converter.go @@ -9,6 +9,10 @@ import ( "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) +const ( + maxBlocksPerKey = 65535 +) + // KeyIterator is used to iterate over b* keys for conversion to tsm keys type KeyIterator interface { Next() bool @@ -41,6 +45,8 @@ func (c *Converter) Process(iter KeyIterator) error { // Iterate until no more data remains. var w tsm1.TSMWriter + var keyCount map[string]int + for iter.Next() { k, v, err := iter.Read() if err != nil { @@ -52,16 +58,18 @@ func (c *Converter) Process(iter KeyIterator) error { if err != nil { return err } + keyCount = map[string]int{} } if err := w.Write(k, v); err != nil { return err } + keyCount[k]++ c.stats.AddPointsRead(len(v)) c.stats.AddPointsWritten(len(v)) // If we have a max file size configured and we're over it, start a new TSM file. - if w.Size() > c.maxTSMFileSize { + if w.Size() > c.maxTSMFileSize || keyCount[k] == maxBlocksPerKey { if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { return err } diff --git a/cmd/influx_tsm/tsdb/codec.go b/cmd/influx_tsm/tsdb/codec.go index 9590bb29757..760a33ff72a 100644 --- a/cmd/influx_tsm/tsdb/codec.go +++ b/cmd/influx_tsm/tsdb/codec.go @@ -5,12 +5,17 @@ import ( "errors" "fmt" "math" - - "github.com/influxdata/influxdb/influxql" ) const maxStringLength = 64 * 1024 +const ( + fieldFloat = 1 + fieldInteger = 2 + fieldBoolean = 3 + fieldString = 4 +) + var ( // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") @@ -71,14 +76,13 @@ func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { } switch field.Type { - case influxql.Float: - // Move bytes forward. + case fieldFloat: return math.Float64frombits(binary.BigEndian.Uint64(b[1:9])), nil - case influxql.Integer: + case fieldInteger: return int64(binary.BigEndian.Uint64(b[1:9])), nil - case influxql.Boolean: + case fieldBoolean: return b[1] == 1, nil - case influxql.String: + case fieldString: return string(b[3 : 3+binary.BigEndian.Uint16(b[1:3])]), nil default: panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type))