Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix large field keys preventing snapshot compactions #8425

Merged
merged 3 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8358](https://github.com/influxdata/influxdb/issues/8358): Small edits to the etc/config.sample.toml file.
- [#8392](https://github.com/influxdata/influxdb/issues/8393): Points beyond retention policy scope are dropped silently
- [#8387](https://github.com/influxdata/influxdb/issues/8387): Fix TSM tmp file leaked on disk
- [#8417](https://github.com/influxdata/influxdb/issues/8417): Fix large field keys preventing snapshot compactions


## v1.2.4 [2017-05-08]
Expand Down
47 changes: 45 additions & 2 deletions models/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,19 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
return nil, fmt.Errorf("missing fields")
}

var maxKeyErr error
walkFields(fields, func(k, v []byte) bool {
if sz := seriesKeySize(key, k); sz > MaxKeyLength {
maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
return false
}
return true
})

if maxKeyErr != nil {
return nil, maxKeyErr
}

// scan the last block which is an optional integer timestamp
pos, ts, err := scanTime(buf, pos)
if err != nil {
Expand Down Expand Up @@ -1259,13 +1272,22 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte
}

key := MakeKey([]byte(measurement), tags)
if len(key) > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength)
for field := range fields {
sz := seriesKeySize(key, []byte(field))
if sz > MaxKeyLength {
return nil, fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength)
}
}

return key, nil
}

func seriesKeySize(key, field []byte) int {
// 4 is the length of the tsm1.fieldKeySeparator constant. It's inlined here to avoid a circular
// dependency.
return len(key) + 4 + len(field)
}

// NewPointFromBytes returns a new Point from a marshalled Point.
func NewPointFromBytes(b []byte) (Point, error) {
p := &point{}
Expand Down Expand Up @@ -1421,6 +1443,27 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) {
}
}

// walkFields walks each field key and value via fn. If fn returns false, the iteration
// is stopped. The values are the raw byte slices and not the converted types.
func walkFields(buf []byte, fn func(key, value []byte) bool) {
var i int
var key, val []byte
for len(buf) > 0 {
i, key = scanTo(buf, 0, '=')
buf = buf[i+1:]
i, val = scanFieldValue(buf, 0)
buf = buf[i:]
if !fn(key, val) {
break
}

// slice off comma
if len(buf) > 0 {
buf = buf[1:]
}
}
}

func parseTags(buf []byte) Tags {
if len(buf) == 0 {
return nil
Expand Down
19 changes: 16 additions & 3 deletions models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2062,17 +2062,30 @@ func TestNewPointsRejectsEmptyFieldNames(t *testing.T) {

func TestNewPointsRejectsMaxKey(t *testing.T) {
var key string
for i := 0; i < 65536; i++ {
// tsm field key is point key, separator (4 bytes) and field
for i := 0; i < models.MaxKeyLength-len("value")-4; i++ {
key += "a"
}

if _, err := models.NewPoint(key, nil, models.Fields{"value": 1}, time.Now()); err == nil {
// Test max key len
if _, err := models.NewPoint(key, nil, models.Fields{"value": 1, "ok": 2.0}, time.Now()); err != nil {
t.Fatalf("new point with max key. got: %v, expected: nil", err)
}

if _, err := models.ParsePointsString(fmt.Sprintf("%v value=1,ok=2.0", key)); err != nil {
t.Fatalf("parse point with max key. got: %v, expected: nil", err)
}

// Test 1 byte over max key len
key += "a"
if _, err := models.NewPoint(key, nil, models.Fields{"value": 1, "ok": 2.0}, time.Now()); err == nil {
t.Fatalf("new point with max key. got: nil, expected: error")
}

if _, err := models.ParsePointsString(fmt.Sprintf("%v value=1", key)); err == nil {
if _, err := models.ParsePointsString(fmt.Sprintf("%v value=1,ok=2.0", key)); err == nil {
t.Fatalf("parse point with max key. got: nil, expected: error")
}

}

func TestParseKeyEmpty(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions tsdb/engine/tsm1/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,15 @@ func (t *tsmWriter) writeHeader() error {

// Write writes a new block containing key and values.
func (t *tsmWriter) Write(key string, values Values) error {
if len(key) > maxKeyLength {
return ErrMaxKeyLengthExceeded
}

// Nothing to write
if len(values) == 0 {
return nil
}

if len(key) > maxKeyLength {
return ErrMaxKeyLengthExceeded
}

// Write header only after we have some data to write.
if t.n == 0 {
if err := t.writeHeader(); err != nil {
Expand Down Expand Up @@ -507,6 +507,10 @@ func (t *tsmWriter) Write(key string, values Values) error {
// exceeds max entries for a given key, ErrMaxBlocksExceeded is returned. This indicates
// that the index is now full for this key and no future writes to this key will succeed.
func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte) error {
if len(key) > maxKeyLength {
return ErrMaxKeyLengthExceeded
}

// Nothing to write
if len(block) == 0 {
return nil
Expand Down
20 changes: 20 additions & 0 deletions tsdb/engine/tsm1/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,26 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
}
}

func TestTSMWriter_WriteBlock_MaxKey(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)

w, err := tsm1.NewTSMWriter(f)
if err != nil {
t.Fatalf("unexpected error creating writer: %v", err)
}

var key string
for i := 0; i < 100000; i++ {
key += "a"
}

if err := w.WriteBlock(key, 0, 0, nil); err != tsm1.ErrMaxKeyLengthExceeded {
t.Fatalf("expected max key length error writing key: %v", err)
}
}

func TestTSMWriter_Write_MaxKey(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
Expand Down