Skip to content

Commit

Permalink
Merge pull request #9374 from influxdata/er-shard-seriesn
Browse files Browse the repository at this point in the history
Ensure shard-level cardinality is correct
  • Loading branch information
e-dard authored Jan 29, 2018
2 parents 8a00870 + b19edd5 commit 079fe6e
Showing 4 changed files with 82 additions and 64 deletions.
35 changes: 17 additions & 18 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
@@ -104,16 +104,6 @@ func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
return i.seriesSketch.Clone(), i.seriesTSSketch.Clone(), nil
}

// SeriesN reports how many entries the index currently holds in its series
// map. The map is read while holding the index's read lock.
//
// Per the original note on this method, the value is a count of unique
// non-tombstoned series and cannot be combined with other shards' counts.
func (i *Index) SeriesN() int64 {
	i.mu.RLock()
	defer i.mu.RUnlock()

	return int64(len(i.series))
}

// Measurement returns the measurement object from the index by the name
func (i *Index) Measurement(name []byte) (*measurement, error) {
i.mu.RLock()
@@ -1182,24 +1172,33 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli
return nil
}

// SeriesN reports the number of unique non-tombstoned series local to this
// shard, taken as the cardinality of the shard's series ID set. The read lock
// is held for the duration of the lookup.
func (idx *ShardIndex) SeriesN() int64 {
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	cardinality := idx.seriesIDSet.Cardinality()
	return int64(cardinality)
}

// InitializeSeries is called during start-up.
// This works the same as CreateSeriesIfNotExists except it ignores limit errors.
func (i *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
return i.Index.CreateSeriesListIfNotExists(i.id, i.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &i.opt, true)
func (idx *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error {
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, true)
}

func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return i.Index.CreateSeriesListIfNotExists(i.id, i.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &i.opt, false)
// CreateSeriesIfNotExists creates the provided series on the index if it is not
// already present.
func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error {
return idx.Index.CreateSeriesListIfNotExists(idx.id, idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false)
}

// TagSets returns a list of tag sets based on series filtering.
func (i *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
return i.Index.TagSets(i.id, name, opt)
func (idx *ShardIndex) TagSets(name []byte, opt query.IteratorOptions) ([]*query.TagSet, error) {
return idx.Index.TagSets(idx.id, name, opt)
}

// SeriesIDSet returns the bitset associated with the series ids.
func (i *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet {
return i.seriesIDSet
func (idx *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet {
return idx.seriesIDSet
}

// NewShardIndex returns a new index for a shard.
9 changes: 5 additions & 4 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
@@ -610,13 +610,14 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
return s, ts, nil
}

// SeriesN returns the number of unique non-tombstoned series in the index.
// SeriesN returns the number of unique non-tombstoned series in this index.
//
// Since indexes are not shared across shards, the count returned by SeriesN
// cannot be combined with other shards' results. If you need to count series
// across indexes then use SeriesSketches and merge the results from other
// indexes.
// across indexes then use either the database-wide series file, or merge the
// index-level bitsets or sketches.
func (i *Index) SeriesN() int64 {
return int64(i.sfile.SeriesCount())
return int64(i.SeriesIDSet().Cardinality())
}

// HasTagKey returns true if tag key exists. It returns the first error
67 changes: 25 additions & 42 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
@@ -691,10 +691,7 @@ func TestShard_Close_RemoveIndex(t *testing.T) {
func TestShard_CreateIterator_Ascending(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

sh := NewShard(index, sfile.SeriesFile)
sh := NewShard(index)
defer sh.Close()

// Calling CreateIterator when the engine is not open will return
@@ -778,10 +775,7 @@ func TestShard_CreateIterator_Descending(t *testing.T) {
var itr query.Iterator

test := func(index string) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

sh = NewShard(index, sfile.SeriesFile)
sh = NewShard(index)

// Calling CreateIterator when the engine is not open will return
// ErrEngineClosed.
@@ -883,10 +877,7 @@ func TestShard_CreateIterator_Series_Auth(t *testing.T) {
}

test := func(index string, v variant) error {
sfile := MustOpenSeriesFile()
defer sfile.Close()

sh := MustNewOpenShard(index, sfile.SeriesFile)
sh := MustNewOpenShard(index)
defer sh.Close()
sh.MustWritePointsString(`
cpu,host=serverA,region=uswest value=100 0
@@ -1015,10 +1006,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
var sh *Shard

test := func(index string) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

sh = NewShard(index, sfile.SeriesFile)
sh = NewShard(index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
@@ -1069,10 +1057,7 @@ func TestShard_Disabled_WriteQuery(t *testing.T) {
func TestShard_Closed_Functions(t *testing.T) {
var sh *Shard
test := func(index string) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

sh = NewShard(index, sfile.SeriesFile)
sh = NewShard(index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
@@ -1108,7 +1093,7 @@ func TestShard_FieldDimensions(t *testing.T) {
defer sfile.Close()

setup := func(index string) {
sh = NewShard(index, sfile.SeriesFile)
sh = NewShard(index)

if err := sh.Open(); err != nil {
t.Fatal(err)
@@ -1223,11 +1208,8 @@ _reserved,region=uswest value="foo" 0
func TestShards_FieldDimensions(t *testing.T) {
var shard1, shard2 *Shard

sfile := MustOpenSeriesFile()
defer sfile.Close()

setup := func(index string) {
shard1 = NewShard(index, sfile.SeriesFile)
shard1 = NewShard(index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@@ -1238,7 +1220,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)

shard2 = NewShard(index, sfile.SeriesFile)
shard2 = NewShard(index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@@ -1352,10 +1334,7 @@ func TestShards_MapType(t *testing.T) {
var shard1, shard2 *Shard

setup := func(index string) {
sfile := MustOpenSeriesFile()
defer sfile.Close()

shard1 = NewShard(index, sfile.SeriesFile)
shard1 = NewShard(index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@@ -1366,7 +1345,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)

shard2 = NewShard(index, sfile.SeriesFile)
shard2 = NewShard(index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@@ -1493,11 +1472,8 @@ _reserved,region=uswest value="foo" 0
func TestShards_MeasurementsByRegex(t *testing.T) {
var shard1, shard2 *Shard

sfile := MustOpenSeriesFile()
defer sfile.Close()

setup := func(index string) {
shard1 = NewShard(index, sfile.SeriesFile)
shard1 = NewShard(index)
if err := shard1.Open(); err != nil {
t.Fatal(err)
}
@@ -1508,7 +1484,7 @@ cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
`)

shard2 = NewShard(index, sfile.SeriesFile)
shard2 = NewShard(index)
if err := shard2.Open(); err != nil {
t.Fatal(err)
}
@@ -1924,24 +1900,26 @@ func chunkedWrite(shard *tsdb.Shard, points []models.Point) {
// Shard represents a test wrapper for tsdb.Shard.
type Shard struct {
*tsdb.Shard
sfile *tsdb.SeriesFile
sfile *SeriesFile
path string
}

// NewShard returns a new instance of Shard with temp paths.
func NewShard(index string, sfile *tsdb.SeriesFile) *Shard {
func NewShard(index string) *Shard {
// Create temporary path for data and WAL.
dir, err := ioutil.TempDir("", "influxdb-tsdb-")
if err != nil {
panic(err)
}

sfile := MustOpenSeriesFile()

// Build engine options.
opt := tsdb.NewEngineOptions()
opt.IndexVersion = index
opt.Config.WALDir = filepath.Join(dir, "wal")
if index == "inmem" {
opt.InmemIndex = inmem.NewIndex(path.Base(dir), sfile)
opt.InmemIndex = inmem.NewIndex(path.Base(dir), sfile.SeriesFile)
}
// Initialise series id sets. Need to do this as it's normally done at the
// store level.
@@ -1952,7 +1930,7 @@ func NewShard(index string, sfile *tsdb.SeriesFile) *Shard {
Shard: tsdb.NewShard(0,
filepath.Join(dir, "data", "db0", "rp0", "1"),
filepath.Join(dir, "wal", "db0", "rp0", "1"),
sfile,
sfile.SeriesFile,
opt,
),
sfile: sfile,
@@ -1961,8 +1939,8 @@ func NewShard(index string, sfile *tsdb.SeriesFile) *Shard {
}

// MustNewOpenShard creates and opens a shard with the provided index.
func MustNewOpenShard(index string, sfile *tsdb.SeriesFile) *Shard {
sh := NewShard(index, sfile)
func MustNewOpenShard(index string) *Shard {
sh := NewShard(index)
if err := sh.Open(); err != nil {
panic(err)
}
@@ -1971,6 +1949,11 @@ func MustNewOpenShard(index string, sfile *tsdb.SeriesFile) *Shard {

// Close closes the shard and its series file, and removes all underlying data.
//
// Every cleanup step is attempted even if an earlier one fails, so a failing
// series file close can no longer leak the open shard or the temporary
// directory. The first error encountered (series file, then shard) is
// returned.
func (sh *Shard) Close() error {
	// Registered first so the temp directory is removed on every return path.
	defer os.RemoveAll(sh.path)

	// Close the series file first, matching the original ordering; per the
	// original comment this also removes the temp series file data.
	sfileErr := sh.sfile.Close()

	// Always close the shard, even if the series file failed to close.
	shardErr := sh.Shard.Close()

	if sfileErr != nil {
		return sfileErr
	}
	return shardErr
}
35 changes: 35 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
@@ -571,6 +571,41 @@ func TestStore_BackupRestoreShard(t *testing.T) {
})
}
}
// TestStore_Shard_SeriesN verifies, for every registered index type, that each
// shard reports its own series count when two shards in the same database and
// retention policy contain overlapping series.
func TestStore_Shard_SeriesN(t *testing.T) {
	t.Parallel()

	// checkIndex opens a store backed by the given index type, writes data to
	// two shards, and returns an error describing the first shard whose
	// series count does not match.
	checkIndex := func(indexType string) error {
		store := MustOpenStore(indexType)
		defer store.Close()

		// Shard 1: two distinct series (cpu and cpu,host=serverA).
		store.MustCreateShardWithData("db0", "rp0", 1,
			`cpu value=1 0`,
			`cpu,host=serverA value=2 10`,
		)

		// Shard 2: same measurement, but both points belong to one series.
		store.MustCreateShardWithData("db0", "rp0", 2,
			`cpu value=1 0`,
			`cpu value=2 10`,
		)

		const (
			wantShard1 int64 = 2
			wantShard2 int64 = 1
		)

		if got := store.Shard(1).SeriesN(); got != wantShard1 {
			return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 1, got, wantShard1)
		}
		if got := store.Shard(2).SeriesN(); got != wantShard2 {
			return fmt.Errorf("[shard %d] got series count of %d, but expected %d", 2, got, wantShard2)
		}
		return nil
	}

	for _, indexType := range tsdb.RegisteredIndexes() {
		t.Run(indexType, func(t *testing.T) {
			err := checkIndex(indexType)
			if err != nil {
				t.Error(err)
			}
		})
	}
}

func TestStore_MeasurementNames_Deduplicate(t *testing.T) {
t.Parallel()

0 comments on commit 079fe6e

Please sign in to comment.