diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 47bc1dc8ef0..08f3c69ae41 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -2,6 +2,7 @@ package coordinator import ( "errors" + "fmt" "sort" "sync" "sync/atomic" @@ -308,7 +309,11 @@ func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, c ch := make(chan error, len(shardMappings.Points)) for shardID, points := range shardMappings.Points { go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) { - ch <- w.writeToShard(shard, database, retentionPolicy, points) + err := w.writeToShard(shard, database, retentionPolicy, points) + if err == tsdb.ErrShardDeletion { + err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)} + } + ch <- err }(shardMappings.Shards[shardID], database, retentionPolicy, points) } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 13ca53302fc..0f81577c8cd 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -726,6 +726,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { if err != nil { t.Fatal(err) } + defer itr.Close() fitr := itr.(query.FloatIterator) if p, err := fitr.Next(); err != nil { @@ -783,6 +784,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { if err != nil { t.Fatal(err) } + defer itr.Close() fitr := itr.(query.FloatIterator) if p, err := fitr.Next(); err != nil { @@ -1749,6 +1751,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) { if err != nil { t.Fatal(err) } + defer cur.Close() fcur := cur.(tsdb.FloatBatchCursor) ts, vs := fcur.Next() @@ -1808,6 +1811,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) { if err != nil { t.Fatal(err) } + defer cur.Close() fcur := cur.(tsdb.FloatBatchCursor) ts, vs := fcur.Next() diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 217252bbfff..2afe6e575f5 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -530,7 +530,10 @@ func (f *FileStore) Close() error { defer f.mu.Unlock() for _, file := range f.files { - file.Close() + err := file.Close() + if err != nil { + return err + } } f.lastFileStats = nil diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index b3b13bd0838..0f9d1870825 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -25,7 +25,8 @@ var nilOffset = []byte{255, 255, 255, 255} // TSMReader is a reader for a TSM file. type TSMReader struct { // refs is the count of active references to this reader. - refs int64 + refs int64 + refsWG sync.WaitGroup mu sync.RWMutex @@ -398,13 +399,11 @@ func (t *TSMReader) Type(key []byte) (byte, error) { // Close closes the TSMReader. func (t *TSMReader) Close() error { + t.refsWG.Wait() + t.mu.Lock() defer t.mu.Unlock() - if t.InUse() { - return ErrFileInUse - } - if err := t.accessor.close(); err != nil { return err } @@ -417,6 +416,7 @@ func (t *TSMReader) Close() error { // there are no more references. func (t *TSMReader) Ref() { atomic.AddInt64(&t.refs, 1) + t.refsWG.Add(1) } // Unref removes a usage record of this TSMReader. If the Reader was closed @@ -424,6 +424,7 @@ func (t *TSMReader) Ref() { // be closed and remove func (t *TSMReader) Unref() { atomic.AddInt64(&t.refs, -1) + t.refsWG.Done() } // InUse returns whether the TSMReader currently has any active references. @@ -455,7 +456,10 @@ func (t *TSMReader) remove() error { } if path != "" { - os.RemoveAll(path) + err := os.RemoveAll(path) + if err != nil { + return err + } } if err := t.tombstoner.Delete(); err != nil { diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index 7618ae68058..4c6fc61e043 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -1828,10 +1828,6 @@ func TestTSMReader_References(t *testing.T) { r.Ref() - if err := r.Close(); err != ErrFileInUse { - t.Fatalf("expected error closing reader: %v", err) - } - if err := r.Remove(); err != ErrFileInUse { t.Fatalf("expected error removing reader: %v", err) } diff --git a/tsdb/store.go b/tsdb/store.go index 57de371608d..da4dcd25654 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -31,6 +31,8 @@ var ( ErrShardNotFound = fmt.Errorf("shard not found") // ErrStoreClosed is returned when trying to use a closed Store. ErrStoreClosed = fmt.Errorf("store is closed") + // ErrShardDeletion is returned when trying to create a shard that is being deleted + ErrShardDeletion = errors.New("shard is being deleted") ) // Statistics gathered by the store. @@ -523,7 +525,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en // Shard may be undergoing a pending deletion. While the shard can be // recreated, it must wait for the pending delete to finish. if _, ok := s.pendingShardDeletes[shardID]; ok { - return fmt.Errorf("shard %d is pending deletion and cannot be created again until finished", shardID) + return ErrShardDeletion } // Create the db and retention policy directories if they don't exist.