txHandler: Random Early Detection for backlog queue #4797

Merged: 61 commits merged into master, Dec 23, 2022

Conversation

@AlgoAxel (Contributor) commented Nov 15, 2022

What

This PR introduces the following features and changes:

  • An Elastic Rate Limiter has been added to util, meant to manage multi-client access to a single resource.
  • A Congestion Manager has been added to the same file, meant to advise the Elastic Rate Limiter in times of heavy load.
  • Peers can now register OnClose functionality, so the ERL can close "Reservations" for peers when they disconnect.

The Elastic Rate Limiter has been attached in front of the TXHandler's path for adding transactions to the Mempool.

Why

Today, peers send messages which are enqueued into our Mempool with no regard for the receiver's maximum bandwidth. It is possible for a noisy peer to create difficulty for all peers adding to the Mempool. We would rather balance and selectively constrain peers based on their dominance of the messaging queue and the service's ability to handle the load.

How

In TX Handler:

The TX Handler now has an erl (Elastic Rate Limiter) and a Random Early Detection Congestion Manager cm inside it. When a new request comes in, the TX Handler asks the ERL for a "capacityGuard" object, which is then attached to the message being enqueued. When the message is dequeued, Release and Served are called on the guard, restoring capacity to the ERL and notifying the congestion manager that the request has been serviced.
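As a rough, illustrative sketch of that flow (the names below are simplified stand-ins, not verbatim from the PR; the real wiring lives in data/txHandler.go):

type capacityGuard interface {
	Release() error // hand the capacity unit back to the ERL
	Served()        // tell the congestion manager the request was serviced
}

type workItem struct {
	raw      []byte
	capguard capacityGuard
}

// Enqueue side: a guard obtained from the ERL is attached to the message.
func enqueue(backlog chan<- workItem, raw []byte, guard capacityGuard) {
	backlog <- workItem{raw: raw, capguard: guard}
}

// Dequeue side: Release restores capacity as soon as the item leaves the queue,
// and Served reports the completed work to the congestion manager.
func worker(backlog <-chan workItem, handle func([]byte)) {
	for wi := range backlog {
		if wi.capguard != nil {
			_ = wi.capguard.Release() // the real code logs a warning on error
		}
		handle(wi.raw)
		if wi.capguard != nil {
			wi.capguard.Served()
		}
	}
}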

In the Elastic Rate Limiter:

  • A channel exists with a static capacity; this is the Shared Capacity. When created, it is filled with capacity.
  • When an ErlClient attempts to consume capacity, the ERL first attempts to make a reserved capacity for it, and errors if it can't. It also attaches closeReservation to the client's OnClose.
  • Reserved capacities are channels which also hold capacity, drained from the shared capacity. In this way, the total amount of capacity is never over-provisioned (see the sketch after this list).
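A minimal sketch of that reservation mechanic, using buffered channels as the capacity pools (names are hypothetical; the real implementation is in util/rateLimit.go):

type capacity struct{}

type elasticRateLimiter struct {
	shared chan capacity // the Shared Capacity pool
}

func newElasticRateLimiter(size int) *elasticRateLimiter {
	erl := &elasticRateLimiter{shared: make(chan capacity, size)}
	for i := 0; i < size; i++ {
		erl.shared <- capacity{} // the pool starts full
	}
	return erl
}

// reserve moves n units out of the shared pool into a per-client channel.
// Because units are moved rather than copied, the total capacity can never
// be over-provisioned; if the shared pool runs dry, the reservation fails.
func (erl *elasticRateLimiter) reserve(n int) (chan capacity, bool) {
	reserved := make(chan capacity, n)
	for i := 0; i < n; i++ {
		select {
		case c := <-erl.shared:
			reserved <- c
		default:
			// not enough shared capacity: return what was taken and fail
			for len(reserved) > 0 {
				erl.shared <- <-reserved
			}
			return nil, false
		}
	}
	return reserved, true
}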

In the Congestion Manager:

The RED Congestion Manager implements the CongestionManager interface.

  • A run() loop listens for new consumed, served, and shouldDrop query messages (a sketch of the loop follows this list).
  • Every N ticks, a new Target Service Rate is calculated using a sliding window of arrivals and serves.
  • When the service is stopped, it records the values back onto the structure for unit-testing support.
  • Because the run loop runs on its own goroutine, the data is 1R1W (one reader, one writer).
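A rough sketch of that loop (it needs only the "time" import; all field and type names are hypothetical and simplified, e.g. clients are keyed by string here rather than by ErlClient; the real structure lives in util/rateLimit.go):

type clientEvent struct {
	client string
	t      time.Time
}

type dropQuery struct {
	client string
	reply  chan bool
}

type redCongestionManager struct {
	window           time.Duration    // rolling window, default 10s
	refreshEvery     int              // recalculate the target rate every N ticks
	consumed         chan clientEvent // capacity consumed by a client
	served           chan clientEvent // a request finished being serviced
	queries          chan dropQuery   // shouldDrop queries
	stop             chan struct{}
	consumedByClient map[string][]time.Time
	serves           []time.Time
	targetRate       float64 // serves per client per second
}

// run owns every map and slice above, which is why the data is one-reader/one-writer.
func (cm *redCongestionManager) run() {
	ticker := time.NewTicker(10 * time.Millisecond) // hypothetical tick interval
	defer ticker.Stop()
	ticks := 0
	for {
		select {
		case e := <-cm.consumed:
			cm.consumedByClient[e.client] = append(cm.consumedByClient[e.client], e.t)
		case e := <-cm.served:
			cm.serves = append(cm.serves, e.t)
		case q := <-cm.queries:
			q.reply <- cm.shouldDrop(q.client) // shouldDrop is sketched in the RED section below
		case <-ticker.C:
			ticks++
			if ticks%cm.refreshEvery == 0 {
				cutoff := time.Now().Add(-cm.window)
				for c, ts := range cm.consumedByClient {
					cm.consumedByClient[c] = trimBefore(ts, cutoff)
				}
				cm.serves = trimBefore(cm.serves, cutoff)
				if clients := len(cm.consumedByClient); clients > 0 {
					// target service rate: total serves / clients / window seconds
					cm.targetRate = float64(len(cm.serves)) / float64(clients) / cm.window.Seconds()
				}
			}
		case <-cm.stop:
			return
		}
	}
}

// trimBefore drops timestamps older than cutoff; timestamps arrive in order.
func trimBefore(ts []time.Time, cutoff time.Time) []time.Time {
	i := 0
	for i < len(ts) && ts[i].Before(cutoff) {
		i++
	}
	return ts[i:]
}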

The "Random Early Detection" Congestion Management Algorithm

It was requested that I write a short explainer for posterity, detailing the behavior of the congestion management algorithm. Wikipedia gives a good overview of how RED works in general, but here is how it works specific to the txBacklog:

When a capacity unit is consumed, the manager records the time in a slice dedicated to consumption by that client.
When a capacity unit is served, it records the time in a single slice for all serves (serves are not tracked per client because it is not immediately useful to do so, but it could be in the future).

All consumption times and serve times are trimmed to a rolling window size (default 10s), recalculated on an as-needed basis when a shouldDrop request arrives or when the target service rate is being calculated.

To minimize excess computation, the target service rate is recalculated only every N ticks (the default works out to once per second at 100 requests per second). The service rate is equal to the total number of serves / number of clients / window size. You can think of this as "serves per client per second".
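For example (hypothetical numbers), 2,000 serves observed across 4 clients over a 10 s window works out to 2000 / 4 / 10 = 50 serves per client per second.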

When the Congestion manager is queried if it shouldDrop:

  • Take the number of consumptions by the client and divide it by the rolling window size. This is the client's arrival rate.
  • Take the currently calculated target service rate and form the ratio of the two.
  • Compare that ratio against a random float.
  • Additionally, exponents are applied to increase the "contrast" between which clients are dropped most. The exact equation is:

AdjustedProbability = ArrivalRate^4 / TargetRate^4

Drop the message if a random float in [0, 1) is lower than AdjustedProbability.
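Expressed as code on the same illustrative struct as the run() sketch above (hypothetical names; the real math lives in util/rateLimit.go):

// Needs "math" and "math/rand"; cm.targetRate and cm.window come from the
// struct sketched in the "In the Congestion Manager" section above.
func (cm *redCongestionManager) shouldDrop(client string) bool {
	// arrival rate for this client: consumptions per second within the rolling window
	arrivalRate := float64(len(cm.consumedByClient[client])) / cm.window.Seconds()
	// contrast exponent: ratios below 1 shrink toward 0, ratios above 1 blow up
	adjustedProbability := math.Pow(arrivalRate, 4) / math.Pow(cm.targetRate, 4)
	// assumes cm.targetRate > 0; drop if a random float lands below the probability
	return rand.Float64() < adjustedProbability
}

Because of the fourth power, a client arriving below the target rate sees a drop probability well under the plain ratio, while a client far above the target rate is dropped essentially every time.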

Testing

Unit Tests

Unit tests have been written for the Elastic Rate Limiter itself, confirming it can safely vend and receive capacity.
Unit tests have also been written for the RED congestion manager.

Manual Tests

Ran standard cluster tests and originally observed a decrease in overall TPS; the TPS-over-time curve looked consistent with congestion control being active, and perhaps too punishing for the average case. I then introduced an exponential curve to the shouldDrop calculation, which mitigated most of the TPS reduction and produced a less aggressive-looking congestion-management pattern in the TPS graph.

Update: since fixing the congestion management's served() reporting, there is no longer an impact on TPS. The rate limiting was likely being applied too often due to the misreporting of served events, which ironically shows that the congestion management does what it's supposed to when it's active. With correct served-event reporting, TPS is unimpeded.

Correctness test

Two clients send for 15 seconds; the second overflows the backlog from a few threads.

Client 1 sends 100k txns without delay:

| config  | dropped | user1 accepted | user1 sent | ratio |
|---------|---------|----------------|------------|-------|
| master  | 451719  | 20813          | 100k       | 0.2   |
| erl off | 444427  | 21807          | 100k       | 0.21  |
| erl on  | 482393  | 30025          | 100k       | 0.3   |

Client 1 with a 50us delay between sends, 15 sec runtime:

| config  | dropped | user1 accepted | user1 sent | ratio |
|---------|---------|----------------|------------|-------|
| master  | 462748  | 8616           |            |       |
| erl off | 458140  | 10702          | 30978      | 0.34  |
| erl on  | 412903  | 20201          | 30714      | 0.65  |

This structure has not been tested in any way; it is committed for posterity as I work.
codecov bot commented Nov 15, 2022

Codecov Report

Merging #4797 (6f9b88a) into master (ca04aad) will increase coverage by 0.07%.
The diff coverage is 71.25%.

@@            Coverage Diff             @@
##           master    #4797      +/-   ##
==========================================
+ Coverage   53.49%   53.56%   +0.07%     
==========================================
  Files         432      433       +1     
  Lines       53624    53864     +240     
==========================================
+ Hits        28687    28854     +167     
- Misses      22708    22766      +58     
- Partials     2229     2244      +15     
| Impacted Files | Coverage Δ |
|---|---|
| config/localTemplate.go | 43.75% <ø> (ø) |
| network/wsPeer.go | 68.32% <0.00%> (-0.66%) ⬇️ |
| data/txHandler.go | 58.28% <9.37%> (-5.33%) ⬇️ |
| util/rateLimit.go | 82.35% <82.35%> (ø) |
| ledger/blockqueue.go | 85.63% <0.00%> (-2.88%) ⬇️ |
| catchup/service.go | 68.11% <0.00%> (-0.97%) ⬇️ |
| data/transactions/verify/txn.go | 73.72% <0.00%> (-0.85%) ⬇️ |
| ledger/tracker.go | 74.26% <0.00%> (-0.85%) ⬇️ |
| network/wsNetwork.go | 64.92% <0.00%> (+0.17%) ⬆️ |
| crypto/merkletrie/node.go | 93.48% <0.00%> (+1.86%) ⬆️ |
| ... and 1 more | |


@AlgoAxel AlgoAxel changed the title Add basic ElasticRateLimiter struct Elastic Rate Limiter and Congestion Manager for TXHandler Nov 18, 2022
@AlgoAxel AlgoAxel changed the title Elastic Rate Limiter and Congestion Manager for TXHandler TXHandler: Elastic Rate Limiter and Congestion Manager for TXHandler Nov 18, 2022
@AlgoAxel AlgoAxel marked this pull request as ready for review November 18, 2022 20:54
@brianolson (Contributor) left a comment:

one weird design thought; and we'll want cluster tests to check system performance.

icorderi previously approved these changes Dec 22, 2022
Comment on util/rateLimit.go:
	ts := []time.Time{}
	consumedByClient[e.c] = &ts
}
*(consumedByClient[e.c]) = append(*(consumedByClient[e.c]), e.t)
A reviewer (Contributor) asked:

What is the upper bound for these timestamp arrays?

@AlgoAxel (Author) replied on Dec 23, 2022:

Strictly, there is no upper bound, but the length of these lists will be proportional to the service rate:

  • client submits backlogSize transactions, filling the backlog
  • txHandler is magic and can instantly dequeue all requests
  • client fills the backlog again, repeat

However many times this can happen within the rolling window (10s) determines how long the list could theoretically get. The same answer applies across all clients, since they share the ability to fill the backlog.

And obviously, window size is directly proportional to this as well, since a 20s window affords twice as much time to load and unload the backlog.

* package level `var` for common Errors
* pruning optimizations
* closeReservation unit tests
* pointer to interface replaced by interface itself
@algorandskiy algorandskiy merged commit 50d5dfd into algorand:master Dec 23, 2022
@algorandskiy algorandskiy changed the title TXHandler: Elastic Rate Limiter and Congestion Manager for TXHandler txHandler: Random Early Detection for backlog queue Dec 23, 2022
Comment on lines +567 to +568
if err != nil {
	handler.erl.EnableCongestionControl()
A reviewer (Contributor) commented:

It looks strange to read that an error happened and the response is to enable something.
It seems that the err is not necessarily an unexpected situation, but rather a control parameter. ConsumeCapacity does not describe in its comments what errors are returned, and one needs to dig into the internal functions to understand what is expected.
Either add a comment to describe what err entails, or use a different control parameter.

The author replied:

That's a fair point -- using an error as a control is uncommon. But I think it makes sense here, so it does deserve a comment.

The current strategy is that the TX Handler asks the ERL for capacity, and while capacity is available, everything is fine. However, if there is ever an issue granting capacity, the TX Handler decides that the ERL should use congestion management.

That "issue granting capacity" indicates that, at the moment, the underlying TX Backlog is full. It is indeed an error, but it's an error we encounter any time the inputs overwhelm this handler, so it's not exactly "unexpected" as you mention. Pre-ERL this error was/is just as likely, but it happens lower down, when the TXHandler attempts to put the item onto the backlog and fails (and no function call happens inside that condition).

Another way of putting this: enabling congestion control is indeed the response to an error condition. If the handler is being overwhelmed, that's an error, and if it's generating errors from being overwhelmed, that's our signal to turn on defensive measures (congestion management).

The reviewer replied:

> That's a fair point -- using an error as a control is uncommon. But I think it makes sense here, so it does deserve a comment.

That is all I need :-)

Comment on lines +257 to +274
if wi.capguard != nil {
	if err := wi.capguard.Release(); err != nil {
		logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
	}
}
if handler.checkAlreadyCommitted(wi) {
	transactionMessagesAlreadyCommitted.Inc(nil)
	if wi.capguard != nil {
		wi.capguard.Served()
	}
	continue
}

// enqueue the task to the verification pool.
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)
if wi.capguard != nil {
	wi.capguard.Served()
}
A reviewer (Contributor) commented:

Several points here:

  1. Served and Release both return err. One is checked, the others are not. Why return an error if it is not important?
  2. When Release returns an error, it is of no consequence here. The logging of the warning could live inside Release.
  3. Why is it necessary to have 3 calls here? It complicates the backlog code considerably. There has to be another messaging mechanism.

@AlgoAxel (Author) replied on Jan 9, 2023:

  1. That's a good point; Served probably does not need to return an error. Served is what gets used to calculate service rates for ShouldDrop, so if data doesn't get to it, the math is a little wrong (which is okay). By contrast, Release is what puts capacity back into the ERL. If it fails, it means we're leaking capacity, and in theory it would impact the txHandler over time.

EDIT: actually, Served() does not return an error, so as far as I can tell I am not dropping any errors on the ground here. The error check for Release is the only one.

  2. It can be moved, but I'd prefer to address this separately from these other comments and the ongoing unit test fix. If we want, I can cut a task to add logging to the ERL (it currently has no logging), which should be really easy.

  3. Not sure what to tell you here; this is where request servicing happens, so it's the most appropriate spot to invoke Release/Served. I'll explain why they're needed where they are though:

  • wi.capguard.Release() on line 258 -- the message has been pulled off the backlog, meaning there is now more room in the backlog to service requests. Therefore, the capguard that is attached to this message should go and return itself. There's really no other place to put this.

  • wi.capguard.Served() on line 265 -- the message is already committed, no further work is needed here. Since this request is now done, we call Served() to indicate our worker completed this work.

  • wi.capguard.Served() on line 273 -- we have enqueued the work to the backlog, hooray! Since the request is now done, we call Served() to indicate this. We don't defer, since work done after this point doesn't count toward the time handling the request.

So all in all, I think this is right -- when the request is dequeued from the protected resource, the capacity is available and gets returned. When the handler fully "handles" the item, it gets served. The nil check on the capguard is needed since this whole feature is configurable on/off. Not having an ERL means you wouldn't have a capguard.

The reviewer replied:

(1) I must be mistaken here. I don't know what I was thinking...
For (2 & 3), here is my suggestion. It will only add a single line for handling capguard,
and it will clean up more than 50% of the code in this segment: 18 lines -> 8 lines.

Suggested change

Replace:

if wi.capguard != nil {
	if err := wi.capguard.Release(); err != nil {
		logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
	}
}
if handler.checkAlreadyCommitted(wi) {
	transactionMessagesAlreadyCommitted.Inc(nil)
	if wi.capguard != nil {
		wi.capguard.Served()
	}
	continue
}
// enqueue the task to the verification pool.
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)
if wi.capguard != nil {
	wi.capguard.Served()
}

With:

ac := handler.checkAlreadyCommitted(wi)
handleCapguard(wi, ac)
if ac {
	transactionMessagesAlreadyCommitted.Inc(nil)
	continue
}
// enqueue the task to the verification pool.
handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil)

Then you can add this function wherever you please, outside txHandler.go

func handleCapguard(wi, alreadyCommitted) {
			if wi.capguard != nil {
				if err := wi.capguard.Release(); err != nil {
					logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
				}
			}
			if alreadyCommitted {
				if wi.capguard != nil {
					wi.capguard.Served()
				}
			}
}

var capguard *util.ErlCapacityGuard
if handler.erl != nil {
	// consume a capacity unit
	capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender.(util.ErlClient))
A reviewer (Contributor) asked:

What makes you sure rawmsg.Sender.(util.ErlClient) will always return a valid pointer?

The author replied:

rawmsg.Sender is a Peer interface, and my understanding is that the only in-use implementation of a Peer is the wsPeer.

wsPeer satisfies ErlClient by implementing OnClose.

Since Peer is just an empty interface, it makes it hard to say I'm "sure" it will always be a valid pointer.

This looks like it might be a Go pattern I have yet to learn(?)

The reviewer replied:

> Since Peer is just an empty interface, it makes it hard to say I'm "sure" it will always be a valid pointer.

Yes, and we should not be adding more into this.

It is really not worth taking a chance here.

Suggested change

Replace:

capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender.(util.ErlClient))

With:

capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender)

Then, inside ConsumeCapacity:

func ConsumeCapacity(p Peer) (*ErlCapacityGuard, error) {
	erlclient, ok := p.(util.ErlClient)
	if !ok {
		return nil, errors.New("unsupported peer")
	}
	...
}

AlgoAxel added a commit to AlgoAxel/go-algorand that referenced this pull request Jan 9, 2023
Addresses a flaky Unit Test by not overloading the component under test
(previously, 5k messages were driven as-fast-as-possible. Now, 10k
messages are driven in batches of 500 with a 100ms delay). The new
behavior is more accurate to a realistic usage pattern, as well.

This also addresses some deletable code and adds comments where
requested from some post-merge requests on pull algorand#4797.