Skip to content

Commit

Permalink
Don't convert chunks to matrices and then merge them... (grafana#713)
Browse files Browse the repository at this point in the history
* Make chunk store return chunks; converting to series is now separate.

- Refactor chunk store to return chunks.
- Refactor querier package to use new Prometheus 2.0 interfaces.
- Have separate querier for ingesters, chunk store and metadata.
- Make remote read handler take a Queryable.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Simple query benchmark, using in-memory chunks, but running an actual PromQL query.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Don't convert chunks to matrices and then merge them; use iterators and the upstream heap-based merging code.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Optimise the ChunksToMatrix function.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Update prometheus to pull in 78efdc6d - Avoid infinite loop on duplicate NaN values

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Register query engine metrics.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

* Review feedback: clarify log message.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie authored Jul 13, 2018
1 parent 2c4e2f2 commit 8f8e7b9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 33 deletions.
29 changes: 12 additions & 17 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,36 +319,31 @@ func equalByKey(a, b Chunk) bool {
a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum
}

func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
// ChunksToMatrix converts a set of chunks to a model.Matrix.
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix")
defer sp.Finish()
sp.LogFields(otlog.Int("chunks", len(chunks)))

// Group chunks by series, sort and dedupe samples.
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
metrics := map[model.Fingerprint]model.Metric{}
samplesBySeries := map[model.Fingerprint][][]model.SamplePair{}
for _, c := range chunks {
ss, ok := sampleStreams[c.Fingerprint]
if !ok {
ss = &model.SampleStream{
Metric: c.Metric,
}
sampleStreams[c.Fingerprint] = ss
}

samples, err := c.Samples(from, through)
ss, err := c.Samples(from, through)
if err != nil {
return nil, err
}

ss.Values = util.MergeSampleSets(ss.Values, samples)
metrics[c.Fingerprint] = c.Metric
samplesBySeries[c.Fingerprint] = append(samplesBySeries[c.Fingerprint], ss)
}
sp.LogFields(otlog.Int("sample streams", len(sampleStreams)))
sp.LogFields(otlog.Int("series", len(samplesBySeries)))

matrix := make(model.Matrix, 0, len(sampleStreams))
for _, ss := range sampleStreams {
matrix := make(model.Matrix, 0, len(samplesBySeries))
for fp, ss := range samplesBySeries {
matrix = append(matrix, &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
Metric: metrics[fp],
Values: util.MergeNSampleSets(ss...),
})
}

Expand Down
18 changes: 5 additions & 13 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s *spanLogger) Log(kvps ...interface{}) error {
}

// Get implements ChunkStore
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) {
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
log, ctx := newSpanLogger(ctx, "ChunkStore.Get")
defer log.Span.Finish()

Expand Down Expand Up @@ -227,19 +227,11 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
if ok && metricNameMatcher.Type == labels.MatchEqual {
log.Span.SetTag("metric", metricNameMatcher.Value)
return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value)
return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value)
}

// Otherwise we consult the metric name index first and then create queries for each matching metric name.
return c.getSeriesMatrix(ctx, from, through, matchers, metricNameMatcher)
}

func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) (model.Matrix, error) {
chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName)
if err != nil {
return nil, err
}
return chunksToMatrix(ctx, chunks, from, through)
return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher)
}

func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
Expand Down Expand Up @@ -346,7 +338,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [
return
}

func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) {
func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) {
// Get all series from the index
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand Down Expand Up @@ -407,7 +399,7 @@ outer:
}
}
}
return chunksToMatrix(ctx, chunks, from, through)
return chunks, nil
}

func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
Expand Down
10 changes: 8 additions & 2 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestChunkStore_Get(t *testing.T) {
}

// Query with ordinary time-range
matrix1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
require.NoError(t, err)

matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now)
require.NoError(t, err)

sort.Sort(ByFingerprint(matrix1))
Expand All @@ -229,7 +232,10 @@ func TestChunkStore_Get(t *testing.T) {
}

// Pushing end of time-range into future should yield exact same resultset
matrix2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
require.NoError(t, err)

matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now)
require.NoError(t, err)

sort.Sort(ByFingerprint(matrix2))
Expand Down
2 changes: 1 addition & 1 deletion chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestChunksToMatrix(t *testing.T) {
},
},
} {
matrix, err := chunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
matrix, err := ChunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
require.NoError(t, err)

sort.Sort(matrix)
Expand Down

0 comments on commit 8f8e7b9

Please sign in to comment.