From e8e75b31e1a002296624d41134aa3b2e2fe66719 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Fri, 4 Oct 2019 14:26:36 +0200 Subject: [PATCH 1/4] On chunks transfer, verify that source ingester is LEAVING This is important when using a gossiping ring. If source ingester is not leaving yet, then claiming its tokens will fail (merge function will not move ownership to the target ingester), and target ingester will end up with no tokens in the ring. --- pkg/ingester/transfer.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 7b9a66370feb..825416f08503 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -79,6 +79,33 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) if fromIngesterID == "" { fromIngesterID = chunkSet.FromIngesterId level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) + + // Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet. + // If ingester is not LEAVING yet, we don't accept this transfer, as claiming tokens would end up + // with this ingester owning no tokens. + // Hopefully the leaving ingester will retry again. + v, err := i.lifecycler.KVStore.Get(stream.Context(), ring.ConsulKey) + if err != nil { + util.Logger.Log("msg", "TransferChunks error", "err", err) + return err + } + if v == nil { + err = fmt.Errorf("ring not found?!? (got nil)") + util.Logger.Log("msg", "TransferChunks error", "err", err) + return err + } + r, ok := v.(*ring.Desc) + if !ok || r == nil { + err = fmt.Errorf("ring not found?!? 
(got %T instead)", v) + util.Logger.Log("msg", "TransferChunks error", "err", err) + return err + } + + if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING { + err = fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State) + util.Logger.Log("msg", "TransferChunks error", "err", err) + return err + } } userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId) From 764e8f8dbe0b1622668649f88782e27f5f4494be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 22 Oct 2019 09:48:37 +0200 Subject: [PATCH 2/4] Register ring to prometheus. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/loki/modules.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index cee38c635a49..8f404b0c5cfa 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/ring" @@ -120,6 +122,7 @@ func (t *Loki) initRing() (err error) { if err != nil { return } + prometheus.MustRegister(t.ring) t.server.HTTP.Handle("/ring", t.ring) return } From 0f77dab8bb9c246c6360bd86b032801bb86f9608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 28 Nov 2019 19:04:27 +0100 Subject: [PATCH 3/4] Update check using latest version from Cortex. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There is no direct usage of logging, which is consistent with the rest of this method. 
Signed-off-by: Peter Štibraný --- pkg/ingester/transfer.go | 52 +++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 825416f08503..6491ca30794e 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -80,31 +80,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) fromIngesterID = chunkSet.FromIngesterId level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) - // Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet. - // If ingester is not LEAVING yet, we don't accept this transfer, as claiming tokens would end up - // with this ingester owning no tokens. - // Hopefully the leaving ingester will retry again. - v, err := i.lifecycler.KVStore.Get(stream.Context(), ring.ConsulKey) + // Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later + err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID) if err != nil { - util.Logger.Log("msg", "TransferChunks error", "err", err) - return err - } - if v == nil { - err = fmt.Errorf("ring not found?!? (got nil)") - util.Logger.Log("msg", "TransferChunks error", "err", err) - return err - } - r, ok := v.(*ring.Desc) - if !ok || r == nil { - err = fmt.Errorf("ring not found?!? 
(got %T instead)", v) - util.Logger.Log("msg", "TransferChunks error", "err", err) - return err - } - - if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING { - err = fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State) - util.Logger.Log("msg", "TransferChunks error", "err", err) - return err + return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState") } } @@ -152,6 +131,31 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) return nil } +// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet +// when using gossip ring. If we cannot see that the ingester is in the LEAVING state yet, we don't accept this +// transfer, as claiming tokens would possibly end up with this ingester owning no tokens, due to conflict +// resolution in ring merge function. Hopefully the leaving ingester will retry transfer again. +func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error { + v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey) + if err != nil { + return errors.Wrap(err, "get ring") + } + if v == nil { + return fmt.Errorf("ring not found when checking state of source ingester") + } + r, ok := v.(*ring.Desc) + if !ok || r == nil { + return fmt.Errorf("ring not found, got %T", v) + } + + if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING { + return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State) + } + + // all fine + return nil +} + // StopIncomingRequests implements ring.Lifecycler. func (i *Ingester) StopIncomingRequests() { i.shutdownMtx.Lock() From 89ccb691e42c38d257206db9f687f98bc149bee5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 28 Nov 2019 19:06:14 +0100 Subject: [PATCH 4/4] Move import to group of imports with third-party packages. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/loki/modules.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 8f404b0c5cfa..e717e27617d2 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -7,14 +7,13 @@ import ( "strings" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "google.golang.org/grpc/health/grpc_health_v1"