Fix panic: interface conversion: tsm1.Value is *tsm1.FloatValue, not *tsm1.StringValue #7533

Merged (3 commits) on Oct 28, 2016
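Background on the panic in the title: a single-result Go type assertion panics when the interface value's dynamic type is not the asserted concrete type, which is what can happen once values of different field types land in the same cache entry and are later snapshotted to TSM files (see the shard test at the bottom of the diff). A minimal standalone sketch with stand-in types, not the actual tsm1 code:

```go
package main

import "fmt"

// Value mirrors only the shape of the tsm1.Value interface for illustration.
type Value interface{ UnixNano() int64 }

// FloatValue and StringValue stand in for the concrete tsm1 value types.
type FloatValue struct {
	t int64
	v float64
}

func (f *FloatValue) UnixNano() int64 { return f.t }

type StringValue struct {
	t int64
	v string
}

func (s *StringValue) UnixNano() int64 { return s.t }

func main() {
	defer func() {
		// Prints: interface conversion: main.Value is *main.FloatValue, not *main.StringValue
		fmt.Println("recovered:", recover())
	}()

	var v Value = &FloatValue{t: 1, v: 1.0}

	// A single-result type assertion panics when the dynamic type differs;
	// the comma-ok form (sv, ok := v.(*StringValue)) would not.
	_ = v.(*StringValue)
}
```

The fix below rejects mixed-type values at the cache write path, so entries can never hold more than one value type.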
95 changes: 80 additions & 15 deletions tsdb/engine/tsm1/cache.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
)

var (
@@ -34,24 +35,35 @@ func newEntry() *entry {
}
}

// newEntryValues returns a new instance of entry with the given values
func newEntryValues(values []Value) *entry {
// newEntryValues returns a new instance of entry with the given values. If the
// values are not valid, an error is returned.
func newEntryValues(values []Value) (*entry, error) {
e := &entry{values: values}

// No values, don't check types and ordering
if len(values) == 0 {
return e, nil
}

var prevTime int64
et := valueType(values[0])
for _, v := range values {
if v.UnixNano() <= prevTime {
e.needSort = true
break
}
prevTime = v.UnixNano()

// Make sure all the values are the same type
if et != valueType(v) {
return nil, tsdb.ErrFieldTypeConflict
}
}

return e
return e, nil
}

// add adds the given values to the entry.
func (e *entry) add(values []Value) {
func (e *entry) add(values []Value) error {
// See if the new values are sorted or contain duplicate timestamps
var (
prevTime int64
@@ -75,6 +87,14 @@ if len(e.values) == 0 {
if len(e.values) == 0 {
e.values = values
} else {
// Make sure the new values are the same type as the existing values
et := valueType(e.values[0])
for _, v := range values {
if et != valueType(v) {
e.mu.Unlock()
return tsdb.ErrFieldTypeConflict
}
}
l := len(e.values)
lastValTime := e.values[l-1].UnixNano()
if lastValTime >= values[0].UnixNano() {
@@ -83,6 +103,7 @@ func (e *entry) add(values []Value) {
e.values = append(e.values, values...)
}
e.mu.Unlock()
return nil
}

// deduplicate sorts and orders the entry's values. If values are already deduped and
@@ -135,8 +156,9 @@ const (
statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots

writeOK = "writeOk"
writeErr = "writeErr"
statCacheWriteOK = "writeOk"
statCacheWriteErr = "writeErr"
statCacheWriteDropped = "writeDropped"
)

// Cache maintains an in-memory store of Values for a set of keys.
@@ -188,6 +210,7 @@ type CacheStatistics struct {
WALCompactionTimeMs int64
WriteOK int64
WriteErr int64
WriteDropped int64
}

// Statistics returns statistics for periodic monitoring.
@@ -202,6 +225,9 @@ func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs),
statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes),
statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK),
statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr),
statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped),
},
}}
}
@@ -219,7 +245,11 @@ func (c *Cache) Write(key string, values []Value) error {
return ErrCacheMemoryExceeded
}

c.write(key, values)
if err := c.write(key, values); err != nil {
c.mu.Unlock()
Contributor: Should we update c.stats.WriteErr or maybe add a new stat for this unusual condition?

Contributor (author): Done. Just discovered that the existing OK and Err stats are tracked, but not actually recorded.
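To unpack "tracked, but not actually recorded": the counter fields and stat names already existed, but nothing on the write path incremented them or surfaced them in the Statistics() map; both halves are added in this diff. A minimal sketch of the counter pattern with hypothetical names, not the actual influxdb types:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cacheStats is a hypothetical stand-in for the real statistics struct.
type cacheStats struct {
	WriteOK, WriteErr int64 // "tracked": the fields exist
}

// recordWrite supplies the "recorded" half: without these atomic
// increments the counters stay at zero forever.
func (s *cacheStats) recordWrite(err error) {
	if err != nil {
		atomic.AddInt64(&s.WriteErr, 1)
		return
	}
	atomic.AddInt64(&s.WriteOK, 1)
}

// values mirrors how a Statistics()-style method reads the counters back.
func (s *cacheStats) values() map[string]int64 {
	return map[string]int64{
		"writeOk":  atomic.LoadInt64(&s.WriteOK),
		"writeErr": atomic.LoadInt64(&s.WriteErr),
	}
}

func main() {
	var s cacheStats
	s.recordWrite(nil)
	s.recordWrite(fmt.Errorf("field type conflict"))
	fmt.Println(s.values()) // map[writeErr:1 writeOk:1]
}
```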

atomic.AddInt64(&c.stats.WriteErr, 1)
return err
}
c.size += addedSize
c.mu.Unlock()

@@ -231,7 +261,9 @@ }
}

// WriteMulti writes the map of keys and associated values to the cache. This function is goroutine-safe.
// It returns an error if the cache would exceed its max size by adding the new values.
// It returns an error if the cache would exceed its max size by adding the new values. The write attempts
// to write as many values as possible. If one key fails, the others can still succeed and an error will
// be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
var totalSz uint64
for _, v := range values {
@@ -246,17 +278,30 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
return ErrCacheMemoryExceeded
}

var werr error
for k, v := range values {
c.write(k, v)
if err := c.write(k, v); err != nil {
// write failed, hold onto the error and adjust
// the size delta
werr = err
totalSz -= uint64(Values(v).Size())
}
}
c.size += totalSz
c.mu.Unlock()

// Some points in the batch were dropped. An error is returned so the
// error stat is incremented as well.
if werr != nil {
atomic.AddInt64(&c.stats.WriteDropped, 1)
atomic.AddInt64(&c.stats.WriteErr, 1)
}

// Update the memory size stat
c.updateMemSize(int64(totalSz))
atomic.AddInt64(&c.stats.WriteOK, 1)

return nil
return werr
}

// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
@@ -491,13 +536,18 @@ func (c *Cache) values(key string) Values {

// write writes the set of values for the key to the cache. This function assumes
// the lock has been taken and does not enforce the cache size limits.
func (c *Cache) write(key string, values []Value) {
func (c *Cache) write(key string, values []Value) error {
e, ok := c.store[key]
if !ok {
c.store[key] = newEntryValues(values)
return
var err error
e, err = newEntryValues(values)
if err != nil {
return err
}
c.store[key] = e
return nil
}
e.add(values)
return e.add(values)
}

func (c *Cache) entry(key string) *entry {
@@ -624,6 +674,21 @@ func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
}

func valueType(v Value) int {
switch v.(type) {
case *FloatValue:
return 1
case *IntegerValue:
return 2
case *StringValue:
return 3
case *BooleanValue:
return 4
default:
return 0
}
}

// Update the snapshotsCount and the diskSize levels
func (c *Cache) updateSnapshots() {
// Update disk stats
43 changes: 43 additions & 0 deletions tsdb/engine/tsm1/cache_test.go
@@ -56,6 +56,27 @@ func TestCache_CacheWrite(t *testing.T) {
}
}

func TestCache_CacheWrite_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, int(64))
values := Values{v0, v1}
valuesSize := v0.Size() + v1.Size()

c := NewCache(uint64(2*valuesSize), "")

if err := c.Write("foo", values[:1]); err != nil {
t.Fatalf("failed to write key foo to cache: %s", err.Error())
}

if err := c.Write("foo", values[1:]); err == nil {
t.Fatalf("expected field type conflict")
}

if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}
}

func TestCache_CacheWriteMulti(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
@@ -77,6 +98,28 @@ func TestCache_CacheWriteMulti(t *testing.T) {
}
}

func TestCache_CacheWriteMulti_TypeConflict(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
v2 := NewValue(3, int64(3))
values := Values{v0, v1, v2}
valuesSize := uint64(v0.Size() + v1.Size() + v2.Size())

c := NewCache(3*valuesSize, "")

if err := c.WriteMulti(map[string][]Value{"foo": values[:1], "bar": values[1:]}); err == nil {
t.Fatalf(" expected field type conflict")
}

if exp, got := uint64(v0.Size()), c.Size(); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
}

if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) {
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
}
}

func TestCache_Cache_DeleteRange(t *testing.T) {
v0 := NewValue(1, 1.0)
v1 := NewValue(2, 2.0)
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/file_store.go
@@ -261,7 +261,7 @@ func (f *FileStore) Add(files ...TSMFile) {
for _, file := range files {
atomic.AddInt64(&f.stats.DiskBytes, int64(file.Size()))
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
Contributor: Heh, I guess we were trying to be too clever for our own good making that change 😄
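For context on the [:0] versus nil change, a standalone Go sketch of the aliasing difference (illustrative only; whether this exact pattern bit the file store is an assumption): reslicing to [:0] keeps the backing array, so a previously handed-out slice header can have its elements overwritten by a later append, while assigning nil forces the next append to allocate fresh storage.

```go
package main

import "fmt"

func main() {
	// A caller holds a copy of the slice header, as a Stats()-style
	// method might have handed out.
	stats := []string{"a.tsm", "b.tsm"}
	caller := stats

	// Reslicing to [:0] keeps the backing array (len 0, cap 2), so the
	// next append overwrites elements the caller still sees.
	stats = stats[:0]
	stats = append(stats, "c.tsm")
	fmt.Println(caller[0]) // prints "c.tsm": the caller's view changed underneath it

	// Assigning nil drops the backing array; the next append allocates
	// fresh storage and the caller's copy is untouched.
	stats2 := []string{"a.tsm", "b.tsm"}
	caller2 := stats2
	stats2 = nil
	stats2 = append(stats2, "c.tsm")
	fmt.Println(caller2[0]) // still "a.tsm"
}
```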

f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -289,7 +289,7 @@ func (f *FileStore) Remove(paths ...string) {
atomic.AddInt64(&f.stats.DiskBytes, -int64(file.Size()))
}
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
@@ -618,7 +618,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)

f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.lastFileStats = nil
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
75 changes: 75 additions & 0 deletions tsdb/shard_test.go
@@ -9,6 +9,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"

@@ -361,6 +362,80 @@ func TestShardWriteAddNewField(t *testing.T) {
}
}

// Tests concurrently writing to the same shard with different field types, which
// can trigger a panic when the shard is snapshotted to TSM files.
func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
if testing.Short() {
t.Skip()
}
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")

index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
defer sh.Close()

points := make([]models.Point, 0, 1000)
for i := 0; i < cap(points); i++ {
if i < 500 {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(int64(i), 0),
))
} else {
points = append(points, models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": int64(1)},
time.Unix(int64(i), 0),
))
}
}

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}

_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}

}
}()

go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
t.Fatalf(err.Error())
}

_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
os.RemoveAll(f)
}
}
}()

wg.Wait()
}

// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {
Expand Down