Skip to content

Commit

Permalink
Rename "casQueue" to "localQueue".
Browse files Browse the repository at this point in the history
Split messages_in_broadcast_queue metric into two values, one for each queue.
  • Loading branch information
pstibrany committed Jul 17, 2024
1 parent a892e97 commit 979c031
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 44 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
* [CHANGE] Changed `ShouldLog()` function signature in `middleware.OptionalLogging` interface to `ShouldLog(context.Context) (bool, string)`: the returned `string` contains an optional reason. When reason is valued, `GRPCServerLog` adds `(<reason>)` suffix to the error. #514
* [CHANGE] Cache: Remove superfluous `cache.RemoteCacheClient` interface and unify all caches using the `cache.Cache` interface. #520
* [CHANGE] Updated the minimum required Go version to 1.21. #540
* [CHANGE] memberlist: Metric `memberlist_client_messages_in_broadcast_queue` is now split into two series, distinguished by the `queue` label: `queue="local"` and `queue="gossip"`. #539
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [FEATURE] Add `log.BufferedLogger` type. #338
Expand Down Expand Up @@ -214,8 +215,8 @@
* [ENHANCEMENT] SpanProfiler: do less work on unsampled traces. #528
* [ENHANCEMENT] Log Middleware: if the trace is not sampled, log its ID as `trace_id_unsampled` instead of `trace_id`. #529
* [ENHANCEMENT] httpgrpc: httpgrpc Server can now use the error message from a special HTTP header when converting an HTTP response to an error. This is useful when the HTTP response body contains binary data that doesn't form a valid UTF-8 string; otherwise gRPC would fail to marshal the returned error. #531
* [ENHANCEMENT] memberlist: use separate queue for broadcast messages that are result of CAS updates, and prioritize CAS update messages when sending broadcasts. On stopping, only wait for CAS updates queue to be empty. #539
* [ENHANCEMENT] memberlist: Added `-<prefix>memberlist.broadcast-timeout-for-cas-updates-on-shutdown` option to set timeout for sending CAS updates on shutdown, instead of previously hardcoded 10s (which is still the default). #539
* [ENHANCEMENT] memberlist: use a separate queue for broadcast messages that are the result of local updates, and prioritize locally-generated messages when sending broadcasts. On stopping, only wait for the queue with locally-generated messages to be empty. #539
* [ENHANCEMENT] memberlist: Added `-<prefix>memberlist.broadcast-timeout-for-local-updates-on-shutdown` option to set the timeout for sending locally-generated updates on shutdown, instead of the previously hardcoded 10s (which is still the default). #539
* [CHANGE] Backoff: added `Backoff.ErrCause()` which is like `Backoff.Err()` but returns the context cause if backoff is terminated because the context has been canceled. #538
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
47 changes: 24 additions & 23 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ type KVConfig struct {
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"`

// Timeout used when leaving the memberlist cluster.
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
BroadcastTimeoutForCasUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout__for_cas_updates_on_shutdown" category:"advanced"`
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"`

// How much space to use to keep received and sent messages in memory (for troubleshooting).
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"`
Expand Down Expand Up @@ -199,7 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
f.DurationVar(&cfg.BroadcastTimeoutForCasUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-cas-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining CAS updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to CAS updates, not other kind of broadcast messages. 0 = no timeout, wait until all CAS updates are sent.")
f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.")

cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
}
Expand Down Expand Up @@ -233,10 +233,10 @@ type KV struct {
// dns discovery provider
provider DNSProvider

// Protects access to memberlist and gossipBroadcasts fields.
// Protects access to memberlist and broadcast queues.
delegateReady atomic.Bool
memberlist *memberlist.Memberlist
casBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally
localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally
gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes

// KV Store.
Expand Down Expand Up @@ -276,7 +276,8 @@ type KV struct {
numberOfPushes prometheus.Counter
totalSizeOfPulls prometheus.Counter
totalSizeOfPushes prometheus.Counter
numberOfBroadcastMessagesInQueue prometheus.GaugeFunc
numberOfGossipMessagesInQueue prometheus.GaugeFunc
numberOfLocalMessagesInQueue prometheus.GaugeFunc
totalSizeOfBroadcastMessagesInQueue prometheus.Gauge
numberOfBroadcastMessagesDropped prometheus.Counter
casAttempts prometheus.Counter
Expand Down Expand Up @@ -459,7 +460,7 @@ func (m *KV) starting(ctx context.Context) error {
}
// Finish delegate initialization.
m.memberlist = list
m.casBroadcasts = &memberlist.TransmitLimitedQueue{
m.localBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: mlCfg.RetransmitMult,
}
Expand Down Expand Up @@ -726,24 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string {
func (m *KV) stopping(_ error) error {
level.Info(m.logger).Log("msg", "leaving memberlist cluster")

// Wait until CAS broadcast queue is empty, but don't wait for too long.
// Wait until queue with locally-generated messages is empty, but don't wait for too long.
// Also don't wait if there is just one node left.
// Once we enter Stopping state, we don't queue any more CAS messages.
// Note: Once we enter Stopping state, we don't queue more locally-generated messages.

deadline := time.Now().Add(m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown)
deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown)

msgs := m.casBroadcasts.NumQueued()
msgs := m.localBroadcasts.NumQueued()
nodes := m.memberlist.NumMembers()
for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForCasUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) {
level.Info(m.logger).Log("msg", "waiting for CAS broadcast messages to be sent out", "count", msgs, "nodes", nodes)
for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) {
level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes)
time.Sleep(250 * time.Millisecond)

msgs = m.casBroadcasts.NumQueued()
msgs = m.localBroadcasts.NumQueued()
nodes = m.memberlist.NumMembers()
}

if msgs > 0 {
level.Warn(m.logger).Log("msg", "broadcast messages left in CAS queue", "count", msgs, "nodes", nodes)
level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left the queue", "count", msgs, "nodes", nodes)
}

err := m.memberlist.Leave(m.cfg.LeaveTimeout)
Expand Down Expand Up @@ -1041,9 +1042,9 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{})
return change, newver, retry, nil
}

func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, isCas bool) {
if isCas && m.State() != services.Running {
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) {
if locallyGenerated && m.State() != services.Running {
level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key)
return
}

Expand Down Expand Up @@ -1084,8 +1085,8 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec

m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))

if isCas {
m.casBroadcasts.QueueBroadcast(b)
if locallyGenerated {
m.localBroadcasts.QueueBroadcast(b)
} else {
m.gossipBroadcasts.QueueBroadcast(b)
}
Expand Down Expand Up @@ -1200,10 +1201,10 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
return nil
}

// Prioritize CAS queue
msgs := m.casBroadcasts.GetBroadcasts(overhead, limit)
// Prioritize locally-generated messages
msgs := m.localBroadcasts.GetBroadcasts(overhead, limit)

// Decrease limit for each message we got from CAS broadcasts.
// Decrease limit for each message we got from locally-generated broadcasts.
for _, m := range msgs {
limit -= overhead + len(m)
}
Expand Down
36 changes: 23 additions & 13 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@ func (p delayedDNSProviderMock) Addresses() []string {
return p.resolved
}

func TestGetBroadcastsPrefersCASUpdates(t *testing.T) {
func TestGetBroadcastsPrefersLocalUpdates(t *testing.T) {
codec := dataCodec{}

cfg := KVConfig{
Expand All @@ -1661,7 +1661,8 @@ func TestGetBroadcastsPrefersCASUpdates(t *testing.T) {
cfg.RetransmitMult = 1
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
reg := prometheus.NewRegistry()
kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

Expand All @@ -1673,21 +1674,30 @@ func TestGetBroadcastsPrefersCASUpdates(t *testing.T) {
// No broadcast messages from KV at the beginning.
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

// Check that CAS broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue.
kv.broadcastNewValue("notcas", smallUpdate, 1, codec, false)
kv.broadcastNewValue("notcas", bigUpdate, 2, codec, false)
kv.broadcastNewValue("cas", smallUpdate, 1, codec, true)
kv.broadcastNewValue("cas", bigUpdate, 2, codec, true)
// Check that locally-generated broadcast messages will be prioritized and sent out first, even if they are enqueued later or are smaller than other messages in the queue.
kv.broadcastNewValue("non-local", smallUpdate, 1, codec, false)
kv.broadcastNewValue("non-local", bigUpdate, 2, codec, false)
kv.broadcastNewValue("local", smallUpdate, 1, codec, true)
kv.broadcastNewValue("local", bigUpdate, 2, codec, true)

err := testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP memberlist_client_messages_in_broadcast_queue Number of user messages in the broadcast queue
# TYPE memberlist_client_messages_in_broadcast_queue gauge
memberlist_client_messages_in_broadcast_queue{queue="gossip"} 2
memberlist_client_messages_in_broadcast_queue{queue="local"} 2
`), "memberlist_client_messages_in_broadcast_queue")
require.NoError(t, err)

msgs := kv.GetBroadcasts(0, 10000)
require.Len(t, msgs, 4) // we get all 4 messages
require.Equal(t, "cas", getKey(t, msgs[0]))
require.Equal(t, "cas", getKey(t, msgs[1]))
require.Equal(t, "notcas", getKey(t, msgs[2]))
require.Equal(t, "notcas", getKey(t, msgs[3]))
require.Equal(t, "local", getKey(t, msgs[0]))
require.Equal(t, "local", getKey(t, msgs[1]))
require.Equal(t, "non-local", getKey(t, msgs[2]))
require.Equal(t, "non-local", getKey(t, msgs[3]))

// Check that TransmitLimitedQueue.GetBroadcasts preferred larger messages (it does that).
require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger CAS message is returned before smaller one
require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-CAS message is returned before smaller one
require.True(t, len(msgs[0]) > len(msgs[1])) // Bigger local message is returned before smaller one
require.True(t, len(msgs[2]) > len(msgs[3])) // Bigger non-local message is returned before smaller one
}

func getKey(t *testing.T, msg []byte) string {
Expand Down
30 changes: 24 additions & 6 deletions kv/memberlist/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,33 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Total size of pulled state",
})

m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "messages_in_broadcast_queue",
Help: "Number of user messages in the broadcast queue",
const queueMetricName = "messages_in_broadcast_queue"
const queueMetricHelp = "Number of user messages in the broadcast queue"

m.numberOfGossipMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: queueMetricName,
Help: queueMetricHelp,
ConstLabels: map[string]string{"queue": "gossip"},
}, func() float64 {
// Queues are not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.gossipBroadcasts.NumQueued())
}
return 0
})

m.numberOfLocalMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: queueMetricName,
Help: queueMetricHelp,
ConstLabels: map[string]string{"queue": "local"},
}, func() float64 {
// Queues are not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.gossipBroadcasts.NumQueued()) + float64(m.casBroadcasts.NumQueued())
return float64(m.localBroadcasts.NumQueued())
}
return 0
})
Expand Down

0 comments on commit 979c031

Please sign in to comment.