From fc51e19e3f0571789c53cfb9382de6291760b0d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 23 May 2022 16:40:23 +0200 Subject: [PATCH 1/2] Update dskit to bring "Parallelize memberlist notified message processing" PR. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../dskit/kv/memberlist/memberlist_client.go | 111 +++++++++++++----- .../grafana/dskit/kv/memberlist/metrics.go | 7 ++ .../grafana/dskit/netutil/netutil.go | 3 +- .../grafana/dskit/ring/lifecycler.go | 64 ++++++---- vendor/modules.txt | 2 +- 8 files changed, 135 insertions(+), 59 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10fc1a41cd6..ad7f86ba316 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Increased default configuration for `-server.grpc-max-recv-msg-size-bytes` and `-server.grpc-max-send-msg-size-bytes` from 4MB to 100MB. #1883 * [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812 +* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893 diff --git a/go.mod b/go.mod index 18d483a88bd..4bf73b60d08 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20220506090252-45db43a8cfe2 + github.com/grafana/dskit v0.0.0-20220523143435-f5a17a2c14c8 github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 00c20781f7f..d9356432353 100644 --- a/go.sum +++ b/go.sum @@ -1047,8 +1047,8 @@ github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= -github.com/grafana/dskit v0.0.0-20220506090252-45db43a8cfe2 h1:EvatyCYj0QND5Wd54jRuaprO2+zmC9ndoG/sIHHiZK4= -github.com/grafana/dskit v0.0.0-20220506090252-45db43a8cfe2/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0= +github.com/grafana/dskit v0.0.0-20220523143435-f5a17a2c14c8 h1:u7rtjSEjrX/WOGTgRq8ZsNXOv62tRqhkb1gJtNTEzR8= +github.com/grafana/dskit v0.0.0-20220523143435-f5a17a2c14c8/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0= github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe h1:mxrRWDjKtob43xF9nEhJthdtCzX35/800Sk7nE//YHQ= github.com/grafana/e2e v0.1.1-0.20220519104354-1db01e4751fe/go.mod h1:+26VJWpczg2OU3D0537acnHSHzhJORpxOs6F+M27tZo= github.com/grafana/memberlist v0.3.1-0.20220425183535-6b97a09b7167 h1:PgEQkGHR4YimSCEGT5IoswN9gJKZDVskf+he6UClCLw= diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 23c40ac764f..57c08926f7e 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -27,6 +27,7 @@ import ( const ( maxCasRetries = 10 // max retries in CAS operation noChangeDetectedRetrySleep = time.Second // how long to sleep after no change was detected in CAS + notifyMsgQueueSize = 1024 // size of buffered channels to handle memberlist messages ) // Client implements kv.Client interface, by using memberlist.KV @@ -251,6 +252,10 @@ type KV struct { receivedMessagesSize int messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. + // Per-key value update workers + workersMu sync.Mutex + workersChannels map[string]chan valueUpdate + // closed on shutdown shutdown chan struct{} @@ -258,6 +263,7 @@ type KV struct { numberOfReceivedMessages prometheus.Counter totalSizeOfReceivedMessages prometheus.Counter numberOfInvalidReceivedMessages prometheus.Counter + numberOfDroppedMessages prometheus.Counter numberOfPulls prometheus.Counter numberOfPushes prometheus.Counter totalSizeOfPulls prometheus.Counter @@ -319,6 +325,12 @@ func (v ValueDesc) Clone() (result ValueDesc) { return } +type valueUpdate struct { + value []byte + codec codec.Codec + messageSize int +} + func (v ValueDesc) String() string { return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID) } @@ -339,17 +351,17 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace mlkv := &KV{ - cfg: cfg, - logger: logger, - registerer: registerer, - provider: dnsProvider, - - store: make(map[string]ValueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, + cfg: cfg, + logger: logger, + registerer: registerer, + provider: dnsProvider, + store: make(map[string]ValueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + prefixWatchers: make(map[string][]chan string), + workersChannels: make(map[string]chan valueUpdate), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, } mlkv.createAndRegisterMetrics() @@ -429,7 +441,6 @@ func (m *KV) starting(_ context.Context) error { if err != nil { return fmt.Errorf("failed to create memberlist: %v", err) } - // Finish delegate initialization. m.memberlist = list m.broadcasts = &memberlist.TransmitLimitedQueue{ @@ -931,8 +942,6 @@ func (m *KV) NodeMeta(limit int) []byte { // NotifyMsg is method from Memberlist Delegate interface // Called when single message is received, i.e. what our broadcastNewValue has sent. func (m *KV) NotifyMsg(msg []byte) { - m.initWG.Wait() - m.numberOfReceivedMessages.Inc() m.totalSizeOfReceivedMessages.Add(float64(len(msg))) @@ -957,29 +966,67 @@ func (m *KV) NotifyMsg(msg []byte) { return } - // we have a ring update! Let's merge it with our version of the ring for given key - mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec) + ch := m.getKeyWorkerChannel(kvPair.Key) + select { + case ch <- valueUpdate{value: kvPair.Value, codec: codec, messageSize: len(msg)}: + default: + m.numberOfDroppedMessages.Inc() + level.Warn(m.logger).Log("msg", "notify queue full, dropping message", "key", kvPair.Key) + } +} + +func (m *KV) getKeyWorkerChannel(key string) chan<- valueUpdate { + m.workersMu.Lock() + defer m.workersMu.Unlock() + + ch := m.workersChannels[key] + if ch == nil { + // spawn a key associated worker goroutine to process updates in background + ch = make(chan valueUpdate, notifyMsgQueueSize) + go m.processValueUpdate(ch, key) - changes := []string(nil) - if mod != nil { - changes = mod.MergeContent() + m.workersChannels[key] = ch } + return ch +} - m.addReceivedMessage(Message{ - Time: time.Now(), - Size: len(msg), - Pair: kvPair, - Version: version, - Changes: changes, - }) +func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) { + for { + select { + case update := <-workerCh: + // we have a value update! Let's merge it with our current version for given key + mod, version, err := m.mergeBytesValueForKey(key, update.value, update.codec) - if err != nil { - level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err) - } else if version > 0 { - m.notifyWatchers(kvPair.Key) + changes := []string(nil) + if mod != nil { + changes = mod.MergeContent() + } + + m.addReceivedMessage(Message{ + Time: time.Now(), + Size: update.messageSize, + Pair: KeyValuePair{ + Key: key, + Value: update.value, + Codec: update.codec.CodecID(), + }, + Version: version, + Changes: changes, + }) - // Don't resend original message, but only changes. - m.broadcastNewValue(kvPair.Key, mod, version, codec) + if err != nil { + level.Error(m.logger).Log("msg", "failed to store received value", "key", key, "err", err) + } else if version > 0 { + m.notifyWatchers(key) + + // Don't resend original message, but only changes. + m.broadcastNewValue(key, mod, version, update.codec) + } + + case <-m.shutdown: + // stop running on shutdown + return + } } } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go index c7d3f01c277..9ab56a662db 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/metrics.go @@ -36,6 +36,13 @@ func (m *KV) createAndRegisterMetrics() { Help: "Number of received broadcast user messages that were invalid. Hopefully 0.", }) + m.numberOfDroppedMessages = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ + Namespace: m.cfg.MetricsNamespace, + Subsystem: subsystem, + Name: "received_broadcasts_dropped_total", + Help: "Number of received broadcast user messages that were dropped. Hopefully 0.", + }) + m.numberOfPushes = promauto.With(m.registerer).NewCounter(prometheus.CounterOpts{ Namespace: m.cfg.MetricsNamespace, Subsystem: subsystem, diff --git a/vendor/github.com/grafana/dskit/netutil/netutil.go b/vendor/github.com/grafana/dskit/netutil/netutil.go index a1b7c1d40f6..232317d4b54 100644 --- a/vendor/github.com/grafana/dskit/netutil/netutil.go +++ b/vendor/github.com/grafana/dskit/netutil/netutil.go @@ -2,6 +2,7 @@ package netutil import ( "net" + "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -52,6 +53,6 @@ func privateNetworkInterfaces(all []net.Interface, fallback []string, logger log if len(privInts) == 0 { return fallback } - level.Debug(logger).Log("msg", "found network interfaces with private IP addresses assigned", "interfaces", privInts) + level.Debug(logger).Log("msg", "found network interfaces with private IP addresses assigned", "interfaces", strings.Join(privInts, " ")) return privInts } diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 9db0a7e6b0c..2479ad03c8e 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" - perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -110,8 +109,9 @@ type Lifecycler struct { Zone string // Whether to flush if transfer fails on shutdown. - flushOnShutdown *atomic.Bool - unregisterOnShutdown *atomic.Bool + flushOnShutdown *atomic.Bool + unregisterOnShutdown *atomic.Bool + clearTokensOnShutdown *atomic.Bool // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. @@ -160,20 +160,21 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa } l := &Lifecycler{ - cfg: cfg, - flushTransferer: flushTransferer, - KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, - flushOnShutdown: atomic.NewBool(flushOnShutdown), - unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), - Zone: cfg.Zone, - actorChan: make(chan func()), - state: PENDING, - lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), - logger: logger, + cfg: cfg, + flushTransferer: flushTransferer, + KVStore: store, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: atomic.NewBool(flushOnShutdown), + unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), + clearTokensOnShutdown: atomic.NewBool(false), + Zone: cfg.Zone, + actorChan: make(chan func()), + state: PENDING, + lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), + logger: logger, } l.BasicService = services. @@ -393,7 +394,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { // First, see if we exist in the cluster, update our state to match if we do, // and add ourselves (without tokens) if we don't. if err := i.initRing(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to join the ring %s", i.RingName) + return errors.Wrapf(err, "failed to join the ring %s", i.RingName) } // We do various period tasks @@ -416,14 +417,14 @@ func (i *Lifecycler) loop(ctx context.Context) error { // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING // ingesters, but we also signal that it is not fully functional yet. if err := i.autoJoin(context.Background(), JOINING); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) observeChan = time.After(i.cfg.ObservePeriod) } else { if err := i.autoJoin(context.Background(), ACTIVE); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } } } @@ -510,11 +511,18 @@ heartbeatLoop: if i.ShouldUnregisterOnShutdown() { if err := i.unregister(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName) } + if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() { + if err := os.Remove(i.cfg.TokensFilePath); err != nil { + return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath) + } + level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath) + } + return nil } @@ -825,8 +833,20 @@ func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool) { i.unregisterOnShutdown.Store(enabled) } +// ClearTokensOnShutdown returns if persisted tokens should be cleared on shutdown. +func (i *Lifecycler) ClearTokensOnShutdown() bool { + return i.clearTokensOnShutdown.Load() +} + +// SetClearTokensOnShutdown enables/disables deletions of tokens on shutdown. +// Set to `true` in case one wants to clear tokens on shutdown which are +// otherwise persisted, e.g. useful in custom shutdown handlers. +func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool) { + i.clearTokensOnShutdown.Store(enabled) +} + func (i *Lifecycler) processShutdown(ctx context.Context) { - flushRequired := i.flushOnShutdown.Load() + flushRequired := i.FlushOnShutdown() transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { if err == ErrTransferDisabled { diff --git a/vendor/modules.txt b/vendor/modules.txt index fe7fda4eba0..5437452febf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -434,7 +434,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20220506090252-45db43a8cfe2 +# github.com/grafana/dskit v0.0.0-20220523143435-f5a17a2c14c8 ## explicit; go 1.17 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency From b8210f8667df6d80a39e813a44c396d379715027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Mon, 23 May 2022 16:42:56 +0200 Subject: [PATCH 2/2] CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad7f86ba316..967299ab00e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ * [CHANGE] Increased default configuration for `-server.grpc-max-recv-msg-size-bytes` and `-server.grpc-max-send-msg-size-bytes` from 4MB to 100MB. #1883 * [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812 -* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. +* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912 * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893