diff --git a/CHANGELOG.md b/CHANGELOG.md index ad436ceac3b..559da235c4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ - [#6121](https://github.com/influxdata/influxdb/issues/6121): Fix panic: slice index out of bounds in TSM index - [#6140](https://github.com/influxdata/influxdb/issues/6140): Ensure Shard engine not accessed when closed. - [#6110](https://github.com/influxdata/influxdb/issues/6110): Fix for 0.9 upgrade path when using RPM +- [#6131](https://github.com/influxdata/influxdb/issues/6061): Fix write throughput regression with large number of measurments ## v0.11.0 [2016-03-22] diff --git a/tsdb/meta.go b/tsdb/meta.go index feb10f8ffba..1adb578b9d0 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -158,9 +158,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem func (d *DatabaseIndex) AssignShard(k string, shardID uint64) { ss := d.Series(k) if ss != nil { - d.mu.Lock() ss.AssignShard(shardID) - d.mu.Unlock() } } @@ -168,6 +166,7 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) { func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { d.mu.RLock() defer d.mu.RUnlock() + ss := d.series[key] if ss == nil { return nil @@ -375,8 +374,6 @@ func (d *DatabaseIndex) DropMeasurement(name string) { delete(d.series, s.Key) } - m.drop() - d.statMap.Add(statDatabaseSeries, int64(-len(m.seriesByID))) d.statMap.Add(statDatabaseMeasurements, -1) } @@ -418,8 +415,6 @@ type Measurement struct { measurement *Measurement seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids seriesIDs SeriesIDs // sorted list of series IDs in this measurement - - statMap *expvar.Map } // NewMeasurement allocates and initializes a new Measurement. @@ -432,12 +427,6 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { seriesByID: make(map[uint64]*Series), seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), seriesIDs: make(SeriesIDs, 0), - - statMap: influxdb.NewStatistics( - fmt.Sprintf("measurement:%s.%s", name, idx.name), - "measurement", - map[string]string{"database": idx.name, "measurement": name}, - ), } } @@ -530,7 +519,6 @@ func (m *Measurement) AddSeries(s *Series) bool { valueMap[v] = ids } - m.statMap.Add(statMeasurementSeries, 1) return true } @@ -578,17 +566,9 @@ func (m *Measurement) DropSeries(seriesID uint64) { } } - m.statMap.Add(statMeasurementSeries, -1) - return } -// drop handles any cleanup for when a measurement is dropped. -// Currently only cleans up stats. -func (m *Measurement) drop() { - m.statMap.Add(statMeasurementSeries, int64(-len(m.seriesIDs))) -} - // filters walks the where clause of a select statement and returns a map with all series ids // matching the where clause and any filter expression that should be applied to each func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr, error) { @@ -1311,9 +1291,9 @@ func (a Measurements) union(other Measurements) Measurements { // Series belong to a Measurement and represent unique time series in a database type Series struct { - Key string - Tags map[string]string - + mu sync.RWMutex + Key string + Tags map[string]string id uint64 measurement *Measurement shardIDs map[uint64]bool // shards that have this series defined @@ -1329,11 +1309,16 @@ func NewSeries(key string, tags map[string]string) *Series { } func (s *Series) AssignShard(shardID uint64) { + s.mu.Lock() s.shardIDs[shardID] = true + s.mu.Unlock() } // MarshalBinary encodes the object to a binary format. func (s *Series) MarshalBinary() ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + var pb internal.Series pb.Key = &s.Key for k, v := range s.Tags { @@ -1346,6 +1331,9 @@ func (s *Series) MarshalBinary() ([]byte, error) { // UnmarshalBinary decodes the object from a binary format. func (s *Series) UnmarshalBinary(buf []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + var pb internal.Series if err := proto.Unmarshal(buf, &pb); err != nil { return err @@ -1360,17 +1348,9 @@ func (s *Series) UnmarshalBinary(buf []byte) error { // InitializeShards initializes the list of shards. func (s *Series) InitializeShards() { + s.mu.Lock() s.shardIDs = make(map[uint64]bool) -} - -// match returns true if all tags match the series' tags. -func (s *Series) match(tags map[string]string) bool { - for k, v := range tags { - if s.Tags[k] != v { - return false - } - } - return true + s.mu.Unlock() } // SeriesIDs is a convenience type for sorting, checking equality, and doing