Parallelize memberlist notified message processing #110

Merged: 1 commit, May 23, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
 * [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93
 * [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
+* [ENHANCEMENT] Memberlist: parallelize processing of messages received by memberlist. #110
 * [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
 * [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
 * [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
111 changes: 79 additions & 32 deletions kv/memberlist/memberlist_client.go
@@ -27,6 +27,7 @@ import (
 const (
 	maxCasRetries              = 10          // max retries in CAS operation
 	noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS
+	notifyMsgQueueSize         = 1024        // size of buffered channels to handle memberlist messages
 )
 
 // Client implements kv.Client interface, by using memberlist.KV
@@ -251,13 +252,18 @@ type KV struct {
 	receivedMessagesSize int
 	messageCounter       int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI.
 
+	// Per-key value update workers
+	workersMu       sync.Mutex
+	workersChannels map[string]chan valueUpdate
+
 	// closed on shutdown
 	shutdown chan struct{}
 
 	// metrics
 	numberOfReceivedMessages        prometheus.Counter
 	totalSizeOfReceivedMessages     prometheus.Counter
 	numberOfInvalidReceivedMessages prometheus.Counter
+	numberOfDroppedMessages         prometheus.Counter
 	numberOfPulls                   prometheus.Counter
 	numberOfPushes                  prometheus.Counter
 	totalSizeOfPulls                prometheus.Counter
Expand Down Expand Up @@ -319,6 +325,12 @@ func (v ValueDesc) Clone() (result ValueDesc) {
 	return
 }
 
+type valueUpdate struct {
+	value       []byte
+	codec       codec.Codec
+	messageSize int
+}
+
 func (v ValueDesc) String() string {
 	return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID)
 }
@@ -339,17 +351,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
 	cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace
 
 	mlkv := &KV{
-		cfg:        cfg,
-		logger:     logger,
-		registerer: registerer,
-		provider:   dnsProvider,
-
-		store:          make(map[string]ValueDesc),
-		codecs:         make(map[string]codec.Codec),
-		watchers:       make(map[string][]chan string),
-		prefixWatchers: make(map[string][]chan string),
-		shutdown:       make(chan struct{}),
-		maxCasRetries:  maxCasRetries,
+		cfg:             cfg,
+		logger:          logger,
+		registerer:      registerer,
+		provider:        dnsProvider,
+		store:           make(map[string]ValueDesc),
+		codecs:          make(map[string]codec.Codec),
+		watchers:        make(map[string][]chan string),
+		prefixWatchers:  make(map[string][]chan string),
+		workersChannels: make(map[string]chan valueUpdate),
+		shutdown:        make(chan struct{}),
+		maxCasRetries:   maxCasRetries,
 	}
 
 	mlkv.createAndRegisterMetrics()
@@ -429,7 +441,6 @@ func (m *KV) starting(_ context.Context) error {
 	if err != nil {
 		return fmt.Errorf("failed to create memberlist: %v", err)
 	}
-
 	// Finish delegate initialization.
 	m.memberlist = list
 	m.broadcasts = &memberlist.TransmitLimitedQueue{
@@ -931,8 +942,6 @@ func (m *KV) NodeMeta(limit int) []byte {
 // NotifyMsg is method from Memberlist Delegate interface
 // Called when single message is received, i.e. what our broadcastNewValue has sent.
 func (m *KV) NotifyMsg(msg []byte) {
-	m.initWG.Wait()
-
 	m.numberOfReceivedMessages.Inc()
 	m.totalSizeOfReceivedMessages.Add(float64(len(msg)))
 
@@ -957,29 +966,67 @@
 		return
 	}
 
-	// we have a ring update! Let's merge it with our version of the ring for given key
-	mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec)
-
-	changes := []string(nil)
-	if mod != nil {
-		changes = mod.MergeContent()
-	}
-
-	m.addReceivedMessage(Message{
-		Time:    time.Now(),
-		Size:    len(msg),
-		Pair:    kvPair,
-		Version: version,
-		Changes: changes,
-	})
-
-	if err != nil {
-		level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
-	} else if version > 0 {
-		m.notifyWatchers(kvPair.Key)
-
-		// Don't resend original message, but only changes.
-		m.broadcastNewValue(kvPair.Key, mod, version, codec)
+	ch := m.getKeyWorkerChannel(kvPair.Key)
+	select {
+	case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}:
+	default:
+		m.numberOfDroppedMessages.Inc()
+		level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key)
+	}
+}
+
+func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate {
+	m.workersMu.Lock()
+	defer m.workersMu.Unlock()
+
+	ch := m.workersChannels[key]
+	if ch == nil {
+		// spawn a key associated worker goroutine to process updates in background
+		ch = make(chan valueUpdate, notifyMsgQueueSize)
+		go m.processValueUpdate(ch, key)
+
+		m.workersChannels[key] = ch
+	}
+	return ch
+}
+
+func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
+	for {
+		select {
+		case update := <-workerCh:
+			// we have a value update! Let's merge it with our current version for given key
+			mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec)
+
+			changes := []string(nil)
+			if mod != nil {
+				changes = mod.MergeContent()
+			}
+
+			m.addReceivedMessage(Message{
+				Time: time.Now(),
+				Size: update.messageSize,
+				Pair: KeyValuePair{
+					Key:   key,
+					Value: update.value,
+					Codec: update.codec.CodecID(),
+				},
+				Version: version,
+				Changes: changes,
+			})
+
+			if err != nil {
+				level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err)
+			} else if version > 0 {
+				m.notifyWatchers(key)
+
+				// Don't resend original message, but only changes.
+				m.broadcastNewValue(key, mod, version, update.codec)
+			}
+
+		case <-m.shutdown:
+			// stop running on shutdown
+			return
+		}
+	}
 }

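The core of the change above is a per-key worker pattern: NotifyMsg no longer merges incoming values inline, but hands each update to a worker goroutine dedicated to that key, so updates to the same key stay serialized while different keys are processed in parallel; when a key's buffered queue (notifyMsgQueueSize, 1024 entries) is full, the message is dropped and counted. Below is a minimal, self-contained sketch of that pattern; the names (store, channelFor, process) are hypothetical, not dskit APIs:

package main

import (
	"fmt"
	"sync"
	"time"
)

type update struct{ value string }

type store struct {
	mu      sync.Mutex
	workers map[string]chan update
	done    chan struct{}
}

// channelFor returns the queue for key, lazily starting one worker
// goroutine per key: same-key updates stay ordered, different keys
// proceed in parallel.
func (s *store) channelFor(key string) chan<- update {
	s.mu.Lock()
	defer s.mu.Unlock()

	ch := s.workers[key]
	if ch == nil {
		ch = make(chan update, 4) // tiny queue for the demo; the PR uses 1024
		s.workers[key] = ch
		go s.process(key, ch)
	}
	return ch
}

func (s *store) process(key string, ch <-chan update) {
	for {
		select {
		case u := <-ch:
			fmt.Printf("key=%s applied %s\n", key, u.value) // stand-in for the merge
		case <-s.done:
			return
		}
	}
}

func main() {
	s := &store{workers: map[string]chan update{}, done: make(chan struct{})}
	for i := 0; i < 3; i++ {
		s.channelFor("ring") <- update{value: fmt.Sprintf("v%d", i)}
		s.channelFor("usertable") <- update{value: fmt.Sprintf("v%d", i)}
	}
	time.Sleep(100 * time.Millisecond) // crude: let workers drain before exit
	close(s.done)
}

One design note: the PR uses a non-blocking send (select with default) so a slow key can never stall memberlist's receive path, at the cost of dropping messages under sustained backlog; the sketch above uses a blocking send for brevity.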
6 changes: 6 additions & 0 deletions kv/memberlist/memberlist_client_test.go
@@ -1125,6 +1125,9 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
"c": {Timestamp: now.Unix(), State: ACTIVE},
}}))

// Wait until KV update has been processed.
time.Sleep(time.Millisecond * 100)

// Check two things here:
// 1) state of value in KV store
// 2) broadcast message only has changed members
@@ -1220,6 +1223,9 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {

 	kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))
 
+	// Wait until KV update has been processed.
+	time.Sleep(time.Millisecond * 100)
+
 	bs := kv.GetBroadcasts(0, math.MaxInt32)
 	if tc.broadcastMessage == nil {
 		require.Equal(t, 0, len(bs), "expected no broadcast message")
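The fixed 100 ms sleeps above make these tests timing-dependent, since NotifyMsg now applies updates asynchronously. One alternative (not part of this PR) would be to poll until the worker has applied the update. A sketch, assuming testify's require.Eventually and a KV Get method; waitForKeyProcessed is a hypothetical helper:

// waitForKeyProcessed polls until the KV store returns a value for key,
// failing the test if nothing shows up within a second.
func waitForKeyProcessed(t *testing.T, kv *KV, key string, c codec.Codec) {
	t.Helper()
	require.Eventually(t, func() bool {
		val, err := kv.Get(key, c) // assumed accessor; adjust to the real API
		return err == nil && val != nil
	}, time.Second, 10*time.Millisecond, "KV update for key %q was not processed", key)
}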
7 changes: 7 additions & 0 deletions kv/memberlist/metrics.go
@@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Number of received broadcast user messages that were invalid. Hopefully 0.",
})

m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "received_broadcasts_dropped_total",
Help: "Number of received broadcast user messages that were dropped. Hopefully 0.",
ortuman marked this conversation as resolved.
Show resolved Hide resolved
})

m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
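The drop accounting added above is the standard non-blocking-send pattern: a select whose default branch increments a counter instead of letting the caller block. A standalone sketch (not code from this PR), using the real prometheus client and its testutil package:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	dropped := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "received_broadcasts_dropped_total",
		Help: "Number of received broadcast user messages that were dropped.",
	})

	queue := make(chan int, 2) // deliberately tiny to force drops

	for i := 0; i < 5; i++ {
		select {
		case queue <- i: // fast path: enqueue for the worker
		default:
			dropped.Inc() // queue full: drop, but make it observable
		}
	}

	fmt.Println(testutil.ToFloat64(dropped)) // 3: five sends, capacity two
}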