Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track stats for number of series, measurements #5816

Merged
merged 1 commit into from
Feb 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/copier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func MustOpenShard(id uint64) *Shard {

sh := &Shard{
Shard: tsdb.NewShard(id,
tsdb.NewDatabaseIndex(),
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
tsdb.NewEngineOptions(),
Expand Down
8 changes: 4 additions & 4 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}

// Load metadata index.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
Expand All @@ -56,7 +56,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}

// Load metadata index.
index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
Expand All @@ -81,7 +81,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
}

// Load metadata index.
index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -521,7 +521,7 @@ func MustOpenEngine() *Engine {
if err := e.Open(); err != nil {
panic(err)
}
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex(), make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db"), make(map[string]*tsdb.MeasurementFields)); err != nil {
panic(err)
}
return e
Expand Down
47 changes: 46 additions & 1 deletion tsdb/meta.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package tsdb

import (
"expvar"
"fmt"
"regexp"
"sort"
"strings"
"sync"
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/influxdb/tsdb/internal"
Expand All @@ -19,6 +21,9 @@ import (

const (
maxStringLength = 64 * 1024

statDatabaseSeries = "numSeries" // number of series in this database
statDatabaseMeasurements = "numMeasurements" // number of measurements in this database
)

// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
Expand All @@ -29,13 +34,19 @@ type DatabaseIndex struct {
measurements map[string]*Measurement // measurement name to object and index
series map[string]*Series // map series key to the Series object
lastID uint64 // last used series ID. They're in memory only for this shard

name string // name of the database represented by this index

statMap *expvar.Map
}

// NewDatabaseIndex returns a new initialized DatabaseIndex.
func NewDatabaseIndex() *DatabaseIndex {
func NewDatabaseIndex(name string) *DatabaseIndex {
return &DatabaseIndex{
measurements: make(map[string]*Measurement),
series: make(map[string]*Series),
name: name,
statMap: influxdb.NewStatistics("database:"+name, "database", map[string]string{"database": name}),
}
}

Expand Down Expand Up @@ -103,6 +114,8 @@ func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, ser

m.AddSeries(series)

d.statMap.Add(statDatabaseSeries, 1)

return series
}

Expand All @@ -113,6 +126,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem
if m == nil {
m = NewMeasurement(name, d)
d.measurements[name] = m
d.statMap.Add(statDatabaseMeasurements, 1)
}
return m
}
Expand Down Expand Up @@ -311,22 +325,36 @@ func (d *DatabaseIndex) DropMeasurement(name string) {
for _, s := range m.seriesByID {
delete(d.series, s.Key)
}

m.drop()

d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID)))
d.statMap.Add(statDatabaseMeasurements, -1)
}

// DropSeries removes the series keys and their tags from the index
func (d *DatabaseIndex) DropSeries(keys []string) {
d.mu.Lock()
defer d.mu.Unlock()

var nDeleted int64
for _, k := range keys {
series := d.series[k]
if series == nil {
continue
}
series.measurement.DropSeries(series.id)
delete(d.series, k)
nDeleted++
}

d.statMap.Add(statDatabaseSeries, -nDeleted)
}

const (
statMeasurementSeries = "numSeries" // number of series contained in this measurement
)

// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks
Expand All @@ -341,6 +369,8 @@ type Measurement struct {
measurement *Measurement
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
seriesIDs SeriesIDs // sorted list of series IDs in this measurement

statMap *expvar.Map
}

// NewMeasurement allocates and initializes a new Measurement.
Expand All @@ -353,6 +383,12 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
seriesIDs: make(SeriesIDs, 0),

statMap: influxdb.NewStatistics(
fmt.Sprintf("measurement:%s.%s", name, idx.name),
"measurement",
map[string]string{"database": idx.name, "measurement": name},
),
}
}

Expand Down Expand Up @@ -445,6 +481,7 @@ func (m *Measurement) AddSeries(s *Series) bool {
valueMap[v] = ids
}

m.statMap.Add(statMeasurementSeries, 1)
return true
}

Expand Down Expand Up @@ -492,9 +529,17 @@ func (m *Measurement) DropSeries(seriesID uint64) {
}
}

m.statMap.Add(statMeasurementSeries, -1)

return
}

// drop handles any cleanup for when a measurement is dropped.
// Currently only cleans up stats.
func (m *Measurement) drop() {
m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs)))
}

// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
idxs := make([]*tsdb.DatabaseIndex, 0, b.N)
for i := 0; i < b.N; i++ {
idxs = append(idxs, tsdb.NewDatabaseIndex())
idxs = append(idxs, tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could significantly affect the benchmark because you will get a new allocation for the stat map with a different database name won't you?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a call to b.ResetTimer a couple lines down, so the allocation overhead shouldn't affect this benchmark run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right yeah. GH diff made it look like that was the benchmark 😄

}

b.ResetTimer()
Expand Down
12 changes: 6 additions & 6 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")

index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

Expand Down Expand Up @@ -75,7 +75,7 @@ func TestShardWriteAndIndex(t *testing.T) {
// ensure the index gets loaded after closing and opening the shard
sh.Close()

index = tsdb.NewDatabaseIndex()
index = tsdb.NewDatabaseIndex("db")
sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
Expand All @@ -99,7 +99,7 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")

index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

Expand Down Expand Up @@ -239,7 +239,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
Expand Down Expand Up @@ -280,7 +280,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
// Generate test series (measurements + unique tag sets).
series := genTestSeries(mCnt, tkCnt, tvCnt)
// Create index for the shard to use.
index := tsdb.NewDatabaseIndex()
index := tsdb.NewDatabaseIndex("db")
// Generate point data to write to the shard.
points := []models.Point{}
for _, s := range series {
Expand Down Expand Up @@ -355,7 +355,7 @@ func NewShard() *Shard {

return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex(),
tsdb.NewDatabaseIndex("db"),
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
opt,
Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Store) loadIndexes() error {
s.Logger.Printf("Skipping database dir: %s. Not a directory", db.Name())
continue
}
s.databaseIndexes[db.Name()] = NewDatabaseIndex()
s.databaseIndexes[db.Name()] = NewDatabaseIndex(db.Name())
}
return nil
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
// create the database index if it does not exist
db, ok := s.databaseIndexes[database]
if !ok {
db = NewDatabaseIndex()
db = NewDatabaseIndex(database)
s.databaseIndexes[database] = db
}

Expand Down