fix calling of stop method on series store (grafana#2911)

* fix calling of stop method on series store

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* do not embed fetcher in seriesStore to avoid errors

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
sandeepsukhani authored Jul 22, 2020
1 parent 0ed1041 commit 2c6a44a
Showing 3 changed files with 26 additions and 26 deletions.
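
The second bullet in the commit message comes down to Go method promotion: a struct that embeds *Fetcher automatically exposes Fetcher's Stop method, so calling Stop on a store type that never defines its own Stop shuts down only the fetcher and leaves the index client and chunk storage running. Replacing the embedded *Fetcher with a named fetcher field, and defining Stop on baseStore as in the diff below, closes that hole. The following is a minimal, self-contained sketch of the mechanism; embeddedStore and namedStore are hypothetical names used for illustration, not the actual store types in this repository.

    package main

    import "fmt"

    // Fetcher stands in for the chunk fetcher; illustrative only.
    type Fetcher struct{}

    // Stop shuts down only fetcher-owned resources in this sketch.
    func (f *Fetcher) Stop() { fmt.Println("fetcher stopped") }

    // embeddedStore mirrors the old layout: *Fetcher is embedded, so its
    // Stop method is promoted onto the store type.
    type embeddedStore struct {
        *Fetcher
    }

    // namedStore mirrors the new layout: the fetcher is an ordinary field,
    // so the store must provide its own Stop and can shut everything down.
    type namedStore struct {
        fetcher *Fetcher
    }

    func (s *namedStore) Stop() {
        fmt.Println("index stopped")   // stand-in for stopping the index client
        fmt.Println("storage stopped") // stand-in for stopping chunk storage
        s.fetcher.Stop()
    }

    func main() {
        // With embedding, Stop resolves to the promoted Fetcher.Stop:
        // only "fetcher stopped" is printed.
        (&embeddedStore{Fetcher: &Fetcher{}}).Stop()

        // With a named field, the explicit Stop runs the full shutdown.
        (&namedStore{fetcher: &Fetcher{}}).Stop()
    }

Running the sketch prints only "fetcher stopped" for the embedded layout, but the full shutdown sequence for the named-field layout.
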
36 changes: 18 additions & 18 deletions chunk_store.go
@@ -90,11 +90,11 @@ func (cfg *StoreConfig) Validate() error {
type baseStore struct {
cfg StoreConfig

- index IndexClient
- chunks Client
- schema BaseSchema
- limits StoreLimits
- *Fetcher
+ index IndexClient
+ chunks Client
+ schema BaseSchema
+ limits StoreLimits
+ fetcher *Fetcher
}

func newBaseStore(cfg StoreConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) {
@@ -109,10 +109,17 @@ func newBaseStore(cfg StoreConfig, schema BaseSchema, index IndexClient, chunks
chunks: chunks,
schema: schema,
limits: limits,
- Fetcher: fetcher,
+ fetcher: fetcher,
}, nil
}

+ // Stop any background goroutines (ie in the cache.)
+ func (c *baseStore) Stop() {
+ c.fetcher.storage.Stop()
+ c.fetcher.Stop()
+ c.index.Stop()
+ }

// store implements Store
type store struct {
baseStore
@@ -131,13 +138,6 @@ func newStore(cfg StoreConfig, schema StoreSchema, index IndexClient, chunks Cli
}, nil
}

- // Stop any background goroutines (ie in the cache.)
- func (c *store) Stop() {
- c.storage.Stop()
- c.Fetcher.Stop()
- c.index.Stop()
- }

// Put implements ChunkStore
func (c *store) Put(ctx context.Context, chunks []Chunk) error {
for _, chunk := range chunks {
@@ -153,12 +153,12 @@ func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chun
log, ctx := spanlogger.New(ctx, "ChunkStore.PutOne")
chunks := []Chunk{chunk}

- err := c.storage.PutChunks(ctx, chunks)
+ err := c.fetcher.storage.PutChunks(ctx, chunks)
if err != nil {
return err
}

- if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil {
+ if cacheErr := c.fetcher.writeBackCache(ctx, chunks); cacheErr != nil {
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}

@@ -278,7 +278,7 @@ func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from
level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks))

// Now fetch the actual chunk data from Memcache / S3
- allChunks, err := c.FetchChunks(ctx, filtered, keys)
+ allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
@@ -370,7 +370,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th

// Now fetch the actual chunk data from Memcache / S3
keys := keysFromChunks(filtered)
- allChunks, err := c.FetchChunks(ctx, filtered, keys)
+ allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys)
if err != nil {
return nil, err
}
@@ -648,7 +648,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa
return ErrParialDeleteChunkNoOverlap
}

- chunks, err := c.Fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID})
+ chunks, err := c.fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID})
if err != nil {
if err == ErrStorageObjectNotFound {
return nil
4 changes: 2 additions & 2 deletions chunk_store_test.go
@@ -865,7 +865,7 @@ func TestIndexCachingWorks(t *testing.T) {
store := newTestChunkStoreConfig(t, "v9", storeCfg)
defer store.Stop()

- storage := store.(CompositeStore).stores[0].Store.(*seriesStore).storage.(*MockStorage)
+ storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage)

fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), metric)
err := fooChunk1.Encode()
@@ -1326,7 +1326,7 @@ func TestDisableIndexDeduplication(t *testing.T) {
store := newTestChunkStoreConfig(t, "v9", storeCfg)
defer store.Stop()

- storage := store.(CompositeStore).stores[0].Store.(*seriesStore).storage.(*MockStorage)
+ storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage)

fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), metric)
err := fooChunk1.Encode()
12 changes: 6 additions & 6 deletions series_store.go
@@ -187,7 +187,7 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, userID string, from, thr
return [][]Chunk{}, []*Fetcher{}, nil
}

- return [][]Chunk{chunks}, []*Fetcher{c.baseStore.Fetcher}, nil
+ return [][]Chunk{chunks}, []*Fetcher{c.baseStore.fetcher}, nil
}

// LabelNamesForMetricName retrieves all label names for a metric name.
@@ -251,7 +251,7 @@ func (c *seriesStore) lookupLabelNamesByChunks(ctx context.Context, from, throug
chunksPerQuery.Observe(float64(len(filtered)))

// Now fetch the actual chunk data from Memcache / S3
- allChunks, err := c.FetchChunks(ctx, filtered, keys)
+ allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys)
if err != nil {
level.Error(log).Log("msg", "FetchChunks", "err", err)
return nil, err
@@ -424,7 +424,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun
writeChunk := true

// If this chunk is in cache it must already be in the database so we don't need to write it again
- found, _, _ := c.cache.Fetch(ctx, []string{chunk.ExternalKey()})
+ found, _, _ := c.fetcher.cache.Fetch(ctx, []string{chunk.ExternalKey()})
if len(found) > 0 {
writeChunk = false
dedupedChunksTotal.Inc()
@@ -444,7 +444,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun
return err
}

- if oic, ok := c.storage.(ObjectAndIndexClient); ok {
+ if oic, ok := c.fetcher.storage.(ObjectAndIndexClient); ok {
chunks := chunks
if !writeChunk {
chunks = []Chunk{}
@@ -455,7 +455,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun
} else {
// chunk not found, write it.
if writeChunk {
- err := c.storage.PutChunks(ctx, chunks)
+ err := c.fetcher.storage.PutChunks(ctx, chunks)
if err != nil {
return err
}
@@ -467,7 +467,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun

// we already have the chunk in the cache so don't write it back to the cache.
if writeChunk {
- if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil {
+ if cacheErr := c.fetcher.writeBackCache(ctx, chunks); cacheErr != nil {
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}
}
