Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/ingester: check that ingester is in LEAVING state when transferring chunks and claiming tokens. Required when using memberlist client. #1300

Merged
merged 4 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,33 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet.
// If ingester is not LEAVING yet, we don't accept this transfer, as claiming tokens would end up
// with this ingester owning no tokens.
// Hopefully the leaving ingester will retry again.
v, err := i.lifecycler.KVStore.Get(stream.Context(), ring.ConsulKey)
if err != nil {
util.Logger.Log("msg", "TransferChunks error", "err", err)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
return err
}
if v == nil {
err = fmt.Errorf("ring not found?!? (got nil)")
util.Logger.Log("msg", "TransferChunks error", "err", err)
return err
}
r, ok := v.(*ring.Desc)
if !ok || r == nil {
err = fmt.Errorf("ring not found?!? (got %T instead)", v)
util.Logger.Log("msg", "TransferChunks error", "err", err)
return err
}

if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING {
err = fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State)
util.Logger.Log("msg", "TransferChunks error", "err", err)
return err
}
}

userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
Expand Down
3 changes: 3 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -120,6 +122,7 @@ func (t *Loki) initRing() (err error) {
if err != nil {
return
}
prometheus.MustRegister(t.ring)
t.server.HTTP.Handle("/ring", t.ring)
return
}
Expand Down