Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memberlist ignore old tombstones #4420

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394
* [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
* [CHANGE] Memberlist: forward only changes, not entire original message. #4419
* [CHANGE] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this listed as a breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good shout. Should be bugfix as well.

* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
Expand Down
11 changes: 11 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,17 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
total, removed := result.RemoveTombstones(limit)
m.storeTombstones.WithLabelValues(key).Set(float64(total))
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))

// Remove tombstones from change too. If change turns out to be empty after this,
// we don't need to change local value either!
//
// Note that "result" and "change" may actually be the same Mergeable. That is why we
// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
// RemoveTombstones twice with same limit should be noop.
change.RemoveTombstones(limit)
if len(change.MergeContent()) == 0 {
return nil, 0, nil
}
}

newVersion := curr.version + 1
Expand Down
104 changes: 102 additions & 2 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ func (d *data) MergeContent() []string {
return out
}

func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
// nothing to do
// This method deliberately ignores zero limit, so that tests can observe LEFT state as well.
func (d *data) RemoveTombstones(limit time.Time) (total, removed int) {
for n, m := range d.Members {
if m.State == LEFT {
if time.Unix(m.Timestamp, 0).Before(limit) {
// remove it
delete(d.Members, n)
removed++
} else {
total++
}
}
}
return
}

Expand Down Expand Up @@ -1137,6 +1148,95 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
}}, d)
}

func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {
codec := dataCodec{}

cfg := KVConfig{}
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
cfg.RetransmitMult = 1
cfg.LeftIngestersTimeout = 5 * time.Minute
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

client, err := NewClient(kv, codec)
require.NoError(t, err)

now := time.Now()

// No broadcast messages from KV at the beginning.
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

for _, tc := range []struct {
name string
valueBeforeSend *data // value in KV store before sending messsage
msgToSend *data
valueAfterSend *data // value in KV store after sending message
broadcastMessage *data // broadcasted change, if not nil
}{
// These tests follow each other (end state of KV in state is starting point in the next state).
{
name: "old tombstone, empty KV",
valueBeforeSend: nil,
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - int64(2*cfg.LeftIngestersTimeout.Seconds()), State: LEFT}}},
valueAfterSend: nil, // no change to KV
broadcastMessage: nil, // no new message
},

{
name: "recent tombstone, empty KV",
valueBeforeSend: nil,
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
},

{
name: "old tombstone, KV contains tombstone already",
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - 10, State: LEFT}}},
broadcastMessage: nil,
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
},

{
name: "fresh tombstone, KV contains tombstone already",
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT, Tokens: []uint32{}}}},
},
} {
t.Run(tc.name, func(t *testing.T) {
d := getData(t, client, key)
if tc.valueBeforeSend == nil {
require.True(t, d == nil || len(d.Members) == 0)
} else {
require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend")
}

kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))

bs := kv.GetBroadcasts(0, math.MaxInt32)
if tc.broadcastMessage == nil {
require.Equal(t, 0, len(bs), "expected no broadcast message")
} else {
require.Equal(t, 1, len(bs), "expected broadcast message")
require.Equal(t, tc.broadcastMessage, decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec))
}

d = getData(t, client, key)
if tc.valueAfterSend == nil {
require.True(t, d == nil || len(d.Members) == 0)
} else {
require.Equal(t, tc.valueAfterSend, d, "valueAfterSend")
}
})
}
}

func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
kvp := KeyValuePair{}
require.NoError(t, kvp.Unmarshal(marshalledKVP))
Expand Down