diff --git a/CHANGELOG.md b/CHANGELOG.md index f9cb11448..05f190c84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -236,6 +236,7 @@ * [ENHANCEMENT] Cache: Add `.Advance()` methods to mock cache clients for easier testing of TTLs. #601 * [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525 * [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592 +* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. #612 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 diff --git a/kv/memberlist/kv.pb.go b/kv/memberlist/kv.pb.go index 4c2eb9265..2080e9789 100644 --- a/kv/memberlist/kv.pb.go +++ b/kv/memberlist/kv.pb.go @@ -76,6 +76,10 @@ type KeyValuePair struct { Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // ID of the codec used to write the value Codec string `protobuf:"bytes,3,opt,name=codec,proto3" json:"codec,omitempty"` + // Is this Key marked for deletion? + Deleted bool `protobuf:"varint,4,opt,name=deleted,proto3" json:"deleted,omitempty"` + // When was the key last updated? + UpdateTimeMillis int64 `protobuf:"varint,5,opt,name=update_time_millis,json=updateTimeMillis,proto3" json:"update_time_millis,omitempty"` } func (m *KeyValuePair) Reset() { *m = KeyValuePair{} } @@ -131,6 +135,20 @@ func (m *KeyValuePair) GetCodec() string { return "" } +func (m *KeyValuePair) GetDeleted() bool { + if m != nil { + return m.Deleted + } + return false +} + +func (m *KeyValuePair) GetUpdateTimeMillis() int64 { + if m != nil { + return m.UpdateTimeMillis + } + return 0 +} + func init() { proto.RegisterType((*KeyValueStore)(nil), "memberlist.KeyValueStore") proto.RegisterType((*KeyValuePair)(nil), "memberlist.KeyValuePair") @@ -139,22 +157,25 @@ func init() { func init() { proto.RegisterFile("kv.proto", fileDescriptor_2216fe83c9c12408) } var fileDescriptor_2216fe83c9c12408 = []byte{ - // 236 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc8, 0x2e, 0xd3, 0x2b, - 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x4d, 0xcd, 0x4d, 0x4a, 0x2d, 0xca, 0xc9, 0x2c, 0x2e, - 0x91, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, - 0xcf, 0xd7, 0x07, 0x2b, 0x49, 0x2a, 0x4d, 0x03, 0xf3, 0xc0, 0x1c, 0x30, 0x0b, 0xa2, 0x55, 0xc9, - 0x9e, 0x8b, 0xd7, 0x3b, 0xb5, 0x32, 0x2c, 0x31, 0xa7, 0x34, 0x35, 0xb8, 0x24, 0xbf, 0x28, 0x55, - 0x48, 0x8f, 0x8b, 0xb5, 0x20, 0x31, 0xb3, 0xa8, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, - 0x42, 0x0f, 0x61, 0xb6, 0x1e, 0x4c, 0x65, 0x40, 0x62, 0x66, 0x51, 0x10, 0x44, 0x99, 0x92, 0x0f, - 0x17, 0x0f, 0xb2, 0xb0, 0x90, 0x00, 0x17, 0x73, 0x76, 0x6a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, - 0x67, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x06, 0x92, 0x96, 0x60, 0x52, 0x60, 0xd4, 0xe0, - 0x09, 0x82, 0x70, 0x40, 0xa2, 0xc9, 0xf9, 0x29, 0xa9, 0xc9, 0x12, 0xcc, 0x60, 0x95, 0x10, 0x8e, - 0x93, 0xc9, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, - 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, - 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, - 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x24, 0x36, 0xb0, 0x5f, 0x8c, 0x01, 0x01, - 0x00, 0x00, 0xff, 0xff, 0x7a, 0x22, 0xdf, 0xec, 0x12, 0x01, 0x00, 0x00, + // 288 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0x31, 0x4b, 0xc3, 0x40, + 0x18, 0x86, 0xef, 0x33, 0x56, 0xeb, 0x59, 0xa1, 0x1c, 0x0e, 0x87, 0xc3, 0x47, 0xe8, 0x94, 0x41, + 0x53, 0x50, 0x77, 0xc1, 0x55, 0x04, 0x89, 0xe2, 0x5a, 0x92, 0xe6, 0xb3, 0x1e, 0xcd, 0x71, 0x25, + 0xbd, 0x14, 0xba, 0xf9, 0x13, 0x1c, 0xfc, 0x11, 0xfe, 0x14, 0xc7, 0x8e, 0x1d, 0xed, 0x65, 0x71, + 0xec, 0x4f, 0x90, 0x5c, 0x28, 0xba, 0xbd, 0xcf, 0xfb, 0x3e, 0x77, 0xc3, 0xc7, 0xbb, 0xd3, 0x45, + 0x3c, 0x2b, 0x8d, 0x35, 0x82, 0x6b, 0xd2, 0x19, 0x95, 0x85, 0x9a, 0xdb, 0xb3, 0x8b, 0x89, 0xb2, + 0xaf, 0x55, 0x16, 0x8f, 0x8d, 0x1e, 0x4e, 0xcc, 0xc4, 0x0c, 0xbd, 0x92, 0x55, 0x2f, 0x9e, 0x3c, + 0xf8, 0xd4, 0x3e, 0x1d, 0xdc, 0xf0, 0x93, 0x3b, 0x5a, 0x3e, 0xa7, 0x45, 0x45, 0x8f, 0xd6, 0x94, + 0x24, 0x62, 0xde, 0x99, 0xa5, 0xaa, 0x9c, 0x4b, 0x08, 0x83, 0xe8, 0xf8, 0x52, 0xc6, 0x7f, 0x7f, + 0xc7, 0x3b, 0xf3, 0x21, 0x55, 0x65, 0xd2, 0x6a, 0x83, 0x0f, 0xe0, 0xbd, 0xff, 0xbd, 0xe8, 0xf3, + 0x60, 0x4a, 0x4b, 0x09, 0x21, 0x44, 0x47, 0x49, 0x13, 0xc5, 0x29, 0xef, 0x2c, 0x9a, 0x59, 0xee, + 0x85, 0x10, 0xf5, 0x92, 0x16, 0x9a, 0x76, 0x6c, 0x72, 0x1a, 0xcb, 0xc0, 0x9b, 0x2d, 0x08, 0xc9, + 0x0f, 0x73, 0x2a, 0xc8, 0x52, 0x2e, 0xf7, 0x43, 0x88, 0xba, 0xc9, 0x0e, 0xc5, 0x39, 0x17, 0xd5, + 0x2c, 0x4f, 0x2d, 0x8d, 0xac, 0xd2, 0x34, 0xd2, 0xaa, 0x28, 0xd4, 0x5c, 0x76, 0x42, 0x88, 0x82, + 0xa4, 0xdf, 0x2e, 0x4f, 0x4a, 0xd3, 0xbd, 0xef, 0x6f, 0xaf, 0x57, 0x1b, 0x64, 0xeb, 0x0d, 0xb2, + 0xed, 0x06, 0xe1, 0xcd, 0x21, 0x7c, 0x3a, 0x84, 0x2f, 0x87, 0xb0, 0x72, 0x08, 0xdf, 0x0e, 0xe1, + 0xc7, 0x21, 0xdb, 0x3a, 0x84, 0xf7, 0x1a, 0xd9, 0xaa, 0x46, 0xb6, 0xae, 0x91, 0x65, 0x07, 0xfe, + 0x28, 0x57, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe0, 0x1f, 0xee, 0xce, 0x5b, 0x01, 0x00, 0x00, } func (this *KeyValueStore) Equal(that interface{}) bool { @@ -214,6 +235,12 @@ func (this *KeyValuePair) Equal(that interface{}) bool { if this.Codec != that1.Codec { return false } + if this.Deleted != that1.Deleted { + return false + } + if this.UpdateTimeMillis != that1.UpdateTimeMillis { + return false + } return true } func (this *KeyValueStore) GoString() string { @@ -232,11 +259,13 @@ func (this *KeyValuePair) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 9) s = append(s, "&memberlist.KeyValuePair{") s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") s = append(s, "Codec: "+fmt.Sprintf("%#v", this.Codec)+",\n") + s = append(s, "Deleted: "+fmt.Sprintf("%#v", this.Deleted)+",\n") + s = append(s, "UpdateTimeMillis: "+fmt.Sprintf("%#v", this.UpdateTimeMillis)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -305,6 +334,21 @@ func (m *KeyValuePair) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.UpdateTimeMillis != 0 { + i = encodeVarintKv(dAtA, i, uint64(m.UpdateTimeMillis)) + i-- + dAtA[i] = 0x28 + } + if m.Deleted { + i-- + if m.Deleted { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if len(m.Codec) > 0 { i -= len(m.Codec) copy(dAtA[i:], m.Codec) @@ -373,6 +417,12 @@ func (m *KeyValuePair) Size() (n int) { if l > 0 { n += 1 + l + sovKv(uint64(l)) } + if m.Deleted { + n += 2 + } + if m.UpdateTimeMillis != 0 { + n += 1 + sovKv(uint64(m.UpdateTimeMillis)) + } return n } @@ -405,6 +455,8 @@ func (this *KeyValuePair) String() string { `Key:` + fmt.Sprintf("%v", this.Key) + `,`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, `Codec:` + fmt.Sprintf("%v", this.Codec) + `,`, + `Deleted:` + fmt.Sprintf("%v", this.Deleted) + `,`, + `UpdateTimeMillis:` + fmt.Sprintf("%v", this.UpdateTimeMillis) + `,`, `}`, }, "") return s @@ -631,6 +683,45 @@ func (m *KeyValuePair) Unmarshal(dAtA []byte) error { } m.Codec = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Deleted = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdateTimeMillis", wireType) + } + m.UpdateTimeMillis = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowKv + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UpdateTimeMillis |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipKv(dAtA[iNdEx:]) diff --git a/kv/memberlist/kv.proto b/kv/memberlist/kv.proto index cc5f12463..b2e513b07 100644 --- a/kv/memberlist/kv.proto +++ b/kv/memberlist/kv.proto @@ -19,4 +19,9 @@ message KeyValuePair { // ID of the codec used to write the value string codec = 3; + + // Is this Key marked for deletion? + bool deleted = 4; + // When was the key last updated? + int64 update_time_millis = 5; } diff --git a/kv/memberlist/memberlist_client.go b/kv/memberlist/memberlist_client.go index 452798e04..571d18b34 100644 --- a/kv/memberlist/memberlist_client.go +++ b/kv/memberlist/memberlist_client.go @@ -72,8 +72,14 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { } // Delete is part of kv.Client interface. -func (c *Client) Delete(_ context.Context, _ string) error { - return errors.New("memberlist does not support Delete") +func (c *Client) Delete(ctx context.Context, key string) error { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + + c.kv.Delete(key) + return nil } // CAS is part of kv.Client interface @@ -155,7 +161,8 @@ type KVConfig struct { RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` + ObsoleteEntriesTimeout time.Duration `yaml:"obsolete_entries_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` @@ -188,6 +195,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", cfg.AbortIfJoinFails, "If this node fails to join memberlist cluster, abort.") f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") + f.DurationVar(&cfg.ObsoleteEntriesTimeout, prefix+"memberlist.obsolete-entries-timeout", mlDefaults.PushPullInterval, "How long to keep obsolete entries in the KV store.") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 20*time.Second, "Timeout for leaving memberlist cluster.") f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", mlDefaults.GossipInterval, "How often to gossip.") f.IntVar(&cfg.GossipNodes, prefix+"memberlist.gossip-nodes", mlDefaults.GossipNodes, "How many nodes to gossip to.") @@ -242,7 +250,7 @@ type KV struct { gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes // KV Store. - storeMu sync.Mutex + storeMu sync.RWMutex store map[string]ValueDesc // Codec registry @@ -317,7 +325,7 @@ type Message struct { Changes []string // List of changes in this message (as computed by *this* node). } -// ValueDesc stores the value along with it's codec and local version. +// ValueDesc stores the value along with its codec and local version. type ValueDesc struct { // We store the decoded value here to prevent decoding the entire state for every // update we receive. Whilst the updates are small and fast to decode, @@ -330,6 +338,9 @@ type ValueDesc struct { // ID of codec used to write this value. Only used when sending full state. CodecID string + + Deleted bool + UpdateTime time.Time } func (v ValueDesc) Clone() (result ValueDesc) { @@ -344,6 +355,8 @@ type valueUpdate struct { value []byte codec codec.Codec messageSize int + deleted bool + updateTime time.Time } func (v ValueDesc) String() string { @@ -508,6 +521,14 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } + var obsoleteEntriesTickerChan <-chan time.Time + if m.cfg.ObsoleteEntriesTimeout > 0 { + obsoleteEntriesTicker := time.NewTicker(m.cfg.ObsoleteEntriesTimeout) + defer obsoleteEntriesTicker.Stop() + + obsoleteEntriesTickerChan = obsoleteEntriesTicker.C + } + logger := log.With(m.logger, "phase", "periodic_rejoin") for { select { @@ -521,6 +542,11 @@ func (m *KV) running(ctx context.Context) error { level.Warn(logger).Log("msg", "re-joining memberlist cluster failed", "err", err, "next_try_in", m.cfg.RejoinInterval) } + case <-obsoleteEntriesTickerChan: + // cleanupObsoleteEntries is normally called during push/pull, but if there are no other + // nodes to push/pull with, we can call it periodically to make sure we remove unused entries from memory. + m.cleanupObsoleteEntries() + case <-ctx.Done(): return nil } @@ -1005,6 +1031,20 @@ func (m *KV) notifyWatchersSync(key string) { } } +func (m *KV) Delete(key string) { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + val, ok := m.store[key] + if !ok || val.Deleted { + return + } + + val.Deleted = true + val.UpdateTime = time.Now() + m.store[key] = val +} + // CAS implements Compare-And-Set/Swap operation. // // CAS expects that value returned by 'f' function implements Mergeable interface. If it doesn't, CAS fails immediately. @@ -1035,7 +1075,7 @@ outer: } } - change, newver, retry, err := m.trySingleCas(key, codec, f) + change, newver, retry, updateTime, err := m.trySingleCas(key, codec, f) if err != nil { level.Debug(m.logger).Log("msg", "CAS attempt failed", "err", err, "retry", retry) @@ -1050,7 +1090,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - m.broadcastNewValue(key, change, newver, codec, true) + m.broadcastNewValue(key, change, newver, codec, true, false, updateTime) } return nil @@ -1067,50 +1107,52 @@ outer: // returns change, error (or nil, if CAS succeeded), and whether to retry or not. // returns errNoChangeDetected if merge failed to detect change in f's output. -func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, error) { +func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{}) (out interface{}, retry bool, err error)) (Mergeable, uint, bool, time.Time, error) { val, ver, err := m.get(key, codec) if err != nil { - return nil, 0, false, fmt.Errorf("failed to get value: %v", err) + return nil, 0, false, time.Time{}, fmt.Errorf("failed to get value: %v", err) } out, retry, err := f(val) if err != nil { - return nil, 0, retry, fmt.Errorf("fn returned error: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("fn returned error: %v", err) } if out == nil { // no change to be done - return nil, 0, false, nil + return nil, 0, false, time.Time{}, nil } // Don't even try incomingValue, ok := out.(Mergeable) if !ok || incomingValue == nil { - return nil, 0, retry, fmt.Errorf("invalid type: %T, expected Mergeable", out) + return nil, 0, retry, time.Time{}, fmt.Errorf("invalid type: %T, expected Mergeable", out) } // To support detection of removed items from value, we will only allow CAS operation to // succeed if version hasn't changed, i.e. state hasn't changed since running 'f'. // Supplied function may have kept a reference to the returned "incoming value". // If KV store will keep this value as well, it needs to make a clone. - change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec) + ut := time.Now() + + change, newver, err := m.mergeValueForKey(key, incomingValue, true, ver, codec, false, ut) if err == errVersionMismatch { - return nil, 0, retry, err + return nil, 0, retry, time.Time{}, err } if err != nil { - return nil, 0, retry, fmt.Errorf("merge failed: %v", err) + return nil, 0, retry, time.Time{}, fmt.Errorf("merge failed: %v", err) } if newver == 0 { // CAS method reacts on this error - return nil, 0, retry, errNoChangeDetected + return nil, 0, retry, time.Time{}, errNoChangeDetected } - return change, newver, retry, nil + return change, newver, retry, ut, nil } -func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) { +func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool, deleted bool, updateTime time.Time) { if locallyGenerated && m.State() != services.Running { level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key) return @@ -1123,7 +1165,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID()} + kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)} pairData, err := kvPair.Marshal() if err != nil { level.Error(m.logger).Log("msg", "failed to serialize KV pair", "key", key, "version", version, "err", err) @@ -1200,7 +1242,7 @@ func (m *KV) NotifyMsg(msg []byte) { ch := m.getKeyWorkerChannel(kvPair.Key) select { - case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg), deleted: kvPair.Deleted, updateTime: updateTime(kvPair.UpdateTimeMillis)}: default: m.numberOfDroppedMessages.Inc() level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) @@ -1227,7 +1269,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { select { case update := <-workerCh: // we have a value update! Let's merge it with our current version for given key - mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime) changes := []string(nil) if mod != nil { @@ -1252,7 +1294,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { m.notifyWatchers(key) // Don't resend original message, but only changes. - m.broadcastNewValue(key, mod, version, update.codec, false) + m.broadcastNewValue(key, mod, version, update.codec, false, update.deleted, update.updateTime) } case <-m.shutdown: @@ -1326,6 +1368,8 @@ func (m *KV) LocalState(_ bool) []byte { kvPair.Key = key kvPair.Value = encoded kvPair.Codec = val.CodecID + kvPair.Deleted = val.Deleted + kvPair.UpdateTimeMillis = updateTimeMillis(val.UpdateTime) ser, err := kvPair.Marshal() if err != nil { @@ -1407,8 +1451,13 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { continue } + updateTime := updateTime(kvPair.UpdateTimeMillis) + if updateTime.IsZero() { + updateTime = time.Now() + } + // we have both key and value, try to merge it with our state - change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec, kvPair.Deleted, updateTime) changes := []string(nil) if change != nil { @@ -1427,7 +1476,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) } else if newver > 0 { m.notifyWatchers(kvPair.Key) - m.broadcastNewValue(kvPair.Key, change, newver, codec, false) + m.broadcastNewValue(kvPair.Key, change, newver, codec, false, kvPair.Deleted, updateTime) } } @@ -1436,7 +1485,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) { } } -func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { decodedValue, err := codec.Decode(incomingData) if err != nil { return nil, 0, fmt.Errorf("failed to decode value: %v", err) @@ -1448,14 +1497,14 @@ func (m *KV) mergeBytesValueForKey(key string, incomingData []byte, codec codec. } // No need to clone this "incomingValue", since we have just decoded it from bytes, and won't be using it. - return m.mergeValueForKey(key, incomingValue, false, 0, codec) + return m.mergeValueForKey(key, incomingValue, false, 0, codec, deleted, updateTime) } // Merges incoming value with value we have in our store. Returns "a change" that can be sent to other // cluster members to update their state, and new version of the value. // If CAS version is specified, then merging will fail if state has changed already, and errVersionMismatch is reported. // If no modification occurred, new version is 0. -func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec) (Mergeable, uint, error) { +func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValueRequiresClone bool, casVersion uint, codec codec.Codec, deleted bool, updateTime time.Time) (Mergeable, uint, error) { m.storeMu.Lock() defer m.storeMu.Unlock() @@ -1496,10 +1545,19 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue } newVersion := curr.Version + 1 + newUpdateTime := curr.UpdateTime + newDeleted := curr.Deleted + + if !updateTime.IsZero() && updateTime.After(newUpdateTime) { + newUpdateTime = updateTime + newDeleted = deleted + } m.store[key] = ValueDesc{ - value: result, - Version: newVersion, - CodecID: codec.CodecID(), + value: result, + Version: newVersion, + CodecID: codec.CodecID(), + Deleted: newDeleted, + UpdateTime: newUpdateTime, } // The "changes" returned by Merge() can contain references to the "result" @@ -1584,6 +1642,17 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } +func (m *KV) cleanupObsoleteEntries() { + m.storeMu.Lock() + defer m.storeMu.Unlock() + + for k, v := range m.store { + if v.Deleted && time.Since(v.UpdateTime) > m.cfg.ObsoleteEntriesTimeout { + delete(m.store, k) + } + } +} + func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size @@ -1595,3 +1664,17 @@ func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Mes return msgs, size } + +func updateTime(val int64) time.Time { + if val == 0 { + return time.Time{} + } + return time.UnixMilli(val) +} + +func updateTimeMillis(ts time.Time) int64 { + if ts.IsZero() { + return 0 + } + return ts.UnixMilli() +} diff --git a/kv/memberlist/memberlist_client_test.go b/kv/memberlist/memberlist_client_test.go index 47e8b3a8f..5aaec9431 100644 --- a/kv/memberlist/memberlist_client_test.go +++ b/kv/memberlist/memberlist_client_test.go @@ -565,6 +565,7 @@ func defaultKVConfig(i int) KVConfig { cfg.GossipInterval = 100 * time.Millisecond cfg.GossipNodes = 10 cfg.PushPullInterval = 5 * time.Second + cfg.ObsoleteEntriesTimeout = 5 * time.Second cfg.TCPTransport = TCPTransportConfig{ BindAddrs: getLocalhostAddrs(), @@ -574,6 +575,53 @@ func defaultKVConfig(i int) KVConfig { return cfg } +func TestDelete(t *testing.T) { + t.Parallel() + + c := dataCodec{} + + var cfg KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = TCPTransportConfig{ + BindAddrs: getLocalhostAddrs(), + BindPort: 0, // randomize ports + } + cfg.GossipNodes = 1 + cfg.GossipInterval = 100 * time.Millisecond + cfg.ObsoleteEntriesTimeout = 1 * time.Second + cfg.ClusterLabelVerificationDisabled = true + cfg.Codecs = []codec.Codec{c} + + mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck + + kv, err := NewClient(mkv, c) + require.NoError(t, err) + + const key = "test" + + val := get(t, kv, key) + if val != nil { + t.Error("Expected nil, got:", val) + } + + err = cas(kv, key, updateFn("test")) + require.NoError(t, err) + + err = kv.Delete(context.Background(), key) + if err != nil { + t.Fatalf("Failed to delete key %s: %v", key, err) + } + + time.Sleep(2 * time.Second) // wait for obsolete entries to be removed + val = get(t, kv, key) + + if val != nil { + t.Errorf("Expected nil, got: %v", val) + } +} + func TestMultipleClients(t *testing.T) { t.Parallel() @@ -653,7 +701,6 @@ func TestMultipleClientsWithSameLabelWithClusterLabelVerification(t *testing.T) cfg := defaultKVConfig(i) cfg.ClusterLabel = label - return cfg } @@ -1686,11 +1733,11 @@ func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) { require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) // Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue. - kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false) - kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false) - kv.broadcastNewValue("local", smallUpdate, 1, codec, true) - kv.broadcastNewValue("local", bigUpdate, 2, codec, true) - kv.broadcastNewValue("local", mediumUpdate, 3, codec, true) + kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false, false, time.Now()) + kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false, false, time.Now()) + kv.broadcastNewValue("local", smallUpdate, 1, codec, true, false, time.Now()) + kv.broadcastNewValue("local", bigUpdate, 2, codec, true, false, time.Now()) + kv.broadcastNewValue("local", mediumUpdate, 3, codec, true, false, time.Now()) err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` # HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue diff --git a/kv/memberlist/status.gohtml b/kv/memberlist/status.gohtml index 6f845b6e0..524acb809 100644 --- a/kv/memberlist/status.gohtml +++ b/kv/memberlist/status.gohtml @@ -22,6 +22,8 @@