diff --git a/chunk_store.go b/chunk_store.go index eadfa8121ee06..e802e667598a1 100644 --- a/chunk_store.go +++ b/chunk_store.go @@ -90,11 +90,11 @@ func (cfg *StoreConfig) Validate() error { type baseStore struct { cfg StoreConfig - index IndexClient - chunks Client - schema BaseSchema - limits StoreLimits - *Fetcher + index IndexClient + chunks Client + schema BaseSchema + limits StoreLimits + fetcher *Fetcher } func newBaseStore(cfg StoreConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) { @@ -109,10 +109,17 @@ func newBaseStore(cfg StoreConfig, schema BaseSchema, index IndexClient, chunks chunks: chunks, schema: schema, limits: limits, - Fetcher: fetcher, + fetcher: fetcher, }, nil } +// Stop any background goroutines (ie in the cache.) +func (c *baseStore) Stop() { + c.fetcher.storage.Stop() + c.fetcher.Stop() + c.index.Stop() +} + // store implements Store type store struct { baseStore @@ -131,13 +138,6 @@ func newStore(cfg StoreConfig, schema StoreSchema, index IndexClient, chunks Cli }, nil } -// Stop any background goroutines (ie in the cache.) -func (c *store) Stop() { - c.storage.Stop() - c.Fetcher.Stop() - c.index.Stop() -} - // Put implements ChunkStore func (c *store) Put(ctx context.Context, chunks []Chunk) error { for _, chunk := range chunks { @@ -153,12 +153,12 @@ func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chun log, ctx := spanlogger.New(ctx, "ChunkStore.PutOne") chunks := []Chunk{chunk} - err := c.storage.PutChunks(ctx, chunks) + err := c.fetcher.storage.PutChunks(ctx, chunks) if err != nil { return err } - if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil { + if cacheErr := c.fetcher.writeBackCache(ctx, chunks); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } @@ -278,7 +278,7 @@ func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks)) // Now fetch the actual chunk data from Memcache / S3 - allChunks, err := c.FetchChunks(ctx, filtered, keys) + allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys) if err != nil { level.Error(log).Log("msg", "FetchChunks", "err", err) return nil, err @@ -370,7 +370,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th // Now fetch the actual chunk data from Memcache / S3 keys := keysFromChunks(filtered) - allChunks, err := c.FetchChunks(ctx, filtered, keys) + allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys) if err != nil { return nil, err } @@ -648,7 +648,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa return ErrParialDeleteChunkNoOverlap } - chunks, err := c.Fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID}) + chunks, err := c.fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID}) if err != nil { if err == ErrStorageObjectNotFound { return nil diff --git a/chunk_store_test.go b/chunk_store_test.go index 0a4a80f0924e8..536869f63ad44 100644 --- a/chunk_store_test.go +++ b/chunk_store_test.go @@ -865,7 +865,7 @@ func TestIndexCachingWorks(t *testing.T) { store := newTestChunkStoreConfig(t, "v9", storeCfg) defer store.Stop() - storage := store.(CompositeStore).stores[0].Store.(*seriesStore).storage.(*MockStorage) + storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage) fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), metric) err := fooChunk1.Encode() @@ -1326,7 +1326,7 @@ func TestDisableIndexDeduplication(t *testing.T) { store := newTestChunkStoreConfig(t, "v9", storeCfg) defer store.Stop() - storage := store.(CompositeStore).stores[0].Store.(*seriesStore).storage.(*MockStorage) + storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage) fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), metric) err := fooChunk1.Encode() diff --git a/series_store.go b/series_store.go index 4792ca1ae2e00..3d3373c5dfd4f 100644 --- a/series_store.go +++ b/series_store.go @@ -187,7 +187,7 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, userID string, from, thr return [][]Chunk{}, []*Fetcher{}, nil } - return [][]Chunk{chunks}, []*Fetcher{c.baseStore.Fetcher}, nil + return [][]Chunk{chunks}, []*Fetcher{c.baseStore.fetcher}, nil } // LabelNamesForMetricName retrieves all label names for a metric name. @@ -251,7 +251,7 @@ func (c *seriesStore) lookupLabelNamesByChunks(ctx context.Context, from, throug chunksPerQuery.Observe(float64(len(filtered))) // Now fetch the actual chunk data from Memcache / S3 - allChunks, err := c.FetchChunks(ctx, filtered, keys) + allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys) if err != nil { level.Error(log).Log("msg", "FetchChunks", "err", err) return nil, err @@ -424,7 +424,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun writeChunk := true // If this chunk is in cache it must already be in the database so we don't need to write it again - found, _, _ := c.cache.Fetch(ctx, []string{chunk.ExternalKey()}) + found, _, _ := c.fetcher.cache.Fetch(ctx, []string{chunk.ExternalKey()}) if len(found) > 0 { writeChunk = false dedupedChunksTotal.Inc() @@ -444,7 +444,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun return err } - if oic, ok := c.storage.(ObjectAndIndexClient); ok { + if oic, ok := c.fetcher.storage.(ObjectAndIndexClient); ok { chunks := chunks if !writeChunk { chunks = []Chunk{} @@ -455,7 +455,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun } else { // chunk not found, write it. if writeChunk { - err := c.storage.PutChunks(ctx, chunks) + err := c.fetcher.storage.PutChunks(ctx, chunks) if err != nil { return err } @@ -467,7 +467,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun // we already have the chunk in the cache so don't write it back to the cache. if writeChunk { - if cacheErr := c.writeBackCache(ctx, chunks); cacheErr != nil { + if cacheErr := c.fetcher.writeBackCache(ctx, chunks); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } }