-
Notifications
You must be signed in to change notification settings - Fork 69
/
lifecycler.go
1107 lines (923 loc) · 39.4 KB
/
lifecycler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package ring
import (
"context"
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/services"
)
// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
	RingConfig Config `yaml:"ring"`

	// Config for the ingester lifecycle control
	NumTokens        int           `yaml:"num_tokens" category:"advanced"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
	ObservePeriod    time.Duration `yaml:"observe_period" category:"advanced"`
	JoinAfter        time.Duration `yaml:"join_after" category:"advanced"`
	MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
	InfNames         []string      `yaml:"interface_names" doc:"default=[<private network interfaces>]"`
	EnableInet6      bool          `yaml:"enable_inet6" category:"advanced"`

	// FinalSleep's default value can be overridden by
	// setting it before calling RegisterFlags or RegisterFlagsWithPrefix.
	FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"`
	// TokensFilePath, when non-empty, is where tokens are stored at shutdown
	// and restored from at startup. Empty disables token persistence.
	TokensFilePath           string `yaml:"tokens_file_path"`
	Zone                     string `yaml:"availability_zone"`
	UnregisterOnShutdown     bool   `yaml:"unregister_on_shutdown" category:"advanced"`
	ReadinessCheckRingHealth bool   `yaml:"readiness_check_ring_health" category:"advanced"`

	// For testing, you can override the address and ID of this ingester
	Addr string `yaml:"address" category:"advanced"`
	Port int    `category:"advanced"`
	ID   string `doc:"default=<hostname>" category:"advanced"`

	// Injected internally
	ListenPort int `yaml:"-"`

	// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
	// Default value is nil, which means that RandomTokenGenerator is used.
	RingTokenGenerator TokenGenerator `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
// The default values of some flags can be changed; see docs of LifecyclerConfig.
// It delegates to RegisterFlagsWithPrefix with an empty prefix.
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
	cfg.RegisterFlagsWithPrefix("", f, logger)
}
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.
// The default values of some flags can be changed; see docs of LifecyclerConfig.
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet, logger log.Logger) {
	cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f)

	// In order to keep backwards compatibility all of these need to be prefixed
	// with "ingester."
	if prefix == "" {
		prefix = "ingester."
	}

	f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.")
	f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul. 0 = disabled.")
	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"heartbeat-timeout", 1*time.Minute, "Heartbeat timeout after which instance is assumed to be unhealthy. 0 = disabled.")
	f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")
	f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
	f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.")
	// Note: the default here is cfg.FinalSleep itself, so callers may pre-set
	// the field before registering flags to change the default (see struct docs).
	f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", cfg.FinalSleep, "Duration to sleep for before exiting, to ensure metrics are scraped.")
	f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")

	// A hostname lookup failure here is unrecoverable for flag defaults, hence panic.
	hostname, err := os.Hostname()
	if err != nil {
		panic(fmt.Errorf("failed to get hostname %s", err))
	}

	cfg.InfNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger)
	f.Var((*flagext.StringSlice)(&cfg.InfNames), prefix+"lifecycler.interface", "Name of network interface to read address from.")
	f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in the ring.")
	f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
	// NOTE(review): "lifecycler.ID" uses an uppercase suffix unlike every other
	// flag here; renaming would break existing deployments, so it is kept as-is.
	f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register in the ring.")
	f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.")
	f.BoolVar(&cfg.UnregisterOnShutdown, prefix+"unregister-on-shutdown", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.")
	f.BoolVar(&cfg.ReadinessCheckRingHealth, prefix+"readiness-check-ring-health", true, "When enabled the readiness probe succeeds only after all instances are ACTIVE and healthy in the ring, otherwise only the instance itself is checked. This option should be disabled if in your cluster multiple instances can be rolled out simultaneously, otherwise rolling updates may be slowed down.")
	f.BoolVar(&cfg.EnableInet6, prefix+"enable-inet6", false, "Enable IPv6 support. Required to make use of IP addresses from IPv6 interfaces.")
}
// Validate checks the consistency of LifecyclerConfig, and fails if this cannot be achieved.
func (cfg *LifecyclerConfig) Validate() error {
	// A SpreadMinimizingTokenGenerator must not be combined with tokens
	// persisted to file: the tokens must not be loaded from disk.
	if _, isSpreadMinimizing := cfg.RingTokenGenerator.(*SpreadMinimizingTokenGenerator); isSpreadMinimizing && cfg.TokensFilePath != "" {
		return errors.New("you can't configure the tokens file path when using the spread minimizing token strategy. Please set the tokens file path to an empty string")
	}
	return nil
}
/*
Lifecycler is a Service that is responsible for publishing changes to a ring for a single instance.

  - When a Lifecycler first starts, it will be in a [PENDING] state.
  - After the configured [ring.LifecyclerConfig.JoinAfter] period, it selects some random tokens and enters the [JOINING] state, creating or updating the ring as needed.
  - The lifecycler will then periodically, based on the [ring.LifecyclerConfig.ObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will transition to the [ACTIVE] state.
  - The lifecycler will update the key/value store with heartbeats, state changes, and token changes, based on the [ring.LifecyclerConfig.HeartbeatPeriod].
*/
type Lifecycler struct {
	*services.BasicService

	cfg             LifecyclerConfig
	flushTransferer FlushTransferer
	KVStore         kv.Client

	// actorChan carries closures to be executed on the loop() goroutine,
	// serializing state changes with the rest of the lifecycle.
	actorChan chan func()

	// These values are initialised at startup, and never change
	ID       string
	Addr     string
	RingName string
	RingKey  string
	Zone     string

	// Whether to flush if transfer fails on shutdown.
	flushOnShutdown       *atomic.Bool
	unregisterOnShutdown  *atomic.Bool
	clearTokensOnShutdown *atomic.Bool

	// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
	// goes away and comes back empty. The state changes during lifecycle of instance.
	// All of the fields below are guarded by stateMtx.
	stateMtx            sync.RWMutex
	state               InstanceState
	tokens              Tokens
	registeredAt        time.Time
	readOnly            bool
	readOnlyLastUpdated time.Time

	// Controls the ready-reporting
	readyLock sync.Mutex
	ready     bool
	readySince time.Time

	// Keeps stats updated at every heartbeat period; guarded by countersLock.
	countersLock                sync.RWMutex
	healthyInstancesCount       int
	instancesCount              int
	readOnlyInstancesCount      int
	healthyInstancesInZoneCount int
	instancesInZoneCount        int
	zonesCount                  int

	tokenGenerator TokenGenerator
	// The maximum time allowed to wait on the CanJoin() condition.
	// Configurable for testing purposes only.
	canJoinTimeout time.Duration

	lifecyclerMetrics *LifecyclerMetrics
	logger            log.Logger
}
// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
//
// A nil flushTransferer is allowed and replaced with a no-op implementation.
// Returns an error if the config is invalid, the instance address cannot be
// determined, or the KV client cannot be created.
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) {
	// Validate the config up front, before allocating any resources (KV client,
	// metrics registration) that would otherwise be created and then discarded
	// when the config turns out to be invalid.
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger, cfg.EnableInet6)
	if err != nil {
		return nil, err
	}

	port := GetInstancePort(cfg.Port, cfg.ListenPort)
	codec := GetCodec()

	// Suffix all client names with "-lifecycler" to denote this kv client is used by the lifecycler
	store, err := kv.NewClient(
		cfg.RingConfig.KVStore,
		codec,
		kv.RegistererWithKVName(reg, ringName+"-lifecycler"),
		logger,
	)
	if err != nil {
		return nil, err
	}

	// We do allow a nil FlushTransferer, but to keep the ring logic easier we assume
	// it's always set, so we use a noop FlushTransferer
	if flushTransferer == nil {
		flushTransferer = NewNoopFlushTransferer()
	}

	// Default to random token generation when none is configured.
	tokenGenerator := cfg.RingTokenGenerator
	if tokenGenerator == nil {
		tokenGenerator = NewRandomTokenGenerator()
	}

	l := &Lifecycler{
		cfg:                   cfg,
		flushTransferer:       flushTransferer,
		KVStore:               store,
		Addr:                  net.JoinHostPort(addr, strconv.Itoa(port)),
		ID:                    cfg.ID,
		RingName:              ringName,
		RingKey:               ringKey,
		flushOnShutdown:       atomic.NewBool(flushOnShutdown),
		unregisterOnShutdown:  atomic.NewBool(cfg.UnregisterOnShutdown),
		clearTokensOnShutdown: atomic.NewBool(false),
		Zone:                  cfg.Zone,
		actorChan:             make(chan func()),
		state:                 PENDING,
		tokenGenerator:        tokenGenerator,
		canJoinTimeout:        5 * time.Minute,
		lifecyclerMetrics:     NewLifecyclerMetrics(ringName, reg),
		logger:                logger,
	}

	l.BasicService = services.
		NewBasicService(nil, l.loop, l.stopping).
		WithName(fmt.Sprintf("%s ring lifecycler", ringName))

	return l, nil
}
// CheckReady is used to rate limit the number of ingesters that can be coming or
// going at any one time, by only returning true if all ingesters are active.
// The state latches: once we have gone ready we don't go un-ready
func (i *Lifecycler) CheckReady(ctx context.Context) error {
	i.readyLock.Lock()
	defer i.readyLock.Unlock()

	// Readiness latches: once ready, always ready.
	if i.ready {
		return nil
	}

	if err := i.checkRingHealthForReadiness(ctx); err != nil {
		// Restart the min-ready-duration countdown whenever a check fails.
		i.readySince = time.Time{}
		return err
	}

	// The min-ready-duration countdown starts from the first moment all
	// readiness checks pass.
	if i.readySince.IsZero() {
		i.readySince = time.Now()
	}
	if time.Since(i.readySince) < i.cfg.MinReadyDuration {
		return fmt.Errorf("waiting for %v after being ready", i.cfg.MinReadyDuration)
	}

	i.ready = true
	return nil
}
// checkRingHealthForReadiness performs the internal readiness checks: the
// instance must hold tokens, the ring must be readable from the KV store, and
// either the whole ring (when ReadinessCheckRingHealth is enabled) or just
// this instance must pass the Desc/InstanceDesc IsReady check.
func (i *Lifecycler) checkRingHealthForReadiness(ctx context.Context) error {
	// Ensure the instance holds some tokens.
	if len(i.getTokens()) == 0 {
		return fmt.Errorf("this instance owns no tokens")
	}

	// If ring health checking is enabled we make sure all instances in the ring are ACTIVE and healthy,
	// otherwise we just check this instance.
	desc, err := i.KVStore.Get(ctx, i.RingKey)
	if err != nil {
		level.Error(i.logger).Log("msg", "error talking to the KV store", "ring", i.RingName, "err", err)
		return fmt.Errorf("error talking to the KV store: %s", err)
	}

	ringDesc, ok := desc.(*Desc)
	if !ok || ringDesc == nil {
		return fmt.Errorf("no ring returned from the KV store")
	}

	if i.cfg.ReadinessCheckRingHealth {
		if err := ringDesc.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil {
			level.Warn(i.logger).Log("msg", "found an existing instance(s) with a problem in the ring, "+
				"this instance cannot become ready until this problem is resolved. "+
				"The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.",
				"ring", i.RingName, "err", err)
			return err
		}
	} else {
		instance, ok := ringDesc.Ingesters[i.ID]
		if !ok {
			return fmt.Errorf("instance %s not found in the ring", i.ID)
		}

		if err := instance.IsReady(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil {
			return err
		}
	}

	return nil
}
// GetState returns the state of this ingester.
func (i *Lifecycler) GetState() InstanceState {
	i.stateMtx.RLock()
	state := i.state
	i.stateMtx.RUnlock()
	return state
}
// setState updates the locally-cached instance state under the state lock.
func (i *Lifecycler) setState(state InstanceState) {
	i.stateMtx.Lock()
	i.state = state
	i.stateMtx.Unlock()
}
// sendToLifecyclerLoop hands fn to the loop() goroutine for execution.
// It returns an error if the lifecycler service is not running.
func (i *Lifecycler) sendToLifecyclerLoop(fn func()) error {
	svcCtx := i.ServiceContext()
	if svcCtx == nil {
		return errors.New("lifecycler not running")
	}

	select {
	case i.actorChan <- fn:
		return nil
	case <-svcCtx.Done():
		return errors.New("lifecycler not running")
	}
}
// ChangeState of the ingester, for use off of the loop() goroutine.
func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error {
	errCh := make(chan error)

	// Run the state change on the loop() goroutine and wait for its result.
	if err := i.sendToLifecyclerLoop(func() {
		errCh <- i.changeState(ctx, state)
	}); err != nil {
		return err
	}

	return <-errCh
}
// ChangeReadOnlyState changes the read-only state of this instance, both
// locally and in the ring (via the KV store), for use off of the loop()
// goroutine. It is a no-op if the instance is already in the requested state.
func (i *Lifecycler) ChangeReadOnlyState(ctx context.Context, readOnly bool) error {
	errCh := make(chan error)
	fn := func() {
		// Skip the KV update entirely when nothing would change.
		prevReadOnly, _ := i.GetReadOnlyState()
		if prevReadOnly == readOnly {
			errCh <- nil
			return
		}

		level.Info(i.logger).Log("msg", "changing read-only state of instance in the ring", "readOnly", readOnly, "ring", i.RingName)
		i.setReadOnlyState(readOnly, time.Now())

		// Push the new state to the ring immediately rather than waiting for
		// the next heartbeat.
		errCh <- i.updateConsul(ctx)
	}

	if err := i.sendToLifecyclerLoop(fn); err != nil {
		return err
	}

	return <-errCh
}
// getTokens returns the locally-cached tokens under the state read lock.
func (i *Lifecycler) getTokens() Tokens {
	i.stateMtx.RLock()
	tokens := i.tokens
	i.stateMtx.RUnlock()
	return tokens
}
// setTokens records the instance's tokens and, when a tokens file path is
// configured, also persists them to disk (failures are logged, not returned).
func (i *Lifecycler) setTokens(tokens Tokens) {
	i.stateMtx.Lock()
	defer i.stateMtx.Unlock()

	i.tokens = tokens
	if i.cfg.TokensFilePath == "" {
		return
	}
	if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
		level.Error(i.logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
	}
}
// getRegisteredAt returns the cached ring registration timestamp.
func (i *Lifecycler) getRegisteredAt() time.Time {
	i.stateMtx.RLock()
	registeredAt := i.registeredAt
	i.stateMtx.RUnlock()
	return registeredAt
}
// setRegisteredAt updates the cached ring registration timestamp.
func (i *Lifecycler) setRegisteredAt(registeredAt time.Time) {
	i.stateMtx.Lock()
	i.registeredAt = registeredAt
	i.stateMtx.Unlock()
}
// GetReadOnlyState returns the read-only state of this instance -- whether instance is read-only, and when what the last
// update of read-only state (possibly zero).
func (i *Lifecycler) GetReadOnlyState() (bool, time.Time) {
	i.stateMtx.RLock()
	readOnly, lastUpdated := i.readOnly, i.readOnlyLastUpdated
	i.stateMtx.RUnlock()
	return readOnly, lastUpdated
}
// setReadOnlyState updates the cached read-only state and its last-updated
// timestamp, and mirrors the state into the read-only gauge metric.
func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Time) {
	i.stateMtx.Lock()
	defer i.stateMtx.Unlock()

	i.readOnly = readOnly
	i.readOnlyLastUpdated = readOnlyLastUpdated

	var gaugeValue float64
	if readOnly {
		gaugeValue = 1
	}
	i.lifecyclerMetrics.readonly.Set(gaugeValue)
}
// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
//
// For this method to work correctly (especially when using gossiping), source ingester (specified by
// ingesterID) must be in the LEAVING state, otherwise ring's merge function may detect token conflict and
// assign token to the wrong ingester. While we could check for that state here, when this method is called,
// transfers have already finished -- it's better to check for this *before* transfers start.
func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) error {
	errCh := make(chan error)

	fn := func() {
		var tokens Tokens

		claimTokens := func(in interface{}) (out interface{}, retry bool, err error) {
			ringDesc, ok := in.(*Desc)
			if !ok || ringDesc == nil {
				// Error strings follow the Go convention (lowercase, staticcheck ST1005);
				// previously this message was capitalized.
				return nil, false, fmt.Errorf("cannot claim tokens in an empty ring")
			}

			tokens = ringDesc.ClaimTokens(ingesterID, i.ID)

			// update timestamp to give gossiping client a chance register ring change.
			ing := ringDesc.Ingesters[i.ID]
			ing.Timestamp = time.Now().Unix()

			// Tokens of the leaving ingester may have been generated by an older version which
			// doesn't guarantee sorted tokens, so we enforce sorting here.
			sort.Sort(tokens)
			ing.Tokens = tokens

			ringDesc.Ingesters[i.ID] = ing
			return ringDesc, true, nil
		}

		// Best effort: a CAS failure is logged but not returned to the caller,
		// and the local tokens are updated regardless (preserving the original
		// behavior of this method).
		if err := i.KVStore.CAS(ctx, i.RingKey, claimTokens); err != nil {
			level.Error(i.logger).Log("msg", "Failed to write to the KV store", "ring", i.RingName, "err", err)
		}

		i.setTokens(tokens)
		errCh <- nil
	}

	if err := i.sendToLifecyclerLoop(fn); err != nil {
		return err
	}

	return <-errCh
}
// HealthyInstancesCount returns the number of healthy instances for the Write operation
// in the ring, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesCount() int {
	i.countersLock.RLock()
	count := i.healthyInstancesCount
	i.countersLock.RUnlock()
	return count
}
// InstancesCount returns the total number of instances in the ring, updated during the last heartbeat period.
func (i *Lifecycler) InstancesCount() int {
	i.countersLock.RLock()
	count := i.instancesCount
	i.countersLock.RUnlock()
	return count
}
// ReadOnlyInstancesCount returns the total number of instances in the ring that are read only, updated during the last heartbeat period.
func (i *Lifecycler) ReadOnlyInstancesCount() int {
	i.countersLock.RLock()
	count := i.readOnlyInstancesCount
	i.countersLock.RUnlock()
	return count
}
// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesInZoneCount() int {
	i.countersLock.RLock()
	count := i.healthyInstancesInZoneCount
	i.countersLock.RUnlock()
	return count
}
// InstancesInZoneCount returns the number of instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) InstancesInZoneCount() int {
	i.countersLock.RLock()
	count := i.instancesInZoneCount
	i.countersLock.RUnlock()
	return count
}
// ZonesCount returns the number of zones for which there's at least 1 instance registered
// in the ring.
func (i *Lifecycler) ZonesCount() int {
	i.countersLock.RLock()
	count := i.zonesCount
	i.countersLock.RUnlock()
	return count
}
// loop is the lifecycler's main service loop. It joins the ring, then drives
// the PENDING -> (JOINING ->) ACTIVE state machine, heartbeats to the KV
// store, and executes closures sent via actorChan, until ctx is cancelled.
//
// NOTE(review): KV operations in here use context.Background() rather than
// ctx — presumably so in-flight ring updates aren't cut off by service
// shutdown; confirm this is intentional.
func (i *Lifecycler) loop(ctx context.Context) error {
	// First, see if we exist in the cluster, update our state to match if we do,
	// and add ourselves (without tokens) if we don't.
	if err := i.initRing(context.Background()); err != nil {
		return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
	}

	// We do various period tasks
	autoJoinAfter := time.After(i.cfg.JoinAfter)
	var observeChan <-chan time.Time

	heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
	defer heartbeatTickerStop()

	for {
		select {
		case <-autoJoinAfter:
			level.Debug(i.logger).Log("msg", "JoinAfter expired", "ring", i.RingName)
			// Will only fire once, after auto join timeout. If we haven't entered "JOINING" state,
			// then pick some tokens and enter ACTIVE state.
			if i.GetState() == PENDING {
				level.Info(i.logger).Log("msg", "auto-joining cluster after timeout", "ring", i.RingName)

				if i.cfg.ObservePeriod > 0 {
					// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
					// ingesters, but we also signal that it is not fully functional yet.
					if err := i.autoJoin(context.Background(), JOINING); err != nil {
						return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
					}

					level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
					observeChan = time.After(i.cfg.ObservePeriod)
				} else {
					if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
						return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
					}
				}
			}

		case <-observeChan:
			// if observeChan is nil, this case is ignored. We keep updating observeChan while observing the ring.
			// When observing is done, observeChan is set to nil.
			observeChan = nil
			if s := i.GetState(); s != JOINING {
				level.Error(i.logger).Log("msg", "unexpected state while observing tokens", "state", s, "ring", i.RingName)
			}

			if i.verifyTokens(context.Background()) {
				level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName)

				err := i.changeState(context.Background(), ACTIVE)
				if err != nil {
					level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err)
				}
			} else {
				level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
				// keep observing
				observeChan = time.After(i.cfg.ObservePeriod)
			}

		case <-heartbeatTickerChan:
			// Periodic heartbeat: bump the counter metric and refresh our ring entry.
			i.lifecyclerMetrics.consulHeartbeats.Inc()
			if err := i.updateConsul(context.Background()); err != nil {
				level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
			}

		case f := <-i.actorChan:
			// Execute a closure submitted via sendToLifecyclerLoop, serialized
			// with all other lifecycle work.
			f()

		case <-ctx.Done():
			level.Info(i.logger).Log("msg", "lifecycler loop() exited gracefully", "ring", i.RingName)
			return nil
		}
	}
}
// Shutdown the lifecycle. It will:
// - send chunks to another ingester, if it can.
// - otherwise, flush chunks to the chunk store.
// - remove config from Consul.
func (i *Lifecycler) stopping(runningError error) error {
	if runningError != nil {
		// previously lifecycler just called os.Exit (from loop method)...
		// now it stops more gracefully, but also without doing any cleanup
		return nil
	}

	// Keep heartbeating while the shutdown (transfer/flush) is in progress.
	heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
	defer heartbeatTickerStop()

	// Mark ourselves as LEAVING so no more samples are sent to us.
	err := i.changeState(context.Background(), LEAVING)
	if err != nil {
		level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
	}

	// Do the transferring / flushing on a background goroutine so we can continue
	// to heartbeat to consul.
	done := make(chan struct{})
	go func() {
		i.processShutdown(context.Background())
		close(done)
	}()

heartbeatLoop:
	for {
		select {
		case <-heartbeatTickerChan:
			i.lifecyclerMetrics.consulHeartbeats.Inc()
			if err := i.updateConsul(context.Background()); err != nil {
				level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
			}

		case <-done:
			break heartbeatLoop
		}
	}

	// Optionally remove our entry from the ring...
	if i.ShouldUnregisterOnShutdown() {
		if err := i.unregister(context.Background()); err != nil {
			return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
		}
		level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
	}

	// ...and optionally delete the persisted tokens file.
	if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() {
		if err := os.Remove(i.cfg.TokensFilePath); err != nil {
			return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath)
		}
		level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath)
	}

	return nil
}
// initRing is the first thing we do when we start. It:
// - adds an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
	var (
		ringDesc       *Desc
		tokensFromFile Tokens
		err            error
	)

	// Try to restore previously persisted tokens, if a tokens file is configured.
	if i.cfg.TokensFilePath != "" {
		tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
		if err != nil && !os.IsNotExist(err) {
			level.Error(i.logger).Log("msg", "error loading tokens from file", "err", err)
		}
	} else {
		level.Info(i.logger).Log("msg", "not loading tokens from file, tokens file path is empty")
	}

	err = i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc = GetOrCreateRingDesc(in)

		instanceDesc, ok := ringDesc.Ingesters[i.ID]
		if !ok {
			now := time.Now()
			// The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now.
			i.setRegisteredAt(now)
			// Clear read-only state, and set last update time to "zero".
			i.setReadOnlyState(false, time.Time{})

			// We use the tokens from the file only if it does not exist in the ring yet.
			if len(tokensFromFile) > 0 {
				level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
				// A full set of restored tokens lets us skip the join process entirely.
				if len(tokensFromFile) >= i.cfg.NumTokens {
					i.setState(ACTIVE)
				}
				ro, rots := i.GetReadOnlyState()
				ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots)
				i.setTokens(tokensFromFile)
				return ringDesc, true, nil
			}

			// Either we are a new ingester, or consul must have restarted
			level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
			ro, rots := i.GetReadOnlyState()
			ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots)
			return ringDesc, true, nil
		}

		// The instance already exists in the ring, so we can't change the registered timestamp (even if it's zero)
		// but we need to update the local state accordingly.
		i.setRegisteredAt(instanceDesc.GetRegisteredAt())

		// Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state.
		i.setReadOnlyState(instanceDesc.GetReadOnlyState())

		// If the ingester is in the JOINING state this means it crashed due to
		// a failed token transfer or some other reason during startup. We want
		// to set it back to PENDING in order to start the lifecycle from the
		// beginning.
		if instanceDesc.State == JOINING {
			level.Warn(i.logger).Log("msg", "instance found in ring as JOINING, setting to PENDING",
				"ring", i.RingName)
			instanceDesc.State = PENDING
			return ringDesc, true, nil
		}

		tokens := Tokens(instanceDesc.Tokens)
		ro, rots := instanceDesc.GetReadOnlyState()
		level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens", len(tokens), "ring", i.RingName, "readOnly", ro, "readOnlyStateUpdate", rots)

		// If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its
		// ring state as LEAVING. Make sure to switch to the ACTIVE state.
		if instanceDesc.State == LEAVING {
			delta := i.cfg.NumTokens - len(tokens)
			if delta > 0 {
				// We need more tokens
				level.Info(i.logger).Log("msg", "existing instance has too few tokens, adding difference",
					"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
				newTokens := i.tokenGenerator.GenerateTokens(delta, ringDesc.GetTokens())
				tokens = append(tokens, newTokens...)
				sort.Sort(tokens)
			} else if delta < 0 {
				// We have too many tokens
				level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference",
					"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
				// Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes.
				rand.Shuffle(len(tokens), tokens.Swap)
				tokens = tokens[0:i.cfg.NumTokens]
				sort.Sort(tokens)
			}

			instanceDesc.State = ACTIVE
			instanceDesc.Tokens = tokens
		}

		// Set the local state based on the updated instance.
		i.setState(instanceDesc.State)
		i.setTokens(tokens)

		// We're taking over this entry, update instanceDesc with our values
		instanceDesc.Id = i.ID
		instanceDesc.Addr = i.Addr
		instanceDesc.Zone = i.Zone

		// Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat
		// can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady.
		if !instanceDesc.Equal(ringDesc.Ingesters[i.ID]) {
			// Update timestamp to give gossiping client a chance register ring change.
			instanceDesc.Timestamp = time.Now().Unix()
			ringDesc.Ingesters[i.ID] = instanceDesc
			return ringDesc, true, nil
		}

		// we haven't modified the ring, don't try to store it.
		return nil, true, nil
	})

	// Update counters
	if err == nil {
		i.updateCounters(ringDesc)
	}

	return err
}
// Verifies that tokens that this ingester has registered to the ring still belong to it.
// Gossiping ring may change the ownership of tokens in case of conflicts.
// If ingester doesn't own its tokens anymore, this method generates new tokens and puts them to the ring.
// Returns true when this ingester owns all of its tokens; false when new tokens
// had to be pushed (verification will be retried by the caller) or the CAS failed.
func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
	result := false

	err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc := GetOrCreateRingDesc(in)

		// At this point, we should have the same tokens as we have registered before
		ringTokens, takenTokens := ringDesc.TokensFor(i.ID)

		if !i.compareTokens(ringTokens) {
			// uh, oh... our tokens are not ours anymore. Let's try new ones.
			needTokens := i.cfg.NumTokens - len(ringTokens)

			level.Info(i.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", i.RingName)
			newTokens := i.tokenGenerator.GenerateTokens(needTokens, takenTokens)

			ringTokens = append(ringTokens, newTokens...)
			sort.Sort(ringTokens)

			ro, rots := i.GetReadOnlyState()
			ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt(), ro, rots)

			i.setTokens(ringTokens)

			return ringDesc, true, nil
		}

		// all is good, this ingester owns its tokens
		result = true
		return nil, true, nil
	})

	if err != nil {
		level.Error(i.logger).Log("msg", "failed to verify tokens", "ring", i.RingName, "err", err)
		return false
	}

	return result
}
// compareTokens reports whether the token set held in the ring is identical to
// the lifecycler's local token set. Both slices are sorted in place as a side
// effect of the comparison.
func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
	sort.Sort(fromRing)

	mine := i.getTokens()
	sort.Sort(mine)

	if len(mine) != len(fromRing) {
		return false
	}

	// Element-wise comparison of the two sorted slices.
	for idx, tok := range mine {
		if tok != fromRing[idx] {
			return false
		}
	}
	return true
}
// waitBeforeJoining blocks until the token generator reports that this
// instance is allowed to join the ring, or until the configured canJoinTimeout
// expires. When the token generator has no join gating enabled, it returns
// immediately.
//
// A failure to determine joinability is logged and swallowed: a non-nil error
// is returned only when the parent context itself has been cancelled.
func (i *Lifecycler) waitBeforeJoining(ctx context.Context) error {
	if !i.tokenGenerator.CanJoinEnabled() {
		return nil
	}

	level.Info(i.logger).Log("msg", "waiting to be able to join the ring", "ring", i.RingName, "id", i.cfg.ID, "timeout", i.canJoinTimeout)

	ctxWithTimeout, cancel := context.WithTimeout(ctx, i.canJoinTimeout)
	defer cancel()

	// Poll the KV store once per second until the timeout fires.
	boff := backoff.New(ctxWithTimeout, backoff.Config{
		MinBackoff: 1 * time.Second,
		MaxBackoff: 1 * time.Second,
		MaxRetries: 0,
	})

	var lastErr error
	for ; boff.Ongoing(); boff.Wait() {
		in, err := i.KVStore.Get(ctxWithTimeout, i.RingKey)
		if err != nil {
			lastErr = errors.Wrap(err, "error getting the ring from the KV store")
			continue
		}

		ringDesc, ok := in.(*Desc)
		if !ok || ringDesc == nil {
			lastErr = fmt.Errorf("no ring returned from the KV store")
			continue
		}

		lastErr = i.tokenGenerator.CanJoin(ringDesc.GetIngesters())
		if lastErr == nil {
			level.Info(i.logger).Log("msg", "it is now possible to join the ring", "ring", i.RingName, "id", i.cfg.ID, "retries", boff.NumRetries())
			return nil
		}
	}

	// If the loop never produced an error of its own, surface the backoff's
	// termination reason (context cancellation / timeout).
	if lastErr == nil {
		lastErr = boff.Err()
	}

	level.Warn(i.logger).Log("msg", "there was a problem while checking whether this instance could join the ring - will continue anyway", "ring", i.RingName, "id", i.cfg.ID, "err", lastErr)

	// Return error only in case the parent context has been cancelled.
	// In all other cases, we just want to swallow the error and move on.
	return ctx.Err()
}
// autoJoin selects random tokens & moves state to targetState
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
	if err := i.waitBeforeJoining(ctx); err != nil {
		return err
	}

	var ringDesc *Desc

	err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc = GetOrCreateRingDesc(in)

		// We expect to hold no tokens yet and to still be in PENDING state.
		myTokens, takenTokens := ringDesc.TokensFor(i.ID)
		if len(myTokens) > 0 {
			level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName)
		}

		// Generate the missing tokens, avoiding collisions with tokens that
		// are already taken by other instances.
		newTokens := i.tokenGenerator.GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
		i.setState(targetState)

		myTokens = append(myTokens, newTokens...)
		sort.Sort(myTokens)
		i.setTokens(myTokens)

		ro, rots := i.GetReadOnlyState()
		ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt(), ro, rots)

		return ringDesc, true, nil
	})

	// Update counters
	if err == nil {
		i.updateCounters(ringDesc)
	}

	return err
}
// updateConsul updates our entries in consul, heartbeating and dealing with
// consul restarts.
func (i *Lifecycler) updateConsul(ctx context.Context) error {
	var ringDesc *Desc

	err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc = GetOrCreateRingDesc(in)

		var tokens Tokens
		if instanceDesc, ok := ringDesc.Ingesters[i.ID]; ok {
			// The instance is already registered: keep the tokens stored in the ring.
			tokens = instanceDesc.Tokens
		} else {
			// If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding work,
			// the missing instance for some period of time could have cause a resharding of tenants among instances:
			// to guarantee query correctness we need to update the registration timestamp to current time.
			level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName)
			i.setRegisteredAt(time.Now())
			tokens = i.getTokens()
		}

		ro, rots := i.GetReadOnlyState()
		ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots)

		return ringDesc, true, nil
	})

	// Update counters
	if err == nil {
		i.updateCounters(ringDesc)
	}

	return err
}
// changeState updates consul with state transitions for us. NB this must be
// called from loop()! Use ChangeState for calls from outside of loop().
//
// Only a fixed set of transitions is permitted; any other combination of
// current and target state returns an error without touching the ring.
func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error {
	currState := i.GetState()

	// Only the following state transitions can be triggered externally.
	allowed := (currState == PENDING && state == JOINING) || // triggered by TransferChunks at the beginning
		(currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure
		(currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success
		(currState == PENDING && state == ACTIVE) || // triggered by autoJoin
		(currState == ACTIVE && state == LEAVING) // triggered by shutdown

	if !allowed {
		// Error strings are lowercase per Go conventions (staticcheck ST1005).
		return fmt.Errorf("changing instance state from %v -> %v is disallowed", currState, state)
	}

	level.Info(i.logger).Log("msg", "changing instance state from", "old_state", currState, "new_state", state, "ring", i.RingName)
	i.setState(state)
	return i.updateConsul(ctx)
}
func (i *Lifecycler) updateCounters(ringDesc *Desc) {
healthyInstancesCount := 0
instancesCount := 0
readOnlyInstancesCount := 0
zones := map[string]int{}
healthyInstancesInZone := map[string]int{}
if ringDesc != nil {
now := time.Now()
for _, ingester := range ringDesc.Ingesters {
zones[ingester.Zone]++
instancesCount++
if ingester.ReadOnly {
readOnlyInstancesCount++
}
// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {