@@ -3,6 +3,7 @@ package accnt
3
3
import (
4
4
"sort"
5
5
6
+ "github.com/grafana/metrictank/mdata/chunk"
6
7
"github.com/raintank/worldping-api/pkg/log"
7
8
"gopkg.in/raintank/schema.v1"
8
9
)
@@ -47,32 +48,49 @@ type FlatAccntMet struct {
47
48
chunks map [uint32 ]uint64
48
49
}
49
50
50
- // event types to be used in FlatAccntEvent
51
- const evnt_hit_chnk uint8 = 4
52
- const evnt_add_chnk uint8 = 5
53
- const evnt_del_met uint8 = 6
54
- const evnt_get_total uint8 = 7
55
- const evnt_stop uint8 = 100
56
- const evnt_reset uint8 = 101
57
-
58
51
type FlatAccntEvent struct {
59
- t uint8 // event type
60
- pl interface {} // payload
52
+ eType eventType
53
+ pl interface {} // payload
61
54
}
62
55
56
+ type eventType uint8
57
+
58
+ const (
59
+ evnt_hit_chnk eventType = iota
60
+ evnt_hit_chnks
61
+ evnt_add_chnk
62
+ evnt_add_chnks
63
+ evnt_del_met
64
+ evnt_get_total
65
+ evnt_stop
66
+ evnt_reset
67
+ )
68
+
63
69
// payload to be sent with an add event
64
70
type AddPayload struct {
65
71
metric schema.AMKey
66
72
ts uint32
67
73
size uint64
68
74
}
69
75
76
+ // payload to be sent with an add event
77
+ type AddsPayload struct {
78
+ metric schema.AMKey
79
+ chunks []chunk.IterGen
80
+ }
81
+
70
82
// payload to be sent with a hit event
71
83
type HitPayload struct {
72
84
metric schema.AMKey
73
85
ts uint32
74
86
}
75
87
88
+ // payload to be sent with a hits event
89
+ type HitsPayload struct {
90
+ metric schema.AMKey
91
+ chunks []chunk.IterGen
92
+ }
93
+
76
94
// payload to be sent with del metric event
77
95
type DelMetPayload struct {
78
96
metric schema.AMKey
@@ -111,9 +129,16 @@ func (a *FlatAccnt) AddChunk(metric schema.AMKey, ts uint32, size uint64) {
111
129
a .act (evnt_add_chnk , & AddPayload {metric , ts , size })
112
130
}
113
131
132
+ func (a * FlatAccnt ) AddChunks (metric schema.AMKey , chunks []chunk.IterGen ) {
133
+ a .act (evnt_add_chnks , & AddsPayload {metric , chunks })
134
+ }
135
+
114
136
func (a * FlatAccnt ) HitChunk (metric schema.AMKey , ts uint32 ) {
115
137
a .act (evnt_hit_chnk , & HitPayload {metric , ts })
116
138
}
139
+ func (a * FlatAccnt ) HitChunks (metric schema.AMKey , chunks []chunk.IterGen ) {
140
+ a .act (evnt_hit_chnks , & HitsPayload {metric , chunks })
141
+ }
117
142
118
143
func (a * FlatAccnt ) Stop () {
119
144
a .act (evnt_stop , nil )
@@ -123,10 +148,10 @@ func (a *FlatAccnt) Reset() {
123
148
a .act (evnt_reset , nil )
124
149
}
125
150
126
- func (a * FlatAccnt ) act (t uint8 , payload interface {}) {
151
+ func (a * FlatAccnt ) act (eType eventType , payload interface {}) {
127
152
event := FlatAccntEvent {
128
- t : t ,
129
- pl : payload ,
153
+ eType : eType ,
154
+ pl : payload ,
130
155
}
131
156
132
157
select {
@@ -141,7 +166,7 @@ func (a *FlatAccnt) eventLoop() {
141
166
for {
142
167
select {
143
168
case event := <- a .eventQ :
144
- switch event .t {
169
+ switch event .eType {
145
170
case evnt_add_chnk :
146
171
payload := event .pl .(* AddPayload )
147
172
a .add (payload .metric , payload .ts , payload .size )
@@ -152,6 +177,18 @@ func (a *FlatAccnt) eventLoop() {
152
177
Ts : payload .ts ,
153
178
},
154
179
)
180
+ case evnt_add_chnks :
181
+ payload := event .pl .(* AddsPayload )
182
+ a .addRange (payload .metric , payload .chunks )
183
+ cacheChunkAdd .Add (len (payload .chunks ))
184
+ for _ , chunk := range payload .chunks {
185
+ a .lru .touch (
186
+ EvictTarget {
187
+ Metric : payload .metric ,
188
+ Ts : chunk .Ts ,
189
+ },
190
+ )
191
+ }
155
192
case evnt_hit_chnk :
156
193
payload := event .pl .(* HitPayload )
157
194
a .lru .touch (
@@ -160,6 +197,16 @@ func (a *FlatAccnt) eventLoop() {
160
197
Ts : payload .ts ,
161
198
},
162
199
)
200
+ case evnt_hit_chnks :
201
+ payload := event .pl .(* HitsPayload )
202
+ for _ , chunk := range payload .chunks {
203
+ a .lru .touch (
204
+ EvictTarget {
205
+ Metric : payload .metric ,
206
+ Ts : chunk .Ts ,
207
+ },
208
+ )
209
+ }
163
210
case evnt_del_met :
164
211
payload := event .pl .(* DelMetPayload )
165
212
a .delMet (payload .metric )
@@ -228,6 +275,35 @@ func (a *FlatAccnt) add(metric schema.AMKey, ts uint32, size uint64) {
228
275
cacheSizeUsed .AddUint64 (size )
229
276
}
230
277
278
+ func (a * FlatAccnt ) addRange (metric schema.AMKey , chunks []chunk.IterGen ) {
279
+ var met * FlatAccntMet
280
+ var ok bool
281
+
282
+ if met , ok = a .metrics [metric ]; ! ok {
283
+ met = & FlatAccntMet {
284
+ total : 0 ,
285
+ chunks : make (map [uint32 ]uint64 ),
286
+ }
287
+ a .metrics [metric ] = met
288
+ cacheMetricAdd .Inc ()
289
+ }
290
+
291
+ var sizeDiff uint64
292
+
293
+ for _ , chunk := range chunks {
294
+ if _ , ok = met .chunks [chunk .Ts ]; ok {
295
+ // we already have that chunk
296
+ continue
297
+ }
298
+ size := chunk .Size ()
299
+ sizeDiff += size
300
+ met .chunks [chunk .Ts ] = size
301
+ }
302
+
303
+ met .total = met .total + sizeDiff
304
+ cacheSizeUsed .AddUint64 (sizeDiff )
305
+ }
306
+
231
307
func (a * FlatAccnt ) evict () {
232
308
var met * FlatAccntMet
233
309
var targets []uint32
0 commit comments