diff --git a/tsdb/engine.go b/tsdb/engine.go index ad590b12cb3..e1f711d3ae0 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -29,14 +29,15 @@ type Engine interface { Close() error SetLogOutput(io.Writer) - LoadMetadataIndex(shard *Shard, index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error + LoadMetadataIndex(shard *Shard, index *DatabaseIndex) error CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) - WritePoints(points []models.Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error + WritePoints(points []models.Point) error DeleteSeries(keys []string) error DeleteMeasurement(name string, seriesKeys []string) error SeriesCount() (n int, err error) + MeasurementFields(measurement string) *MeasurementFields // Format will return the format for the engine Format() EngineFormat diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 0c446a2ff00..e4d8b8cbd29 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -78,8 +78,9 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine } e := &Engine{ - path: path, - logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags), + path: path, + logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags), + measurementFields: make(map[string]*tsdb.MeasurementFields), WAL: w, Cache: cache, @@ -110,13 +111,23 @@ func (e *Engine) Index() *tsdb.DatabaseIndex { } // MeasurementFields returns the measurement fields for a measurement. -func (e *Engine) MeasurementFields(name string) *tsdb.MeasurementFields { +func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields { + e.mu.RLock() + m := e.measurementFields[measurement] + e.mu.RUnlock() + + if m != nil { + return m + } + e.mu.Lock() - defer e.mu.Unlock() - if e.measurementFields[name] == nil { - e.measurementFields[name] = &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)} + m = e.measurementFields[measurement] + if m == nil { + m = tsdb.NewMeasurementFields() + e.measurementFields[measurement] = m } - return e.measurementFields[name] + e.mu.Unlock() + return m } // Format returns the format type of this engine @@ -187,10 +198,9 @@ func (e *Engine) Close() error { func (e *Engine) SetLogOutput(w io.Writer) {} // LoadMetadataIndex loads the shard metadata into memory. -func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { +func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex) error { // Save reference to index for iterator creation. e.index = index - e.measurementFields = measurementFields start := time.Now() @@ -200,7 +210,7 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, me return err } - if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil { + if err := e.addToIndexFromKey(key, fieldType, index); err != nil { return err } return nil @@ -220,7 +230,7 @@ func (e *Engine) LoadMetadataIndex(sh *tsdb.Shard, index *tsdb.DatabaseIndex, me continue } - if err := e.addToIndexFromKey(key, fieldType, index, measurementFields); err != nil { + if err := e.addToIndexFromKey(key, fieldType, index); err != nil { return err } } @@ -297,19 +307,17 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // database index and measurement fields -func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error { +func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { seriesKey, field := seriesAndFieldFromCompositeKey(key) measurement := tsdb.MeasurementFromSeriesKey(seriesKey) m := index.CreateMeasurementIndexIfNotExists(measurement) m.SetFieldName(field) - mf := measurementFields[measurement] + mf := e.measurementFields[measurement] if mf == nil { - mf = &tsdb.MeasurementFields{ - Fields: map[string]*tsdb.Field{}, - } - measurementFields[measurement] = mf + mf = tsdb.NewMeasurementFields() + e.measurementFields[measurement] = mf } if err := mf.CreateFieldIfNotExists(field, fieldType, false); err != nil { @@ -330,7 +338,7 @@ func (e *Engine) addToIndexFromKey(key string, fieldType influxql.DataType, inde // WritePoints writes metadata and point data into the engine. // Returns an error if new points are added to an existing key. -func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { +func (e *Engine) WritePoints(points []models.Point) error { values := map[string][]Value{} for _, p := range points { for k, v := range p.Fields() { @@ -397,6 +405,10 @@ func (e *Engine) DeleteSeries(seriesKeys []string) error { // DeleteMeasurement deletes a measurement and all related series. func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error { + e.mu.Lock() + delete(e.measurementFields, name) + e.mu.Unlock() + return e.DeleteSeries(seriesKeys) } @@ -740,7 +752,7 @@ func (e *Engine) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, return influxql.Unknown } - f := mf.Fields[field] + f := mf.Field(field) if f == nil { return influxql.Unknown } @@ -904,7 +916,7 @@ func (e *Engine) buildCursor(measurement, seriesKey, field string, opt influxql. } // Find individual field. - f := mf.Fields[field] + f := mf.Field(field) if f == nil { return nil } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index fcd867fdb78..be1ce95cd49 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -34,7 +34,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index := tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + if err := e.LoadMetadataIndex(nil, index); err != nil { t.Fatal(err) } @@ -57,7 +57,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index = tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + if err := e.LoadMetadataIndex(nil, index); err != nil { t.Fatal(err) } @@ -71,7 +71,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Write a new point and ensure we can close and load index from TSM and WAL if err := e.WritePoints([]models.Point{ MustParsePointString("cpu,host=B value=1.2 2000000000"), - }, nil, nil); err != nil { + }); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } @@ -82,7 +82,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { // Load metadata index. index = tsdb.NewDatabaseIndex("db") - if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil { + if err := e.LoadMetadataIndex(nil, index); err != nil { t.Fatal(err) } @@ -152,14 +152,14 @@ func TestEngine_Backup(t *testing.T) { t.Fatalf("failed to open tsm1 engine: %s", err.Error()) } - if err := e.WritePoints([]models.Point{p1}, nil, nil); err != nil { + if err := e.WritePoints([]models.Point{p1}); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } if err := e.WriteSnapshot(); err != nil { t.Fatalf("failed to snapshot: %s", err.Error()) } - if err := e.WritePoints([]models.Point{p2}, nil, nil); err != nil { + if err := e.WritePoints([]models.Point{p2}); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } @@ -189,7 +189,7 @@ func TestEngine_Backup(t *testing.T) { // so this test won't work properly unless the file is at least a second past the last one time.Sleep(time.Second) - if err := e.WritePoints([]models.Point{p3}, nil, nil); err != nil { + if err := e.WritePoints([]models.Point{p3}); err != nil { t.Fatalf("failed to write points: %s", err.Error()) } @@ -521,7 +521,7 @@ func MustOpenEngine() *Engine { if err := e.Open(); err != nil { panic(err) } - if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db"), make(map[string]*tsdb.MeasurementFields)); err != nil { + if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex("db")); err != nil { panic(err) } return e @@ -559,7 +559,7 @@ func (e *Engine) MustWriteSnapshot() { // WritePointsString parses a string buffer and writes the points. func (e *Engine) WritePointsString(buf ...string) error { - return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n")), nil, nil) + return e.WritePoints(MustParsePointsString(strings.Join(buf, "\n"))) } // MustParsePointsString parses points from a string. Panic on error. diff --git a/tsdb/meta.go b/tsdb/meta.go index 1adb578b9d0..c01e1ab3fa6 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -53,8 +53,9 @@ func NewDatabaseIndex(name string) *DatabaseIndex { // Series returns a series by key. func (d *DatabaseIndex) Series(key string) *Series { d.mu.RLock() - defer d.mu.RUnlock() - return d.series[key] + s := d.series[key] + d.mu.RUnlock() + return s } // SeriesN returns the number of series. @@ -146,7 +147,7 @@ func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurem // and acquire the write lock m = d.measurements[name] if m == nil { - m = NewMeasurement(name, d) + m = NewMeasurement(name) d.measurements[name] = m d.statMap.Add(statDatabaseMeasurements, 1) } @@ -293,7 +294,9 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme for _, m := range d.measurements { // Iterate filters seeing if the measurement has a matching tag. for _, f := range filters { + m.mu.RLock() tagVals, ok := m.seriesByTagKeyValue[f.Key] + m.mu.RUnlock() if !ok { continue } @@ -352,10 +355,12 @@ func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements { // Measurements returns a list of all measurements. func (d *DatabaseIndex) Measurements() Measurements { + d.mu.RLock() measurements := make(Measurements, 0, len(d.measurements)) for _, m := range d.measurements { measurements = append(measurements, m) } + d.mu.RUnlock() return measurements } @@ -408,7 +413,6 @@ type Measurement struct { mu sync.RWMutex Name string `json:"name,omitempty"` fieldNames map[string]struct{} - index *DatabaseIndex // in-memory index fields seriesByID map[uint64]*Series // lookup table for series by their id @@ -418,11 +422,10 @@ type Measurement struct { } // NewMeasurement allocates and initializes a new Measurement. -func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { +func NewMeasurement(name string) *Measurement { return &Measurement{ Name: name, fieldNames: make(map[string]struct{}), - index: idx, seriesByID: make(map[uint64]*Series), seriesByTagKeyValue: make(map[string]map[string]SeriesIDs), @@ -433,7 +436,12 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement { // HasField returns true if the measurement has a field by the given name func (m *Measurement) HasField(name string) bool { m.mu.RLock() - defer m.mu.RUnlock() + hasField := m.hasField(name) + m.mu.RUnlock() + return hasField +} + +func (m *Measurement) hasField(name string) bool { _, hasField := m.fieldNames[name] return hasField } @@ -606,8 +614,6 @@ func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr // influx filter expression that goes with the series // TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it. func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) { - m.index.mu.RLock() - defer m.index.mu.RUnlock() m.mu.RLock() defer m.mu.RUnlock() @@ -753,7 +759,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex // For fields, return all series IDs from this measurement and return // the expression passed in, as the filter. - if name.Val != "_name" && m.HasField(name.Val) { + if name.Val != "_name" && m.hasField(name.Val) { return m.seriesIDs, n, nil } @@ -1314,6 +1320,13 @@ func (s *Series) AssignShard(shardID uint64) { s.mu.Unlock() } +func (s *Series) Assigned(shardID uint64) bool { + s.mu.RLock() + b := s.shardIDs[shardID] + s.mu.RUnlock() + return b +} + // MarshalBinary encodes the object to a binary format. func (s *Series) MarshalBinary() ([]byte, error) { s.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index abcffd564d1..8241fb13a01 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -82,9 +82,8 @@ type Shard struct { options EngineOptions - mu sync.RWMutex - measurementFields map[string]*MeasurementFields // measurement name to their fields - engine Engine + mu sync.RWMutex + engine Engine // expvar-based stats. statMap *expvar.Map @@ -108,12 +107,11 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti statMap := influxdb.NewStatistics(key, "shard", tags) return &Shard{ - index: index, - id: id, - path: path, - walPath: walPath, - options: options, - measurementFields: make(map[string]*MeasurementFields), + index: index, + id: id, + path: path, + walPath: walPath, + options: options, database: db, retentionPolicy: rp, @@ -153,7 +151,7 @@ func (s *Shard) Open() error { } // Load metadata index. - if err := s.engine.LoadMetadataIndex(s, s.index, s.measurementFields); err != nil { + if err := s.engine.LoadMetadataIndex(s, s.index); err != nil { return err } @@ -206,9 +204,7 @@ func (s *Shard) DiskSize() (int64, error) { // TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored // into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme. func (s *Shard) FieldCodec(measurementName string) *FieldCodec { - s.mu.RLock() - defer s.mu.RUnlock() - m := s.measurementFields[measurementName] + m := s.engine.MeasurementFields(measurementName) if m == nil { return NewFieldCodec(nil) } @@ -232,63 +228,25 @@ func (s *Shard) WritePoints(points []models.Point) error { if s.closed() { return ErrEngineClosed } + + s.mu.RLock() + defer s.mu.RUnlock() + s.statMap.Add(statWriteReq, 1) - seriesToCreate, fieldsToCreate, seriesToAddShardTo, err := s.validateSeriesAndFields(points) + fieldsToCreate, err := s.validateSeriesAndFields(points) if err != nil { return err } - s.statMap.Add(statSeriesCreate, int64(len(seriesToCreate))) s.statMap.Add(statFieldsCreate, int64(len(fieldsToCreate))) - // add any new series to the in-memory index - if len(seriesToCreate) > 0 { - for _, ss := range seriesToCreate { - s.index.CreateSeriesIndexIfNotExists(ss.Measurement, ss.Series) - } - } - - if len(seriesToAddShardTo) > 0 { - for _, k := range seriesToAddShardTo { - s.index.AssignShard(k, s.id) - } - } - // add any new fields and keep track of what needs to be saved - measurementFieldsToSave, err := s.createFieldsAndMeasurements(fieldsToCreate) - if err != nil { + if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil { return err } - // make sure all data is encoded before attempting to save to bolt - // only required for the b1 and bz1 formats - if s.engine.Format() != TSM1Format { - for _, p := range points { - // Ignore if raw data has already been marshaled. - if p.Data() != nil { - continue - } - - // This was populated earlier, don't need to validate that it's there. - s.mu.RLock() - mf := s.measurementFields[p.Name()] - s.mu.RUnlock() - - // If a measurement is dropped while writes for it are in progress, this could be nil - if mf == nil { - return ErrFieldNotFound - } - - data, err := mf.Codec.EncodeFields(p.Fields()) - if err != nil { - return err - } - p.SetData(data) - } - } - // Write to the engine. - if err := s.engine.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil { + if err := s.engine.WritePoints(points); err != nil { s.statMap.Add(statWritePointsFail, 1) return fmt.Errorf("engine: %s", err) } @@ -315,39 +273,21 @@ func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error { return err } - // Remove entry from shard index. - s.mu.Lock() - delete(s.measurementFields, name) - s.mu.Unlock() - return nil } -func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[string]*MeasurementFields, error) { +func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error { if len(fieldsToCreate) == 0 { - return nil, nil + return nil } - s.mu.Lock() - defer s.mu.Unlock() - // add fields - measurementsToSave := make(map[string]*MeasurementFields) for _, f := range fieldsToCreate { - m := s.measurementFields[f.Measurement] - if m == nil { - m = measurementsToSave[f.Measurement] - if m == nil { - m = &MeasurementFields{Fields: make(map[string]*Field)} - } - s.measurementFields[f.Measurement] = m - } - - measurementsToSave[f.Measurement] = m + m := s.engine.MeasurementFields(f.Measurement) // Add the field to the in memory index if err := m.CreateFieldIfNotExists(f.Field.Name, f.Field.Type, false); err != nil { - return nil, err + return err } // ensure the measurement is in the index and the field is there @@ -355,32 +295,27 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (map[ measurement.SetFieldName(f.Field.Name) } - return measurementsToSave, nil + return nil } // validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed -func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate, []*FieldCreate, []string, error) { - var seriesToCreate []*SeriesCreate +func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*FieldCreate, error) { var fieldsToCreate []*FieldCreate - var seriesToAddShardTo []string // get the shard mutex for locally defined fields for _, p := range points { // see if the series should be added to the index - if ss := s.index.Series(string(p.Key())); ss == nil { - series := NewSeries(string(p.Key()), p.Tags()) - seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series}) - seriesToAddShardTo = append(seriesToAddShardTo, series.Key) - } else if !ss.shardIDs[s.id] { - // this is the first time this series is being written into this shard, persist it - seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss}) - seriesToAddShardTo = append(seriesToAddShardTo, ss.Key) + ss := s.index.Series(string(p.Key())) + if ss == nil { + ss = NewSeries(string(p.Key()), p.Tags()) + s.statMap.Add(statSeriesCreate, 1) } + ss = s.index.CreateSeriesIndexIfNotExists(p.Name(), ss) + s.index.AssignShard(ss.Key, ss.id) + // see if the field definitions need to be saved to the shard - s.mu.RLock() - mf := s.measurementFields[p.Name()] - s.mu.RUnlock() + mf := s.engine.MeasurementFields(p.Name()) if mf == nil { for name, value := range p.Fields() { @@ -391,10 +326,10 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate, // validate field types and encode data for name, value := range p.Fields() { - if f := mf.Fields[name]; f != nil { + if f := mf.Field(name); f != nil { // Field present in shard metadata, make sure there is no type conflict. if f.Type != influxql.InspectDataType(value) { - return nil, nil, nil, fmt.Errorf("field type conflict: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s", name, p.Name(), value, f.Type) + return nil, fmt.Errorf("field type conflict: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s", name, p.Name(), value, f.Type) } continue // Field is present, and it's of the same type. Nothing more to do. @@ -404,7 +339,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]*SeriesCreate, } } - return seriesToCreate, fieldsToCreate, seriesToAddShardTo, nil + return fieldsToCreate, nil } // SeriesCount returns the number of series buckets on the shard. @@ -568,14 +503,23 @@ func (a Shards) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // MeasurementFields holds the fields of a measurement and their codec. type MeasurementFields struct { - Fields map[string]*Field `json:"fields"` + mu sync.RWMutex + + fields map[string]*Field `json:"fields"` Codec *FieldCodec } +func NewMeasurementFields() *MeasurementFields { + return &MeasurementFields{fields: make(map[string]*Field)} +} + // MarshalBinary encodes the object to a binary format. func (m *MeasurementFields) MarshalBinary() ([]byte, error) { + m.mu.RLock() + defer m.mu.RUnlock() + var pb internal.MeasurementFields - for _, f := range m.Fields { + for _, f := range m.fields { id := int32(f.ID) name := f.Name t := int32(f.Type) @@ -586,13 +530,16 @@ func (m *MeasurementFields) MarshalBinary() ([]byte, error) { // UnmarshalBinary decodes the object from a binary format. func (m *MeasurementFields) UnmarshalBinary(buf []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + var pb internal.MeasurementFields if err := proto.Unmarshal(buf, &pb); err != nil { return err } - m.Fields = make(map[string]*Field, len(pb.Fields)) + m.fields = make(map[string]*Field, len(pb.Fields)) for _, f := range pb.Fields { - m.Fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())} + m.fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())} } return nil } @@ -601,31 +548,44 @@ func (m *MeasurementFields) UnmarshalBinary(buf []byte) error { // Returns an error if 255 fields have already been created on the measurement or // the fields already exists with a different type. func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error { + m.mu.RLock() + // Ignore if the field already exists. - if f := m.Fields[name]; f != nil { + if f := m.fields[name]; f != nil { if f.Type != typ { + m.mu.RUnlock() return ErrFieldTypeConflict } + m.mu.RUnlock() return nil } + m.mu.RUnlock() - // If we're supposed to limit the number of fields, only 255 are allowed. If we go over that then return an error. - if len(m.Fields)+1 > math.MaxUint8 && limitCount { - return ErrFieldOverflow + m.mu.Lock() + if f := m.fields[name]; f != nil { + return nil } // Create and append a new field. f := &Field{ - ID: uint8(len(m.Fields) + 1), + ID: uint8(len(m.fields) + 1), Name: name, Type: typ, } - m.Fields[name] = f - m.Codec = NewFieldCodec(m.Fields) + m.fields[name] = f + m.Codec = NewFieldCodec(m.fields) + m.mu.Unlock() return nil } +func (m *MeasurementFields) Field(name string) *Field { + m.mu.RLock() + f := m.fields[name] + m.mu.RUnlock() + return f +} + // Field represents a series field. type Field struct { ID uint8 `json:"id,omitempty"` diff --git a/tsdb/store.go b/tsdb/store.go index e3a16b5f2b2..cf3eda98da3 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -616,18 +616,20 @@ func (s *Store) IteratorCreators() influxql.IteratorCreators { // WriteToShard writes a list of points to a shard identified by its ID. func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { s.mu.RLock() - defer s.mu.RUnlock() select { case <-s.closing: + s.mu.RUnlock() return ErrStoreClosed default: } sh, ok := s.shards[shardID] if !ok { + s.mu.RUnlock() return ErrShardNotFound } + s.mu.RUnlock() return sh.WritePoints(points) }