compactor.go
package compactor

import (
	"context"
	"fmt"
	"hash/fnv"
	"time"

	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/tempo/modules/overrides"
	"github.com/grafana/tempo/modules/storage"
	"github.com/grafana/tempo/pkg/model"
	"github.com/grafana/tempo/pkg/util/log"
)

const (
	// ringAutoForgetUnhealthyPeriods is the number of consecutive heartbeat timeout periods after
	// which an unhealthy instance in the ring is automatically forgotten (removed).
	ringAutoForgetUnhealthyPeriods = 2

	// We use a safe default instead of exposing a config option to the user
	// in order to keep the config simple.
	ringNumTokens = 512

	compactorRingKey = "compactor"

	reasonCompactorDiscardedSpans = "trace_too_large_to_compact"
)

var (
	ringOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
)
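
// Compactor builds on dskit's services.Service and drives compaction and retention of blocks
// in the backend store, optionally sharding ownership of the work across instances via a ring.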
type Compactor struct {
	services.Service

	cfg       *Config
	store     storage.Store
	overrides *overrides.Overrides

	// Ring used for sharding compactions.
	ringLifecycler *ring.BasicLifecycler
	Ring           *ring.Ring

	subservices        *services.Manager
	subservicesWatcher *services.FailureWatcher
}

// New makes a new Compactor.
func New(cfg Config, store storage.Store, overrides *overrides.Overrides, reg prometheus.Registerer) (*Compactor, error) {
	c := &Compactor{
		cfg:       &cfg,
		store:     store,
		overrides: overrides,
	}

	if c.isSharded() {
		reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)

		lifecyclerStore, err := kv.NewClient(
			cfg.ShardingRing.KVStore,
			ring.GetCodec(),
			kv.RegistererWithKVName(reg, compactorRingKey+"-lifecycler"),
			log.Logger,
		)
		if err != nil {
			return nil, err
		}

		delegate := ring.BasicLifecyclerDelegate(c)
		delegate = ring.NewLeaveOnStoppingDelegate(delegate, log.Logger)
		delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.ShardingRing.HeartbeatTimeout, delegate, log.Logger)

		bcfg, err := toBasicLifecyclerConfig(cfg.ShardingRing, log.Logger)
		if err != nil {
			return nil, err
		}

		c.ringLifecycler, err = ring.NewBasicLifecycler(bcfg, compactorRingKey, cfg.OverrideRingKey, lifecyclerStore, delegate, log.Logger, reg)
		if err != nil {
			return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
		}

		c.Ring, err = ring.New(c.cfg.ShardingRing.ToLifecyclerConfig().RingConfig, compactorRingKey, cfg.OverrideRingKey, log.Logger, reg)
		if err != nil {
			return nil, errors.Wrap(err, "unable to initialize compactor ring")
		}
	}

	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
	return c, nil
}
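
// starting is the service's start-up function. When sharding is enabled it starts the ring
// lifecycler and ring client, waits for this instance to become ACTIVE and for the ring
// topology to stabilize, then enables blocklist polling on the store.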
func (c *Compactor) starting(ctx context.Context) (err error) {
	// In case this function returns an error we want to unregister the instance
	// from the ring. We do it by ensuring dependencies are gracefully stopped if they
	// were already started.
	defer func() {
		if err == nil || c.subservices == nil {
			return
		}

		if stopErr := services.StopManagerAndAwaitStopped(context.Background(), c.subservices); stopErr != nil {
			level.Error(log.Logger).Log("msg", "failed to gracefully stop compactor dependencies", "err", stopErr)
		}
	}()

	if c.isSharded() {
		c.subservices, err = services.NewManager(c.ringLifecycler, c.Ring)
		if err != nil {
			return fmt.Errorf("failed to create subservices: %w", err)
		}
		c.subservicesWatcher = services.NewFailureWatcher()
		c.subservicesWatcher.WatchManager(c.subservices)

		err := services.StartManagerAndAwaitHealthy(ctx, c.subservices)
		if err != nil {
			return fmt.Errorf("failed to start subservices: %w", err)
		}

		// Wait until the ring client has detected this instance in the ACTIVE state.
		level.Info(log.Logger).Log("msg", "waiting until compactor is ACTIVE in the ring")

		ctxWithTimeout, cancel := context.WithTimeout(ctx, c.cfg.ShardingRing.WaitActiveInstanceTimeout)
		defer cancel()
		if err := ring.WaitInstanceState(ctxWithTimeout, c.Ring, c.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
			return err
		}
		level.Info(log.Logger).Log("msg", "compactor is ACTIVE in the ring")

		// In the event of a cluster cold start we may end up in a situation where each new compactor
		// instance starts at a slightly different time and thus each one sees a different state
		// of the ring. It's better to just wait for ring stability for a short time.
		if c.cfg.ShardingRing.WaitStabilityMinDuration > 0 {
			minWaiting := c.cfg.ShardingRing.WaitStabilityMinDuration
			maxWaiting := c.cfg.ShardingRing.WaitStabilityMaxDuration

			level.Info(log.Logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
			if err := ring.WaitRingStability(ctx, c.Ring, ringOp, minWaiting, maxWaiting); err != nil {
				level.Warn(log.Logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
			} else {
				level.Info(log.Logger).Log("msg", "compactor ring topology is stable")
			}
		}
	}

	// this will block until one poll cycle is complete
	c.store.EnablePolling(c)

	return nil
}
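
// running enables compaction on the store and then blocks until the service context is
// cancelled or, when sharded, until one of the ring subservices fails.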
func (c *Compactor) running(ctx context.Context) error {
	level.Info(log.Logger).Log("msg", "enabling compaction")
	c.store.EnableCompaction(&c.cfg.Compactor, c, c)

	if c.subservices != nil {
		select {
		case <-ctx.Done():
			return nil
		case err := <-c.subservicesWatcher.Chan():
			return fmt.Errorf("compactor subservices failed: %w", err)
		}
	} else {
		<-ctx.Done()
	}

	return nil
}

// Called after the compactor is asked to stop via StopAsync.
func (c *Compactor) stopping(_ error) error {
	if c.subservices != nil {
		return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
	}

	return nil
}

// Owns implements CompactorSharder
func (c *Compactor) Owns(hash string) bool {
	if !c.isSharded() {
		return true
	}

	level.Debug(log.Logger).Log("msg", "checking hash", "hash", hash)

	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(hash))
	hash32 := hasher.Sum32()

	rs, err := c.Ring.Get(hash32, ringOp, []ring.InstanceDesc{}, nil, nil)
	if err != nil {
		level.Error(log.Logger).Log("msg", "failed to get ring", "err", err)
		return false
	}

	if len(rs.Instances) != 1 {
		level.Error(log.Logger).Log("msg", "unexpected number of compactors in the shard", "expected", 1, "got", len(rs.Instances))
		return false
	}

	ringAddr := c.ringLifecycler.GetInstanceAddr()

	level.Debug(log.Logger).Log("msg", "checking addresses", "owning_addr", rs.Instances[0].Addr, "this_addr", ringAddr)

	return rs.Instances[0].Addr == ringAddr
}

// Combine implements tempodb.CompactorSharder
func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) {
	combinedObj, wasCombined, err := model.StaticCombiner.Combine(dataEncoding, objs...)
	if err != nil {
		return nil, false, err
	}

	maxBytes := c.overrides.MaxBytesPerTrace(tenantID)
	if maxBytes == 0 || len(combinedObj) < maxBytes {
		return combinedObj, wasCombined, nil
	}

	// Technically neither of these conditions should ever be true; they are guard code
	// for the logic below.
	if len(objs) == 0 {
		return []byte{}, wasCombined, nil
	}
	if len(objs) == 1 {
		return objs[0], wasCombined, nil
	}

	spansDiscarded := countSpans(dataEncoding, objs[1:]...)
	overrides.RecordDiscardedSpans(spansDiscarded, reasonCompactorDiscardedSpans, tenantID)

	return objs[0], wasCombined, nil
}
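
// RecordDiscardedSpans records spans dropped during compaction against the per-tenant
// discarded-spans metric, using the compactor's discard reason.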
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string) {
	overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID)
}

// BlockRetentionForTenant implements CompactorOverrides
func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
	return c.overrides.BlockRetention(tenantID)
}
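
// MaxBytesPerTraceForTenant returns the per-tenant trace size limit used by Combine to
// decide when a combined trace is too large to keep.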
func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int {
	return c.overrides.MaxBytesPerTrace(tenantID)
}
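
// isSharded reports whether a ring KV store is configured, i.e. whether compaction
// ownership is shared across multiple compactor instances.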
func (c *Compactor) isSharded() bool {
	return c.cfg.ShardingRing.KVStore.Store != ""
}

// OnRingInstanceRegister is called while the lifecycler is registering the
// instance within the ring and should return the state and set of tokens to
// use for the instance itself.
func (c *Compactor) OnRingInstanceRegister(lifecycler *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
	// When we initialize the compactor instance in the ring we want to start from
	// a clean situation, so regardless of the current state we set it to ACTIVE, while
	// keeping the existing tokens (if any) or the ones loaded from file.
	var tokens []uint32
	if instanceExists {
		tokens = instanceDesc.GetTokens()
	}

	takenTokens := ringDesc.GetTokens()
	newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

	// Tokens sorting will be enforced by the parent caller.
	tokens = append(tokens, newTokens...)

	return ring.ACTIVE, tokens
}

// OnRingInstanceTokens is called once the instance tokens are set and are
// stable within the ring (honoring the observe period, if set).
func (c *Compactor) OnRingInstanceTokens(lifecycler *ring.BasicLifecycler, tokens ring.Tokens) {}

// OnRingInstanceStopping is called while the lifecycler is stopping. The lifecycler
// will continue to heartbeat the ring while this function is executing and will proceed
// to unregister the instance from the ring only after this function has returned.
func (c *Compactor) OnRingInstanceStopping(lifecycler *ring.BasicLifecycler) {}

// OnRingInstanceHeartbeat is called while the instance is updating its heartbeat
// in the ring.
func (c *Compactor) OnRingInstanceHeartbeat(lifecycler *ring.BasicLifecycler, ringDesc *ring.Desc, instanceDesc *ring.InstanceDesc) {
}
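
// countSpans returns the total number of spans contained in the given objects. Objects
// that fail to decode are skipped.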
func countSpans(dataEncoding string, objs ...[]byte) int {
	decoder, err := model.NewObjectDecoder(dataEncoding)
	if err != nil {
		return 0
	}

	spans := 0
	for _, o := range objs {
		t, err := decoder.PrepareForRead(o)
		if err != nil {
			continue
		}

		for _, b := range t.Batches {
			for _, ilm := range b.InstrumentationLibrarySpans {
				spans += len(ilm.Spans)
			}
		}
	}

	return spans
}