Skip to content

Commit

Permalink
pkg/ingester: check that ingester is in LEAVING state when transferri…
Browse files Browse the repository at this point in the history
…ng chunks and claiming tokens. Required when using memberlist client. (#1300)

* On chunks transfer, verify that source ingester is LEAVING

This is important when using gossiping ring. If source ingester is not
leaving yet, then claiming its tokens will fail (merge function
will not move ownership to the target ingester), and target ingester
will end up with no tokens in the ring.

* Register ring to prometheus.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Update check using latest version from Cortex.

There is no direct usage of logging, which is consistent
with the rest of this method.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Move import to group of imports with third-party packages.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany authored and cyriltovena committed Dec 6, 2019
1 parent 17b2ce0 commit dd96fa1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
31 changes: 31 additions & 0 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
if err != nil {
return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
}
}

userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
Expand Down Expand Up @@ -127,6 +133,31 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return nil
}

// checkFromIngesterIsInLeavingState reads the ring descriptor from the
// lifecycler's KV store and verifies that the ingester identified by
// fromIngesterID is present in it and is in the LEAVING state.
//
// Ring gossiping: the "from" ingester should be LEAVING, but we may not see
// that yet when using gossip ring. If we cannot see that the ingester is in
// the LEAVING state yet, we don't accept this transfer, as claiming tokens
// would possibly end up with this ingester owning no tokens, due to conflict
// resolution in ring merge function. Hopefully the leaving ingester will retry
// transfer again.
//
// It returns nil only when the source ingester is found in the ring and is
// LEAVING. It returns an error when the KV lookup fails, when no ring (or a
// value of an unexpected type) is stored under the ring key, when the source
// ingester has no entry in the ring, or when its state is anything other than
// LEAVING.
func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error {
	v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
	if err != nil {
		return errors.Wrap(err, "get ring")
	}
	if v == nil {
		return fmt.Errorf("ring not found when checking state of source ingester")
	}
	r, ok := v.(*ring.Desc)
	if !ok || r == nil {
		return fmt.Errorf("ring not found, got %T", v)
	}

	// Use the comma-ok form: a plain index on a missing key yields a zero-value
	// entry, so the reported state would not belong to any real ingester.
	// Indexing a nil map is safe, so no separate nil check is needed.
	ing, found := r.Ingesters[fromIngesterID]
	if !found {
		return fmt.Errorf("source ingester is not in a LEAVING state, ingester %q not found in ring", fromIngesterID)
	}
	if ing.State != ring.LEAVING {
		return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", ing.State)
	}

	// all fine
	return nil
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
i.shutdownMtx.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -120,6 +121,7 @@ func (t *Loki) initRing() (err error) {
if err != nil {
return
}
prometheus.MustRegister(t.ring)
t.server.HTTP.Handle("/ring", t.ring)
return
}
Expand Down

0 comments on commit dd96fa1

Please sign in to comment.