
Commit

Merge pull request #1123 from grafana/fetch-chunks
Download chunks as a tarball so you can run tests against them.
tomwilkie authored Dec 4, 2018
2 parents e226f0e + 4dc467c commit b4a3a2e
Showing 58 changed files with 3,896 additions and 1,109 deletions.
13 changes: 3 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cmd/querier/main.go
@@ -126,6 +126,7 @@ func main() {
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable)))
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))
subrouter.Path("/chunks").Handler(middleware.AuthenticateUser.Wrap(querier.ChunksHandler(queryable)))

server.Run()
}
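The new route above registers querier.ChunksHandler on the authenticated subrouter; per the commit message, it streams a tenant's chunks back as a tarball so they can be exercised in local tests. Below is a minimal, hypothetical client sketch in Go: the /api/prom prefix, the port, the start/end query parameters, and the .tgz extension are assumptions rather than anything this diff specifies; only the /chunks path and the tenant-auth requirement come from the change above.

package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Hypothetical URL: prefix, port, and parameter names are assumptions.
	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:9095/api/prom/chunks?start=1543881600&end=1543885200", nil)
	if err != nil {
		log.Fatal(err)
	}
	// middleware.AuthenticateUser needs a tenant ID; Cortex conventionally
	// carries it in the X-Scope-OrgID header.
	req.Header.Set("X-Scope-OrgID", "fake")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Stream the tarball straight to disk so it can be fed into local tests.
	out, err := os.Create("chunks.tgz")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()
	if _, err := io.Copy(out, resp.Body); err != nil {
		log.Fatal(err)
	}
}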
66 changes: 37 additions & 29 deletions pkg/chunk/encoding/bigchunk.go
@@ -157,6 +157,7 @@ func (b *bigchunk) Size() int {
func (b *bigchunk) NewIterator() Iterator {
return &bigchunkIterator{
bigchunk: b,
curr: b.chunks[0].Iterator(),
}
}

@@ -215,81 +216,88 @@ func (r *reader) ReadBytes(count int) ([]byte, error) {
type bigchunkIterator struct {
*bigchunk

iter chunkenc.Iterator
curr chunkenc.Iterator
i int
}

func (i *bigchunkIterator) FindAtOrAfter(target model.Time) bool {
// On average we'll have about 12*3600/15/120 = 24 chunks, so just linear
// scan for now.
i.i = 0
for i.i < len(i.chunks) {
if int64(target) <= i.ends[i.i] {
break
func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {
if it.i >= len(it.chunks) {
return false
}

// If the seek is outside the current chunk, use the index to find the right
// chunk.
if int64(target) < it.starts[it.i] || int64(target) > it.ends[it.i] {
it.curr = nil
for it.i = 0; it.i < len(it.chunks) && int64(target) > it.ends[it.i]; it.i++ {
}
i.i++
}

if i.i >= len(i.chunks) {
if it.i >= len(it.chunks) {
return false
}

i.iter = i.chunks[i.i].Iterator()
i.i++
if it.curr == nil {
it.curr = it.chunks[it.i].Iterator()
} else if t, _ := it.curr.At(); int64(target) <= t {
it.curr = it.chunks[it.i].Iterator()
}

for i.iter.Next() {
t, _ := i.iter.At()
for it.curr.Next() {
t, _ := it.curr.At()
if t >= int64(target) {
return true
}
}

return false
}

func (i *bigchunkIterator) Scan() bool {
if i.iter != nil && i.iter.Next() {
func (it *bigchunkIterator) Scan() bool {
if it.curr.Next() {
return true
}
if err := it.curr.Err(); err != nil {
return false
}

for i.i < len(i.chunks) {
i.iter = i.chunks[i.i].Iterator()
i.i++
if i.iter.Next() {
for it.i < len(it.chunks)-1 {
it.i++
it.curr = it.chunks[it.i].Iterator()
if it.curr.Next() {
return true
}
}
return false
}

func (i *bigchunkIterator) Value() model.SamplePair {
t, v := i.iter.At()
func (it *bigchunkIterator) Value() model.SamplePair {
t, v := it.curr.At()
return model.SamplePair{
Timestamp: model.Time(t),
Value: model.SampleValue(v),
}
}

func (i *bigchunkIterator) Batch(size int) Batch {
func (it *bigchunkIterator) Batch(size int) Batch {
var result Batch
j := 0
for j < size {
t, v := i.iter.At()
t, v := it.curr.At()
result.Timestamps[j] = t
result.Values[j] = v
j++

if j < size && !i.Scan() {
if j < size && !it.Scan() {
break
}
}
result.Length = j
return result
}

func (i *bigchunkIterator) Err() error {
if i.iter != nil {
return i.iter.Err()
func (it *bigchunkIterator) Err() error {
if it.curr != nil {
return it.curr.Err()
}
return nil
}
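The rewritten bigchunkIterator keeps its place across seeks: FindAtOrAfter only falls back to the index scan when the target lies outside the current sub-chunk, and only re-creates the inner iterator when seeking backwards, while Scan moves to the next sub-chunk solely when the current one is exhausted and stops early if the current sub-iterator reports an error. Below is a short sketch of the forward-seek access pattern this optimises, using only the Iterator methods visible in this diff (FindAtOrAfter, Value, Err); the encoding.Chunk interface, the import paths, and the helper name are assumptions.

package example

import (
	"github.com/prometheus/common/model"

	// Path assumed from the file layout in this commit (pkg/chunk/encoding).
	"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

// sampleForward reads one sample at each of n monotonically increasing
// timestamps. Because the seeks only move forward, the new FindAtOrAfter
// can keep its current position instead of re-scanning earlier sub-chunks.
func sampleForward(c encoding.Chunk, start, step model.Time, n int) ([]model.SamplePair, error) {
	it := c.NewIterator()
	out := make([]model.SamplePair, 0, n)
	for i := 0; i < n; i++ {
		if !it.FindAtOrAfter(start + model.Time(i)*step) {
			break // ran past the end of the chunk
		}
		out = append(out, it.Value())
	}
	return out, it.Err()
}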
29 changes: 28 additions & 1 deletion pkg/chunk/encoding/chunk_test.go
@@ -64,7 +64,7 @@ func TestChunk(t *testing.T) {
{Varbit, 2048},
{Bigchunk, 4096},
} {
for samples := 0; samples < tc.maxSamples; samples += tc.maxSamples / 10 {
for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 {

// DoubleDelta doesn't support zero length chunks.
if tc.encoding == DoubleDelta && samples == 0 {
@@ -79,6 +79,10 @@ func TestChunk(t *testing.T) {
testChunkSeek(t, tc.encoding, samples)
})

t.Run(fmt.Sprintf("testChunkSeekForward/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkSeekForward(t, tc.encoding, samples)
})

t.Run(fmt.Sprintf("testChunkBatch/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
testChunkBatch(t, tc.encoding, samples)
})
@@ -137,6 +141,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
}

// testChunkSeek checks seek works as expected.
// This version of the test will seek backwards.
func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

@@ -159,6 +164,28 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
}
}

func testChunkSeekForward(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

iter := chunk.NewIterator()
for i := 0; i < samples; i += samples / 10 {
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
sample := iter.Value()
require.EqualValues(t, model.Time(i*step), sample.Timestamp)
require.EqualValues(t, model.SampleValue(i), sample.Value)

j := i + 1
for ; j < (i+samples/10) && j < samples; j++ {
require.True(t, iter.Scan())
sample := iter.Value()
require.EqualValues(t, model.Time(j*step), sample.Timestamp)
require.EqualValues(t, model.SampleValue(j), sample.Value)
}
}
require.False(t, iter.Scan())
require.NoError(t, iter.Err())
}

func testChunkBatch(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

9 changes: 9 additions & 0 deletions pkg/querier/batch/batch.go
@@ -56,6 +56,15 @@ func newIteratorAdapter(underlying iterator) storage.SeriesIterator {

// Seek implements storage.SeriesIterator.
func (a *iteratorAdapter) Seek(t int64) bool {
// Optimisation: see if the seek is within the current batch.
if a.curr.Length > 0 && t >= a.curr.Timestamps[0] && t <= a.curr.Timestamps[a.curr.Length-1] {
a.curr.Index = 0
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
a.curr.Index++
}
return true
}

a.curr.Length = -1
a.batchSize = 1
if a.underlying.Seek(t, a.batchSize) {
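The block added to iteratorAdapter.Seek is a fast path: when the target timestamp already falls inside the batch the adapter has buffered, only curr.Index is advanced and the underlying iterator is never touched. The same check, plus a guard that the remaining batch can still serve size samples, appears in chunkIterator.Seek in the next file. Here is a small standalone restatement of that in-batch scan as a sketch; the helper name and the package/import lines are hypothetical, while the Batch fields (Length, Timestamps, Index) are as used in this diff.

package batch

// promchunk is the alias this package already uses for the encoding
// package; the module path is an assumption.
import promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"

// seekWithinBatch positions b.Index at the first sample >= t and reports
// whether t was inside the batch at all; if not, the caller still has to
// seek the underlying iterator.
func seekWithinBatch(b *promchunk.Batch, t int64) bool {
	if b.Length <= 0 || t < b.Timestamps[0] || t > b.Timestamps[b.Length-1] {
		return false
	}
	b.Index = 0
	for b.Index < b.Length && t > b.Timestamps[b.Index] {
		b.Index++
	}
	return true
}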
12 changes: 12 additions & 0 deletions pkg/querier/batch/chunk.go
@@ -30,6 +30,18 @@ func (i *chunkIterator) Seek(t int64, size int) bool {
return false
}

// If the seek is to the middle of the current batch, and size fits, we can
// shortcut.
if i.batch.Length > 0 && t >= i.batch.Timestamps[0] && t <= i.batch.Timestamps[i.batch.Length-1] {
i.batch.Index = 0
for i.batch.Index < i.batch.Length && t > i.batch.Timestamps[i.batch.Index] {
i.batch.Index++
}
if i.batch.Index+size < i.batch.Length {
return true
}
}

if i.it.FindAtOrAfter(model.Time(t)) {
i.batch = i.it.Batch(size)
return i.batch.Length > 0
49 changes: 49 additions & 0 deletions pkg/querier/batch/chunk_test.go
@@ -91,3 +91,52 @@ func testSeek(t require.TestingT, points int, iter storage.SeriesIterator) {
}
}
}

func TestSeek(t *testing.T) {
var it mockIterator
c := chunkIterator{
chunk: chunk.Chunk{
Through: promchunk.BatchSize,
},
it: &it,
}

for i := 0; i < promchunk.BatchSize-1; i++ {
require.True(t, c.Seek(int64(i), 1))
}
require.Equal(t, 1, it.seeks)

require.True(t, c.Seek(int64(promchunk.BatchSize), 1))
require.Equal(t, 2, it.seeks)
}

type mockIterator struct {
seeks int
}

func (i *mockIterator) Scan() bool {
return true
}

func (i *mockIterator) FindAtOrAfter(model.Time) bool {
i.seeks++
return true
}

func (i *mockIterator) Value() model.SamplePair {
return model.SamplePair{}
}

func (i *mockIterator) Batch(size int) promchunk.Batch {
batch := promchunk.Batch{
Length: promchunk.BatchSize,
}
for i := 0; i < promchunk.BatchSize; i++ {
batch.Timestamps[i] = int64(i)
}
return batch
}

func (i *mockIterator) Err() error {
return nil
}
54 changes: 35 additions & 19 deletions pkg/querier/batch/merge.go
@@ -29,15 +29,10 @@ func newMergeIterator(cs []chunk.Chunk) *mergeIterator {
}

c := &mergeIterator{
its: its,
h: make(iteratorHeap, 0, len(its)),

// Is 2x # iterators guaranteed to be enough?
// Yes: if we've popped two batches of each iterators, we're guaranteed
// that the last entry in the first of the resulting merged batches is smaller
// than any remaining iterator.
batches: make(batchStream, 0, len(its)*2),
batchesBuf: make(batchStream, 0, len(its)*2),
its: its,
h: make(iteratorHeap, 0, len(its)),
batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
batchesBuf: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
}

for _, iter := range c.its {
@@ -56,22 +51,43 @@ func newMergeIterator(cs []chunk.Chunk) *mergeIterator {
}

func (c *mergeIterator) Seek(t int64, size int) bool {
c.h = c.h[:0]
c.batches = c.batches[:0]

for _, iter := range c.its {
if iter.Seek(t, size) {
c.h = append(c.h, iter)
continue
// Optimisation to see if the seek is within our currently cached batches.
found:
for len(c.batches) > 0 {
batch := &c.batches[0]
if t >= batch.Timestamps[0] && t <= batch.Timestamps[batch.Length-1] {
batch.Index = 0
for batch.Index < batch.Length && t > batch.Timestamps[batch.Index] {
batch.Index++
}
break found
}
copy(c.batches, c.batches[1:])
c.batches = c.batches[:len(c.batches)-1]
}

if err := iter.Err(); err != nil {
c.currErr = err
return false
// If we didn't find anything in the current set of batches, reset the heap
// and seek.
if len(c.batches) == 0 {
c.h = c.h[:0]
c.batches = c.batches[:0]

for _, iter := range c.its {
if iter.Seek(t, size) {
c.h = append(c.h, iter)
continue
}

if err := iter.Err(); err != nil {
c.currErr = err
return false
}
}

heap.Init(&c.h)
}

heap.Init(&c.h)
return c.buildNextBatch(size)
}

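mergeIterator.Seek now tries to serve the seek from batches it has already merged: it pops fully-passed batches off the front of the cache, and if the target lands inside the first remaining batch it simply rewinds that batch's index; only when the cache cannot cover the target does it reset the heap and re-seek every chunk iterator (the constructor also grows the batch buffers to len(its)*2*promchunk.BatchSize). The loop below restates that cache-trimming step as a standalone sketch; the function name is hypothetical, while batchStream and the Batch fields are as used in this diff.

// trimToTarget drops leading cached batches that do not contain t; if one
// does, it rewinds that batch's index to the first sample >= t and reports
// success. On failure the caller falls back to re-seeking the iterators.
func trimToTarget(batches batchStream, t int64) (batchStream, bool) {
	for len(batches) > 0 {
		b := &batches[0]
		if t >= b.Timestamps[0] && t <= b.Timestamps[b.Length-1] {
			b.Index = 0
			for b.Index < b.Length && t > b.Timestamps[b.Index] {
				b.Index++
			}
			return batches, true
		}
		// t is outside this batch; discard it and inspect the next one.
		copy(batches, batches[1:])
		batches = batches[:len(batches)-1]
	}
	return batches, false
}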

