Skip to content

Commit

Permalink
parallelize memberlist notified message processing (#110)
Browse files Browse the repository at this point in the history
Notified messages in KV memberlist are now processed asynchronously by a worker pool.
This will facilitate vertical scaling in conditions where UDP packet pressure is high due to a high number of instances of the memberlist.

Some unit tests have also been tweaked to support the new async model.
  • Loading branch information
ortuman authored May 23, 2022
1 parent 94ec9ad commit f5a17a2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93
* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting to count the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce the chance of hitting the timeout when sending a packet to another node. #89
* [ENHANCEMENT] Memberlist: parallelize processing of messages received by memberlist. #110
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
Expand Down
111 changes: 79 additions & 32 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
const (
	// maxCasRetries is the maximum number of retries in a CAS operation.
	maxCasRetries = 10

	// noChangeDetectedRetrySleep is how long to sleep after no change was
	// detected in CAS.
	noChangeDetectedRetrySleep = time.Second

	// notifyMsgQueueSize is the size of the buffered channels used to
	// handle memberlist messages.
	notifyMsgQueueSize = 1024
)

// Client implements kv.Client interface, by using memberlist.KV
Expand Down Expand Up @@ -251,13 +252,18 @@ type KV struct {
receivedMessagesSize int
messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI.

// Per-key value update workers
workersMu sync.Mutex
workersChannels map[string]chan valueUpdate

// closed on shutdown
shutdown chan struct{}

// metrics
numberOfReceivedMessages prometheus.Counter
totalSizeOfReceivedMessages prometheus.Counter
numberOfInvalidReceivedMessages prometheus.Counter
numberOfDroppedMessages prometheus.Counter
numberOfPulls prometheus.Counter
numberOfPushes prometheus.Counter
totalSizeOfPulls prometheus.Counter
Expand Down Expand Up @@ -319,6 +325,12 @@ func (v ValueDesc) Clone() (result ValueDesc) {
return
}

// valueUpdate is a single received value for a key. NotifyMsg sends it on the
// key's worker channel and processValueUpdate receives and merges it.
type valueUpdate struct {
// value holds the raw bytes of the value from the received key-value pair.
value []byte
// codec is the codec handed to mergeBytesValueForKey together with value.
codec codec.Codec
// messageSize is the length of the whole received message; it is recorded
// as the Size of the Message stored via addReceivedMessage.
messageSize int
}

// String returns a short textual description of the value descriptor,
// made of its version and codec ID.
func (v ValueDesc) String() string {
	const layout = "version: %d, codec: %s"
	return fmt.Sprintf(layout, v.Version, v.CodecID)
}
Expand All @@ -339,17 +351,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mlkv := &KV{
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,

store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
cfg: cfg,
logger: logger,
registerer: registerer,
provider: dnsProvider,
store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
workersChannels: make(map[string]chan valueUpdate),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
}

mlkv.createAndRegisterMetrics()
Expand Down Expand Up @@ -429,7 +441,6 @@ func (m *KV) starting(_ context.Context) error {
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
}

// Finish delegate initialization.
m.memberlist = list
m.broadcasts = &memberlist.TransmitLimitedQueue{
Expand Down Expand Up @@ -931,8 +942,6 @@ func (m *KV) NodeMeta(limit int) []byte {
// NotifyMsg is method from Memberlist Delegate interface
// Called when single message is received, i.e. what our broadcastNewValue has sent.
func (m *KV) NotifyMsg(msg []byte) {
m.initWG.Wait()

m.numberOfReceivedMessages.Inc()
m.totalSizeOfReceivedMessages.Add(float64(len(msg)))

Expand All @@ -957,29 +966,67 @@ func (m *KV) NotifyMsg(msg []byte) {
return
}

// we have a ring update! Let's merge it with our version of the ring for given key
mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec)
ch := m.getKeyWorkerChannel(kvPair.Key)
select {
case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}:
default:
m.numberOfDroppedMessages.Inc()
level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key)
}
}

func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate {
m.workersMu.Lock()
defer m.workersMu.Unlock()

ch := m.workersChannels[key]
if ch == nil {
// spawn a key associated worker goroutine to process updates in background
ch = make(chan valueUpdate, notifyMsgQueueSize)
go m.processValueUpdate(ch, key)

changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
m.workersChannels[key] = ch
}
return ch
}

m.addReceivedMessage(Message{
Time: time.Now(),
Size: len(msg),
Pair: kvPair,
Version: version,
Changes: changes,
})
func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
for {
select {
case update := <-workerCh:
// we have a value update! Let's merge it with our current version for given key
mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec)

if err != nil {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
} else if version > 0 {
m.notifyWatchers(kvPair.Key)
changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
}

m.addReceivedMessage(Message{
Time: time.Now(),
Size: update.messageSize,
Pair: KeyValuePair{
Key: key,
Value: update.value,
Codec: update.codec.CodecID(),
},
Version: version,
Changes: changes,
})

// Don't resend original message, but only changes.
m.broadcastNewValue(kvPair.Key, mod, version, codec)
if err != nil {
level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err)
} else if version > 0 {
m.notifyWatchers(key)

// Don't resend original message, but only changes.
m.broadcastNewValue(key, mod, version, update.codec)
}

case <-m.shutdown:
// stop running on shutdown
return
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,9 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
"c": {Timestamp: now.Unix(), State: ACTIVE},
}}))

// Wait until KV update has been processed.
time.Sleep(time.Millisecond * 100)

// Check two things here:
// 1) state of value in KV store
// 2) broadcast message only has changed members
Expand Down Expand Up @@ -1220,6 +1223,9 @@ func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {

kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))

// Wait until KV update has been processed.
time.Sleep(time.Millisecond * 100)

bs := kv.GetBroadcasts(0, math.MaxInt32)
if tc.broadcastMessage == nil {
require.Equal(t, 0, len(bs), "expected no broadcast message")
Expand Down
7 changes: 7 additions & 0 deletions kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Number of received broadcast user messages that were invalid. Hopefully 0.",
})

m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "received_broadcasts_dropped_total",
Help: "Number of received broadcast user messages that were dropped. Hopefully 0.",
})

m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Expand Down

0 comments on commit f5a17a2

Please sign in to comment.