erl: Simplifications to public interfaces #5141

Closed
34 changes: 15 additions & 19 deletions data/txHandler.go
@@ -35,7 +35,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/erl"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)
@@ -106,7 +106,7 @@ type txBacklogMsg struct {
rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network
unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup
verificationErr error // The verification error generated by the verification function, if any.
capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
capguard *erl.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
}

// TxHandler handles transaction messages
@@ -128,7 +128,7 @@ type TxHandler struct {
streamVerifier *verify.StreamVerifier
streamVerifierChan chan *verify.UnverifiedElement
streamVerifierDropped chan *verify.UnverifiedElement
erl *util.ElasticRateLimiter
erl *erl.ElasticRateLimiter
}

// TxHandlerOpts is TxHandler configuration options
@@ -182,7 +182,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
}

if opts.Config.EnableTxBacklogRateLimiting {
rateLimiter := util.NewElasticRateLimiter(
rateLimiter := erl.NewElasticRateLimiter(
txBacklogSize,
opts.Config.TxBacklogReservedCapacityPerPeer,
time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
@@ -294,16 +294,11 @@ func (handler *TxHandler) backlogWorker() {
// this never happens since handler.backlogQueue is never closed
return
}
if wi.capguard != nil {
if err := wi.capguard.Release(); err != nil {
logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
}
}
if handler.checkAlreadyCommitted(wi) {
ac := handler.checkAlreadyCommitted(wi)
// mark the capacity as released, and as served too, if it's alreadyCommitted
wi.capguard.HandleCapguard()
Review comment (Contributor):

This is not functionally equivalent to the original code: for each incoming wi, Release is called in order to free capacity for a client, and Served is then called after some delay, once wi actually gets queued into the verifier, so that it can account for delays in the batch verifier.
But I think this Served is misplaced and should be in the case wi, ok := <-handler.postVerificationQueue: branch, when we actually get wi verified by the batch signature verification.
@AlgoAxel @algonautshant opinions?
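
A sketch of that rearrangement, with stand-in types rather than the actual TxHandler code (illustration only, assuming the Release/Served semantics described above):

package main

// Stand-in types for illustration; not the actual TxHandler code.
type guard struct{}

func (g *guard) Release() error { return nil } // frees backlog capacity on dequeue
func (g *guard) Served()        {}             // records a serviced request in the rate window

type item struct{ capguard *guard }

func worker(backlog, verified <-chan *item, toVerifier chan<- *item) {
	for {
		select {
		case wi := <-backlog:
			// release capacity as soon as the item leaves the backlog ...
			_ = wi.capguard.Release()
			toVerifier <- wi
		case wi := <-verified:
			// ... but mark it served only after verification finishes,
			// so the service-rate window reflects batch-verifier latency.
			wi.capguard.Served()
		}
	}
}

func main() {}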

if ac {
transactionMessagesAlreadyCommitted.Inc(nil)
if wi.capguard != nil {
wi.capguard.Served()
}
continue
}
// handler.streamVerifierChan does not receive if ctx is cancelled
@@ -313,9 +308,6 @@
transactionMessagesDroppedFromBacklog.Inc(nil)
return
}
if wi.capguard != nil {
wi.capguard.Served()
}
case wi, ok := <-handler.postVerificationQueue:
if !ok {
// this never happens since handler.postVerificationQueue is never closed
@@ -576,14 +568,18 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
consumed := 0

var err error
var capguard *util.ErlCapacityGuard
var capguard *erl.ErlCapacityGuard
if handler.erl != nil {
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
// an issue in vending capacity indicates the underlying resource (TXBacklog) is full
capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender.(util.ErlClient))
if err != nil {
var enable bool
capguard, enable, err = handler.erl.ConsumeCapacity(rawmsg.Sender)
// do not enable congestion control if consumption failed because the sender couldn't be used for consumption
// as that sender is simply unable to consume capacity (and therefore should not affect congestion control)
// this should not currently be possible as all senders are wsPeers
if err != nil && enable {
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
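For reference, the new three-value ConsumeCapacity separates three outcomes: success, a genuinely saturated backlog (enable == true), and a sender that cannot participate in rate limiting at all (enable == false). A minimal self-contained sketch of the caller's dispatch — vend and errIncompatible are stand-ins, not the real API:

package main

import "errors"

var errIncompatible = errors.New("provided client doesn't satisfy ERLClient")

// vend stands in for (*ElasticRateLimiter).ConsumeCapacity; illustration only.
func vend(sender interface{}) (guard *struct{}, enableCongestion bool, err error) {
	return nil, false, errIncompatible
}

func main() {
	guard, enable, err := vend("not-an-ErlClient")
	switch {
	case err == nil:
		// capacity vended: hold the guard and release/serve it on dequeue
		_ = guard
	case enable:
		// genuine saturation (or the congestion manager refused):
		// enable congestion control and count the drop
	default:
		// the sender is not an ErlClient: drop this message, but do not
		// flip congestion control for well-behaved peers
	}
}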
41 changes: 30 additions & 11 deletions util/rateLimit.go → util/erl/rateLimit.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package util
package erl

import (
"context"
@@ -26,15 +26,18 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-deadlock"
)

var errConManDropped = errors.New("congestionManager prevented client from consuming capacity")
var ErrIncompatibleERLClient = errors.New("provided client doesn't satisfy ERLClient")
var errFailedConsume = errors.New("could not consume capacity from capacityQueue")
var errERLReservationExists = errors.New("client already has a reservation")
var errReservationExists = errors.New("client already has a reservation")
var errCapacityReturn = errors.New("could not replace capacity to channel")

var errConManDropped = errors.New("congestionManager prevented client from consuming capacity")

// ElasticRateLimiter holds and distributes capacity through capacityQueues
// Capacity consumers are given an error if there is no capacity available for them,
// and a "capacityGuard" structure they can use to return the capacity when finished
@@ -168,9 +171,14 @@ func (erl *ElasticRateLimiter) DisableCongestionControl() {
// ConsumeCapacity will dispense one capacity from either the client's reservedCapacity
// or the sharedCapacity, and will return a guard that can return the capacity when the client is done
// Returns an error if the capacity could not be vended, which could be:
// - the client provided doesn't support the ErlClient interface
// - there is not sufficient free capacity to assign a reserved capacity block
// - there is no reserved or shared capacity available for the client
func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard, error) {
func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacityGuard, bool, error) {
c, ok := client.(ErlClient)
if !ok {
return nil, false, ErrIncompatibleERLClient
}
var q capacityQueue
var err error
var exists bool
@@ -185,14 +193,14 @@ if !exists {
if !exists {
q, err = erl.openReservation(c)
if err != nil {
return nil, err
return nil, true, err
}
// if the client has been given a new reservation, make sure it cleans up OnClose
c.OnClose(func() { erl.closeReservation(c) })

// if this reservation is newly created, directly (blocking) take a capacity
q.blockingConsume()
return &ErlCapacityGuard{cq: q, cm: erl.cm}, nil
return &ErlCapacityGuard{cq: q, cm: erl.cm}, false, nil
}

// Step 1: Attempt consumption from the reserved queue
@@ -201,7 +209,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard,
if erl.cm != nil {
erl.cm.Consumed(c, time.Now()) // notify the congestion manager that this client consumed from this queue
}
return &cg, nil
return &cg, false, nil
}
// Step 2: Potentially gate shared queue access if the congestion manager disallows it
if erl.cm != nil &&
@@ -210,17 +218,17 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(c ErlClient) (*ErlCapacityGuard,
if erl.congestionControlCounter != nil {
erl.congestionControlCounter.Inc(nil)
}
return nil, errConManDropped
return nil, true, errConManDropped
}
// Step 3: Attempt consumption from the shared queue
cg, err = erl.sharedCapacity.consume(erl.cm)
if err != nil {
return nil, err
return nil, true, err
}
if erl.cm != nil {
erl.cm.Consumed(c, time.Now()) // notify the congestion manager that this client consumed from this queue
}
return &cg, nil
return &cg, false, nil
}

// openReservation creates an entry in the ElasticRateLimiter's reservedCapacity map,
@@ -229,7 +237,7 @@ func (erl *ElasticRateLimiter) openReservation(c ErlClient) (capacityQueue, erro
erl.clientLock.Lock()
defer erl.clientLock.Unlock()
if _, exists := erl.capacityByClient[c]; exists {
return capacityQueue(nil), errERLReservationExists
return capacityQueue(nil), errReservationExists
}
// guard against overprovisioning, if there is less than a reservedCapacity amount left
remaining := erl.MaxCapacity - (erl.CapacityPerReservation * len(erl.capacityByClient))
@@ -554,3 +562,14 @@ func prune(ts *[]time.Time, cutoff time.Time) int {
*ts = (*ts)[:0]
return 0
}

// HandleCapguard releases the guard's capacity back to the ElasticRateLimiter and marks it served.
// It is safe to call on a nil guard.
func (cg *ErlCapacityGuard) HandleCapguard() {
if cg == nil {
return
}
if err := cg.Release(); err != nil {
logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
}
cg.Served()
}
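
The nil check in HandleCapguard is what allows txHandler.go to drop its earlier if wi.capguard != nil guards: calling a method on a nil pointer receiver is legal in Go as long as the method tolerates it. A short usage sketch (example is a hypothetical caller, not part of this diff):

// Hypothetical caller, illustrating the nil-receiver tolerance above.
func example() {
	var cg *ErlCapacityGuard // nil when rate limiting is disabled
	cg.HandleCapguard()      // safe no-op: the method checks its nil receiver
}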
26 changes: 14 additions & 12 deletions util/rateLimit_test.go → util/erl/rateLimit_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package util
package erl

import (
"testing"
@@ -53,7 +53,7 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) {
// give the ERL a congestion controller with well-defined behavior for testing
erl.cm = mockCongestionControl{}

_, err := erl.ConsumeCapacity(client)
_, _, err := erl.ConsumeCapacity(client)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
@@ -62,18 +62,18 @@
assert.NoError(t, err)

erl.EnableCongestionControl()
_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.Error(t, err)

erl.DisableCongestionControl()
_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.NoError(t, err)
@@ -85,14 +85,14 @@ func TestReservations(t *testing.T) {
client2 := mockClient("client2")
erl := NewElasticRateLimiter(4, 1, time.Second, nil)

_, err := erl.ConsumeCapacity(client1)
_, _, err := erl.ConsumeCapacity(client1)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, len(erl.capacityByClient))
assert.NoError(t, err)

_, err = erl.ConsumeCapacity(client2)
_, _, err = erl.ConsumeCapacity(client2)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
@@ -110,31 +110,33 @@ func TestConsumeReleaseCapacity(t *testing.T) {
client := mockClient("client")
erl := NewElasticRateLimiter(4, 3, time.Second, nil)

c1, err := erl.ConsumeCapacity(client)
c1, en, err := erl.ConsumeCapacity(client)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 2, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)
assert.False(t, en)

_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 1, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

// remember this capacity, as it is a shared capacity
c4, err := erl.ConsumeCapacity(client)
c4, en, err := erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.NoError(t, err)
assert.False(t, en)

_, err = erl.ConsumeCapacity(client)
_, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.Error(t, err)
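The tests above depend on a mockClient helper defined elsewhere in this file. Reconstructed from what these paths exercise, a plausible minimal shape might be the following — OnClose is the only ErlClient method required here, and the real mock may differ:

// Hypothetical reconstruction; the real mock may differ.
type mockClient string

// OnClose satisfies ErlClient. The tests never close a client,
// so registering and discarding the callback is enough here.
func (c mockClient) OnClose(func()) {}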