From dd96fa146c1fc0a7ef3327e495a1754c8bf9da0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Fri, 6 Dec 2019 15:30:32 +0100 Subject: [PATCH] pkg/ingester: check that ingester is in LEAVING state when transferring chunks and claiming tokens. Required when using memberlist client. (#1300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * On chunks transfer, verify that source ingester is LEAVING This is important when using gossiping ring. If source ingester is not leaving yet, than claiming its tokens will fail (merge function will not move ownership to the target ingester), and target ingester will end up with no tokens in the ring. * Register ring to prometheus. Signed-off-by: Peter Štibraný * Update check using latest version from Cortex. There is no direct usage of logging, which is consistent with the rest of this method. Signed-off-by: Peter Štibraný * Move import to group of imports with third-party packages. Signed-off-by: Peter Štibraný --- pkg/ingester/transfer.go | 31 +++++++++++++++++++++++++++++++ pkg/loki/modules.go | 2 ++ 2 files changed, 33 insertions(+) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 2c4093984b86..f9da369fe7da 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -81,6 +81,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) if fromIngesterID == "" { fromIngesterID = chunkSet.FromIngesterId level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) + + // Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later + err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID) + if err != nil { + return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState") + } } userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId) @@ -127,6 +133,31 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) return nil } +// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet +// when using gossip ring. If we cannot see ingester is the LEAVING state yet, we don't accept this +// transfer, as claiming tokens would possibly end up with this ingester owning no tokens, due to conflict +// resolution in ring merge function. Hopefully the leaving ingester will retry transfer again. +func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error { + v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey) + if err != nil { + return errors.Wrap(err, "get ring") + } + if v == nil { + return fmt.Errorf("ring not found when checking state of source ingester") + } + r, ok := v.(*ring.Desc) + if !ok || r == nil { + return fmt.Errorf("ring not found, got %T", v) + } + + if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING { + return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State) + } + + // all fine + return nil +} + // StopIncomingRequests implements ring.Lifecycler. func (i *Ingester) StopIncomingRequests() { i.shutdownMtx.Lock() diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index cee38c635a49..e717e27617d2 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -13,6 +13,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "google.golang.org/grpc/health/grpc_health_v1" @@ -120,6 +121,7 @@ func (t *Loki) initRing() (err error) { if err != nil { return } + prometheus.MustRegister(t.ring) t.server.HTTP.Handle("/ring", t.ring) return }