Shant's proposal for updates
algonautshant committed Feb 15, 2023
1 parent 3ad90d2 commit 4b034e7
Showing 3 changed files with 44 additions and 45 deletions.
33 changes: 9 additions & 24 deletions data/txHandler.go
@@ -35,7 +35,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/erl"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)
@@ -106,7 +106,7 @@ type txBacklogMsg struct {
rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network
unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup
verificationErr error // The verification error generated by the verification function, if any.
- capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
+ capguard *erl.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued
}

// TxHandler handles transaction messages
@@ -128,7 +128,7 @@ type TxHandler struct {
streamVerifier *verify.StreamVerifier
streamVerifierChan chan *verify.UnverifiedElement
streamVerifierDropped chan *verify.UnverifiedElement
- erl *util.ElasticRateLimiter
+ erl *erl.ElasticRateLimiter
}

// TxHandlerOpts is TxHandler configuration options
@@ -182,7 +182,7 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
}

if opts.Config.EnableTxBacklogRateLimiting {
- rateLimiter := util.NewElasticRateLimiter(
+ rateLimiter := erl.NewElasticRateLimiter(
txBacklogSize,
opts.Config.TxBacklogReservedCapacityPerPeer,
time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
@@ -296,7 +296,7 @@ func (handler *TxHandler) backlogWorker() {
}
ac := handler.checkAlreadyCommitted(wi)
// mark the capacity as released, and as served too, if it's alreadyCommitted
- handleCapguard(wi.capguard, true, ac)
+ wi.capguard.HandleCapguard()
if ac {
transactionMessagesAlreadyCommitted.Inc(nil)
continue
@@ -308,8 +308,6 @@
transactionMessagesDroppedFromBacklog.Inc(nil)
return
}
- // mark the capguard as served
- handleCapguard(wi.capguard, false, true)
case wi, ok := <-handler.postVerificationQueue:
if !ok {
// this never happens since handler.postVerificationQueue is never closed
@@ -570,17 +568,18 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
consumed := 0

var err error
- var capguard *util.ErlCapacityGuard
+ var capguard *erl.ErlCapacityGuard
if handler.erl != nil {
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
// an issue in vending capacity indicates the underlying resource (TXBacklog) is full
- capguard, err = handler.erl.ConsumeCapacity(rawmsg.Sender)
+ var enable bool
+ capguard, enable, err = handler.erl.ConsumeCapacity(rawmsg.Sender)
// do not enable congestion control if consumption failed because the sender couldn't be used for consumption
// as that sender is simply unable to consume capacity (and therefore should not affect congestion control)
// this should not currently be possible as all senders are wsPeers
- if err != nil && !errors.Is(err, util.ErrIncompatibleERLClient) {
+ if err != nil && enable {
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
@@ -756,17 +755,3 @@ func (handler *solicitedTxHandler) Handle(txgroup []transactions.SignedTxn) erro
}
return nil
}

- func handleCapguard(c *util.ErlCapacityGuard, released, served bool) {
- if c == nil {
- return
- }
- if released {
- if err := c.Release(); err != nil {
- logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
- }
- }
- if served {
- c.Served()
- }
- }
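
To see the new contract end to end, here is a minimal sketch of a caller, assuming the util/erl package path this commit introduces; mockSender is a hypothetical stand-in for a peer, and only OnClose is assumed of an ErlClient, as the reservation cleanup in the next file exercises. The second return value moves the enable-congestion-control decision out of the caller's errors.Is check and into the rate limiter itself.

package main

import (
	"fmt"
	"time"

	"github.com/algorand/go-algorand/util/erl"
)

// mockSender is a hypothetical stand-in for a network peer; an ErlClient
// is only assumed to need OnClose here.
type mockSender string

func (mockSender) OnClose(func()) {}

func main() {
	// 4 capacity units total, 1 reserved per client, a 1s service window,
	// and no congestion-control counter (mirroring the tests below).
	limiter := erl.NewElasticRateLimiter(4, 1, time.Second, nil)

	capguard, enable, err := limiter.ConsumeCapacity(mockSender("peer"))
	if err != nil {
		// The limiter, not the caller, now decides whether a failed
		// consumption should trip congestion control.
		if enable {
			limiter.EnableCongestionControl()
		}
		fmt.Println("message dropped:", err)
		return
	}
	// Release the unit and mark it served once the message is handled.
	capguard.HandleCapguard()
}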
30 changes: 21 additions & 9 deletions util/rateLimit.go → util/erl/rateLimit.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

- package util
+ package erl

import (
"context"
@@ -26,6 +26,7 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/util/metrics"
"github.com/algorand/go-deadlock"
)
@@ -173,10 +174,10 @@ func (erl *ElasticRateLimiter) DisableCongestionControl() {
// - the client provided doesn't support the ErlClient interface
// - there is not sufficient free capacity to assign a reserved capacity block
// - there is no reserved or shared capacity available for the client
- func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacityGuard, error) {
+ // The returned bool reports whether the caller should enable congestion control when an error is returned.
+ func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacityGuard, bool, error) {
c, ok := client.(ErlClient)
if !ok {
- return nil, ErrIncompatibleERLClient
+ return nil, false, ErrIncompatibleERLClient
}
var q capacityQueue
var err error
@@ -192,14 +193,14 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacity
if !exists {
q, err = erl.openReservation(c)
if err != nil {
- return nil, err
+ return nil, true, err
}
// if the client has been given a new reservation, make sure it cleans up OnClose
c.OnClose(func() { erl.closeReservation(c) })

// if this reservation is newly created, directly (blocking) take a capacity
q.blockingConsume()
- return &ErlCapacityGuard{cq: q, cm: erl.cm}, nil
+ return &ErlCapacityGuard{cq: q, cm: erl.cm}, false, nil
}

// Step 1: Attempt consumption from the reserved queue
@@ -208,7 +209,7 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacity
if erl.cm != nil {
erl.cm.Consumed(c, time.Now()) // notify the congestion manager that this client consumed from this queue
}
- return &cg, nil
+ return &cg, false, nil
}
// Step 2: Potentially gate shared queue access if the congestion manager disallows it
if erl.cm != nil &&
@@ -217,17 +218,17 @@ func (erl *ElasticRateLimiter) ConsumeCapacity(client interface{}) (*ErlCapacity
if erl.congestionControlCounter != nil {
erl.congestionControlCounter.Inc(nil)
}
- return nil, errConManDropped
+ return nil, true, errConManDropped
}
// Step 3: Attempt consumption from the shared queue
cg, err = erl.sharedCapacity.consume(erl.cm)
if err != nil {
- return nil, err
+ return nil, true, err
}
if erl.cm != nil {
erl.cm.Consumed(c, time.Now()) // notify the congestion manager that this client consumed from this queue
}
- return &cg, nil
+ return &cg, false, nil
}

// openReservation creates an entry in the ElasticRateLimiter's reservedCapacity map,
@@ -561,3 +562,14 @@ func prune(ts *[]time.Time, cutoff time.Time) int {
*ts = (*ts)[:0]
return 0
}

+ // HandleCapguard releases the capacity guard back to the rate limiter and
+ // marks it as served; it is safe to call on a nil guard.
+ func (cg *ErlCapacityGuard) HandleCapguard() {
+ if cg == nil {
+ return
+ }
+ if err := cg.Release(); err != nil {
+ logging.Base().Warnf("Failed to release capacity to ElasticRateLimiter: %v", err)
+ }
+ cg.Served()
+ }
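
The free handleCapguard helper removed above becomes a method whose nil check lives inside the receiver. A self-contained toy illustration of that Go pattern, with all names hypothetical: a method on a pointer receiver may be called through a nil pointer, so call sites such as backlogWorker no longer need their own nil guard.

package main

import "fmt"

// guard is a toy stand-in for ErlCapacityGuard.
type guard struct{ units int }

// release is safe to call on a nil *guard, mirroring HandleCapguard's
// nil-receiver check above.
func (g *guard) release() {
	if g == nil {
		return // no guard was vended (e.g. rate limiting disabled): no-op
	}
	g.units--
	fmt.Println("released; units remaining:", g.units)
}

func main() {
	var g *guard // nil: no capacity guard was handed out
	g.release()  // does not panic; the method handles nil itself

	(&guard{units: 1}).release()
}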
26 changes: 14 additions & 12 deletions util/rateLimit_test.go → util/erl/rateLimit_test.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

- package util
+ package erl

import (
"testing"
@@ -53,7 +53,7 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) {
// give the ERL a congestion controller with well-defined behavior for testing
erl.cm = mockCongestionControl{}

- _, err := erl.ConsumeCapacity(client)
+ _, _, err := erl.ConsumeCapacity(client)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
@@ -62,18 +62,18 @@ func TestElasticRateLimiterCongestionControlled(t *testing.T) {
assert.NoError(t, err)

erl.EnableCongestionControl()
- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.Error(t, err)

erl.DisableCongestionControl()
- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.NoError(t, err)
@@ -85,14 +85,14 @@ func TestReservations(t *testing.T) {
client2 := mockClient("client2")
erl := NewElasticRateLimiter(4, 1, time.Second, nil)

- _, err := erl.ConsumeCapacity(client1)
+ _, _, err := erl.ConsumeCapacity(client1)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, len(erl.capacityByClient))
assert.NoError(t, err)

- _, err = erl.ConsumeCapacity(client2)
+ _, _, err = erl.ConsumeCapacity(client2)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
@@ -110,31 +110,33 @@ func TestConsumeReleaseCapacity(t *testing.T) {
client := mockClient("client")
erl := NewElasticRateLimiter(4, 3, time.Second, nil)

- c1, err := erl.ConsumeCapacity(client)
+ c1, en, err := erl.ConsumeCapacity(client)
// because the ERL gives capacity to a reservation, and then asynchronously drains capacity from the share,
// wait a moment before testing the size of the sharedCapacity
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 2, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)
+ assert.False(t, en)

- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 1, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 1, len(erl.sharedCapacity))
assert.NoError(t, err)

// remember this capacity, as it is a shared capacity
- c4, err := erl.ConsumeCapacity(client)
+ c4, en, err := erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.NoError(t, err)
+ assert.False(t, en)

- _, err = erl.ConsumeCapacity(client)
+ _, _, err = erl.ConsumeCapacity(client)
assert.Equal(t, 0, len(erl.capacityByClient[client]))
assert.Equal(t, 0, len(erl.sharedCapacity))
assert.Error(t, err)
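
The updated tests assert the enable flag only on success paths. A sketch of a focused addition that also covers the incompatible-client failure, assuming the file's existing mockClient helper and testify assertions; TestConsumeCapacityEnableFlag and its capacity numbers are illustrative, not part of the commit.

func TestConsumeCapacityEnableFlag(t *testing.T) {
	client := mockClient("client")
	erl := NewElasticRateLimiter(4, 1, time.Second, nil)

	// A successful consumption must never ask the caller to enable
	// congestion control.
	_, en, err := erl.ConsumeCapacity(client)
	assert.NoError(t, err)
	assert.False(t, en)

	// A client that is not an ErlClient cannot consume capacity, and that
	// failure must not trip congestion control either.
	_, en, err = erl.ConsumeCapacity(struct{}{})
	assert.Error(t, err)
	assert.False(t, en)
}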
