Skip to content

Commit

Permalink
Merge pull request #9822 from influxdata/jm-bytes
Browse files Browse the repository at this point in the history
Implement SHOW STATS FOR 'indexes'
  • Loading branch information
Jacob Marble authored May 10, 2018
2 parents de58584 + 3cfbc33 commit f1b656c
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: s.MetaClient,
TaskManager: s.QueryExecutor.TaskManager,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
TSDBStore: s.TSDBStore,
ShardMapper: &coordinator.LocalShardMapper{
MetaClient: s.MetaClient,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
Expand Down
44 changes: 29 additions & 15 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,25 +868,39 @@ func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowS
}

func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) (models.Rows, error) {
stats, err := e.Monitor.Statistics(nil)
if err != nil {
return nil, err
}

var rows []*models.Row
for _, stat := range stats {
if stmt.Module != "" && stat.Name != stmt.Module {
continue
}
row := &models.Row{Name: stat.Name, Tags: stat.Tags}

values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.ValueNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Values[k])
if _, ok := e.TSDBStore.(*tsdb.Store); stmt.Module == "indexes" && ok {
// The cost of collecting indexes metrics grows with the size of the indexes, so only collect this
// stat when explicitly requested.
b := e.TSDBStore.(*tsdb.Store).IndexBytes()
row := &models.Row{
Name: "indexes",
Columns: []string{"memoryBytes"},
Values: [][]interface{}{{b}},
}
row.Values = [][]interface{}{values}
rows = append(rows, row)

} else {
stats, err := e.Monitor.Statistics(nil)
if err != nil {
return nil, err
}

for _, stat := range stats {
if stmt.Module != "" && stat.Name != stmt.Module {
continue
}
row := &models.Row{Name: stat.Name, Tags: stat.Tags}

values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.ValueNames() {
row.Columns = append(row.Columns, k)
values = append(values, stat.Values[k])
}
row.Values = [][]interface{}{values}
rows = append(rows, row)
}
}
return rows, nil
}
Expand Down
1 change: 1 addition & 0 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Engine interface {
DiskSize() int64
IsIdle() bool
Free() error
IndexBytes() (int, uintptr)

io.WriterTo
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
return e
}

func (e *Engine) IndexBytes() (int, uintptr) {
return e.index.Bytes()
}

// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
digestPath := filepath.Join(e.path, "digest.tsd")
Expand Down
5 changes: 3 additions & 2 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type Index interface {
// Size of the index on disk, if applicable.
DiskSizeBytes() int64

// Bytes estimates the memory footprint of this Index, in bytes.
Bytes() int
// Bytes estimates the memory footprint of this Index, in bytes,
// and a unique reference ID to the Index instance.
Bytes() (int, uintptr)

// To be removed w/ tsi1.
SetFieldName(measurement []byte, name string)
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewIndex(database string, sfile *tsdb.SeriesFile) *Index {
}

// Bytes estimates the memory footprint of this Index, in bytes.
func (i *Index) Bytes() int {
func (i *Index) Bytes() (int, uintptr) {
var b int
i.mu.RLock()
b += 24 // mu RWMutex is 24 bytes
Expand All @@ -106,7 +106,7 @@ func (i *Index) Bytes() int {
b += int(unsafe.Sizeof(i.measurementsTSSketch)) + i.measurementsTSSketch.Bytes()
b += 8 // rebuildQueue Mutex is 8 bytes
i.mu.RUnlock()
return b
return b, uintptr(unsafe.Pointer(i))
}

func (i *Index) Type() string { return IndexName }
Expand Down
7 changes: 4 additions & 3 deletions tsdb/index/inmem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestIndex_Bytes(t *testing.T) {
opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)}
si := inmem.NewShardIndex(1, "foo", "bar", tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt).(*inmem.ShardIndex)

indexBaseBytes := si.Bytes()
indexBaseBytes, _ := si.Bytes()

name := []byte("name")
err := si.CreateSeriesIfNotExists(name, name, models.Tags{})
Expand All @@ -109,8 +109,9 @@ func TestIndex_Bytes(t *testing.T) {
t.FailNow()
}

if indexBaseBytes >= si.Bytes() {
t.Errorf("index Bytes(): want >%d, got %d", indexBaseBytes, si.Bytes())
indexNewBytes, _ := si.Bytes()
if indexBaseBytes >= indexNewBytes {
t.Errorf("index Bytes(): want >%d, got %d", indexBaseBytes, indexNewBytes)
}
}

Expand Down
6 changes: 3 additions & 3 deletions tsdb/index/inmem/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *measurement) bytes() int {
b += int(unsafe.Sizeof(m.Database)) + len(m.Database)
b += int(unsafe.Sizeof(m.Name)) + len(m.Name)
if m.NameBytes != nil {
b += int(unsafe.Sizeof(m.NameBytes)) + cap(m.NameBytes)
b += int(unsafe.Sizeof(m.NameBytes)) + len(m.NameBytes)
}
b += 24 // 24 bytes for m.mu RWMutex
b += int(unsafe.Sizeof(m.fieldNames))
Expand Down Expand Up @@ -1034,7 +1034,7 @@ func (s *series) bytes() int {
// Do not count s.Measurement to prevent double-counting in Index.Bytes.
b += int(unsafe.Sizeof(s.Key)) + len(s.Key)
for _, tag := range s.Tags {
b += int(unsafe.Sizeof(tag)) + cap(tag.Key) + cap(tag.Value)
b += int(unsafe.Sizeof(tag)) + len(tag.Key) + len(tag.Value)
}
b += int(unsafe.Sizeof(s.Tags))
s.mu.RUnlock()
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func (t *tagKeyValue) bytes() int {
for k, v := range t.entries {
b += int(unsafe.Sizeof(k)) + len(k)
b += len(v.m) * 8 // uint64
b += cap(v.a) * 8 // uint64
b += len(v.a) * 8 // uint64
b += int(unsafe.Sizeof(v) + unsafe.Sizeof(*v))
}
t.mu.RUnlock()
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *
}

// Bytes estimates the memory footprint of this Index, in bytes.
func (i *Index) Bytes() int {
func (i *Index) Bytes() (int, uintptr) {
var b int
i.mu.RLock()
b += 24 // mu RWMutex is 24 bytes
Expand All @@ -147,7 +147,7 @@ func (i *Index) Bytes() int {
b += int(unsafe.Sizeof(i.version))
b += int(unsafe.Sizeof(i.PartitionN))
i.mu.RUnlock()
return b
return b, uintptr(unsafe.Pointer(i))
}

// Database returns the name of the database the index was initialized with.
Expand Down
10 changes: 5 additions & 5 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (f *LogFile) bytes() int {
// Do not include f.data because it is mmap'd
// TODO(jacobmarble): Uncomment when we are using go >= 1.10.0
//b += int(unsafe.Sizeof(f.w)) + f.w.Size()
b += int(unsafe.Sizeof(f.buf)) + cap(f.buf)
b += int(unsafe.Sizeof(f.keyBuf)) + cap(f.keyBuf)
b += int(unsafe.Sizeof(f.buf)) + len(f.buf)
b += int(unsafe.Sizeof(f.keyBuf)) + len(f.keyBuf)
// Do not count SeriesFile because it belongs to the code that constructed this Index.
b += int(unsafe.Sizeof(f.size))
b += int(unsafe.Sizeof(f.modTime))
Expand Down Expand Up @@ -1183,7 +1183,7 @@ type logMeasurement struct {
// bytes estimates the memory footprint of this logMeasurement, in bytes.
func (mm *logMeasurement) bytes() int {
var b int
b += cap(mm.name)
b += len(mm.name)
for k, v := range mm.tagSet {
b += len(k)
b += v.bytes()
Expand Down Expand Up @@ -1253,7 +1253,7 @@ type logTagKey struct {
// bytes estimates the memory footprint of this logTagKey, in bytes.
func (tk *logTagKey) bytes() int {
var b int
b += cap(tk.name)
b += len(tk.name)
for k, v := range tk.tagValues {
b += len(k)
b += v.bytes()
Expand Down Expand Up @@ -1297,7 +1297,7 @@ type logTagValue struct {
// bytes estimates the memory footprint of this logTagValue, in bytes.
func (tv *logTagValue) bytes() int {
var b int
b += cap(tv.name)
b += len(tv.name)
b += len(tv.series) * 8
b += int(unsafe.Sizeof(*tv))
return b
Expand Down
4 changes: 4 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ func (s *Shard) SetCompactionsEnabled(enabled bool) {
engine.SetCompactionsEnabled(enabled)
}

func (s *Shard) IndexBytes() (int, uintptr) {
return s._engine.IndexBytes()
}

// DiskSize returns the size on disk of this shard.
func (s *Shard) DiskSize() (int64, error) {
s.mu.RLock()
Expand Down
16 changes: 16 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
return statistics
}

func (s *Store) IndexBytes() int {
// Get bytes per index.
// inmem indexes are shared among shards in a database, so keep individual values mapped per index.
indexes := map[uintptr]int{}
for _, sh := range s.shards {
b, indexRefID := sh.IndexBytes()
indexes[indexRefID] = b
}

var bytesSum int
for _, b := range indexes {
bytesSum += b
}
return bytesSum
}

// Path returns the store's root path.
func (s *Store) Path() string { return s.path }

Expand Down

0 comments on commit f1b656c

Please sign in to comment.