-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
bloom_querier.go
153 lines (130 loc) · 3.01 KB
/
bloom_querier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package v1
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// BloomQuerier looks up a single bloom by its offset and returns it, or an
// error if it could not be produced.
//
// NOTE(review): LazyBloomIter.Seek in this file returns nothing and accepts
// extra variadic fingerprints, so LazyBloomIter does not satisfy this
// interface as declared — confirm which implementations are expected.
type BloomQuerier interface {
Seek(BloomOffset) (*Bloom, error)
}
// LazyBloomIter iterates over the blooms of a Block, decoding one bloom page
// at a time and only when that page is first needed.
type LazyBloomIter struct {
// usePool controls whether a page is relinquished once the iterator
// moves off it.
usePool bool
// b is the block whose headers and bloom pages are read.
b *Block
m int // max page size in bytes; passed to the page decoder on every load
// state
// initialized is set after the first attempt to load the block headers.
initialized bool
// err holds a failure from header or page loading; while it is set,
// Next returns false and Err reports it.
err error
// curPageIndex is the index of the page being read (or to be read next)
// within b.blooms.pageHeaders.
curPageIndex int
// curPage is the decoder for the page at curPageIndex, or nil when no
// page is currently loaded.
curPage *BloomPageDecoder
}
// NewLazyBloomIter builds an iterator over the blooms of b. Nothing is read
// from the block until the iterator is first used.
//
// When pool is true, the byte slice backing each bloom page is handed back
// to the pool once the iterator is done with that page. Only enable this
// when the bloom bytes never escape the decoder.
//
// maxSize is the maximum page size in bytes and is forwarded to the page
// decoder on every page load.
func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
	it := new(LazyBloomIter)
	it.usePool = pool
	it.b = b
	it.m = maxSize
	return it
}
// ensureInit loads the block headers the first time it is called and is a
// no-op afterwards. A load failure is stored in it.err; the iterator is
// marked initialized either way, so the load is attempted only once.
func (it *LazyBloomIter) ensureInit() {
	if it.initialized {
		return
	}

	// TODO(owen-d): better control over when to decode
	loadErr := it.b.LoadHeaders()
	if loadErr != nil {
		it.err = loadErr
	}
	it.initialized = true
}
// Seek positions the iterator at offset.ByteOffset within page offset.Page,
// loading that page first when it is not the one currently held.
//
// The optional series fingerprints are forwarded unchanged to the page
// decoder. Seek returns nothing: failures are recorded on the iterator and
// reported by Err. Once the iterator holds an error Seek does nothing, which
// mirrors Next returning false in that state.
func (it *LazyBloomIter) Seek(offset BloomOffset, series ...model.Fingerprint) {
	it.ensureInit()
	// Do not try to load pages once the iterator has failed (for example
	// when the block headers could not be loaded); keep the recorded error.
	if it.err != nil {
		return
	}

	// if we need a different page or the current page hasn't been loaded,
	// load the desired page
	if it.curPageIndex != offset.Page || it.curPage == nil {
		// drop the current page if it exists and
		// we're using the pool
		if it.curPage != nil && it.usePool {
			it.curPage.Relinquish()
			// Forget the page immediately: if the load below fails, the
			// iterator must not keep a reference to a relinquished page,
			// which would otherwise stay reachable through At/Err and be
			// relinquished again by a later Seek.
			it.curPage = nil
		}

		r, err := it.b.reader.Blooms()
		if err != nil {
			it.err = errors.Wrap(err, "getting blooms reader")
			return
		}

		decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics, series...)
		if err != nil {
			it.err = errors.Wrap(err, "loading bloom page")
			return
		}

		it.curPageIndex = offset.Page
		it.curPage = decoder
	}

	it.curPage.Seek(offset.ByteOffset)
}
// Next advances to the next bloom and reports whether one is available.
// It makes sure the block headers have been loaded first and returns false
// without advancing when the iterator already holds an error. The series
// fingerprints are passed through to the page decoder when a page is loaded.
func (it *LazyBloomIter) Next(series ...model.Fingerprint) bool {
	it.ensureInit()
	return it.err == nil && it.next(series...)
}
// next does the actual advancing for Next. It walks the pages listed in
// b.blooms.pageHeaders starting at curPageIndex, decoding each page the
// first time it is reached, and returns true as soon as the current page
// yields another bloom. It returns false when the iterator holds an error,
// when loading or reading a page fails, or when the last page is exhausted.
func (it *LazyBloomIter) next(series ...model.Fingerprint) bool {
	if it.err != nil {
		return false
	}

	for it.curPageIndex < len(it.b.blooms.pageHeaders) {
		if it.curPage == nil {
			// No decoder yet for this page index: load it, then loop
			// around to start reading from it.
			reader, err := it.b.reader.Blooms()
			if err != nil {
				it.err = errors.Wrap(err, "getting blooms reader")
				return false
			}

			it.curPage, err = it.b.blooms.BloomPageDecoder(reader, it.curPageIndex, it.m, it.b.metrics, series...)
			if err != nil {
				it.err = err
				return false
			}
			continue
		}

		if it.curPage.Next() {
			return true
		}

		// The page produced nothing: either it failed or it is exhausted.
		// On failure the page is kept so that Err can report its error.
		if it.curPage.Err() != nil {
			return false
		}

		// Exhausted: move on to the following page, handing the finished
		// one back when pooling is enabled.
		it.curPageIndex++
		if it.usePool {
			it.curPage.Relinquish()
		}
		it.curPage = nil
	}

	// ran past the last page
	return false
}
// At returns whatever the current page decoder reports as its current bloom.
// It delegates straight to curPage without a nil check.
// NOTE(review): presumably only meaningful after Next returned true or after
// a Seek that did not record an error — confirm against callers.
func (it *LazyBloomIter) At() *Bloom {
	page := it.curPage
	return page.At()
}
// Err reports the first applicable error: the one recorded on the iterator
// itself if any, otherwise the current page decoder's error when a page is
// loaded, otherwise nil.
func (it *LazyBloomIter) Err() error {
	switch {
	case it.err != nil:
		return it.err
	case it.curPage != nil:
		return it.curPage.Err()
	default:
		return nil
	}
}