Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txHandler: applications rate limiter #5734

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
698221f
txHandler: sliding windows rate limiter
algorandskiy Sep 11, 2023
b780eb2
Add remote IP addr to app idx filtering
algorandskiy Sep 11, 2023
4227408
More tests + linter fixes
algorandskiy Sep 12, 2023
d912853
remove some code duplication
algorandskiy Sep 12, 2023
a142581
Shards per app
algorandskiy Sep 12, 2023
ad931f6
make interval atomic
algorandskiy Sep 12, 2023
21722be
move rate limiter configuration into local config + test
algorandskiy Sep 13, 2023
43ea42e
Fix max size checks for buckets
algorandskiy Sep 14, 2023
39ec627
Add salt to prevent censoring with reduced key size to 8 bytes
algorandskiy Sep 14, 2023
785a229
go benchmarks
algorandskiy Sep 14, 2023
e05d839
Implement LRU eviction
algorandskiy Sep 15, 2023
17127ac
Fix cache size less than number of buckets
algorandskiy Sep 18, 2023
c7e7d0b
Add app limter drop counter
algorandskiy Sep 20, 2023
c30298b
CR fixes
algorandskiy Sep 20, 2023
f5fabd5
Use admission rate instead attempted rate
algorandskiy Sep 20, 2023
7b7d4f6
Revert "Use admission rate instead attempted rate"
algorandskiy Sep 20, 2023
8902be4
Reimplement cache admission
algorandskiy Sep 20, 2023
e5c98d5
CR fixes: config rename, buckets restructure
algorandskiy Sep 22, 2023
70a6ba4
CR: use part of ipv6 address
algorandskiy Sep 22, 2023
a53c07a
start app rate limiting only of congested
algorandskiy Sep 22, 2023
612156a
use rawmsg.Received timestamp for testability and cut some ns of math
algorandskiy Sep 25, 2023
61e0264
use sync.Pool for keys and buckets
algorandskiy Sep 26, 2023
1a00cf4
set TxBacklogAppTxPerSecondRate=100
algorandskiy Sep 26, 2023
3459298
CR fixes
algorandskiy Sep 27, 2023
1e9b73d
perf: upgrade go-deadlock
algorandskiy Sep 27, 2023
d83565b
CR: do not use rawmsg.Received for limiting
algorandskiy Sep 29, 2023
ab25e9b
Merge remote-tracking branch 'upstream/master' into pavel/txhandler-a…
algorandskiy Oct 31, 2023
7d32f6e
config: migrate to v32
algorandskiy Oct 31, 2023
a66abbf
CR: enable app limiter separately with EnableAppTxBacklogRateLimiting
algorandskiy Nov 3, 2023
4ede214
wip: txgroupToKeysDups
algorandskiy Nov 7, 2023
5fc75b6
CR: dedup appids and zeros
algorandskiy Nov 8, 2023
8c4c28b
marginally optimize txnToDigest a bit
algorandskiy Nov 8, 2023
eb977e2
CR: config renaming/comment fix
algorandskiy Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,17 @@ type Local struct {
// TxBacklogServiceRateWindowSeconds is the window size used to determine the service rate of the txBacklog
TxBacklogServiceRateWindowSeconds int `version[27]:"10"`

// TxBacklogTxRateLimiterMaxSize denotes a max size for the tx rate limiter
// calculated as "a thousand apps on a network of thousand of peers"
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
TxBacklogTxRateLimiterMaxSize int `version[31]:"1048576"`

// TxBacklogReservedCapacityPerPeer determines how much dedicated serving capacity the TxBacklog gives each peer
TxBacklogReservedCapacityPerPeer int `version[27]:"20"`

// EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager shouild be attached to the tx backlog enqueue process
// TxBacklogTxRate determines a target app rate for the tx rate limiter
TxBacklogTxRate int `version[31]:"20"`

algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager should be attached to the tx backlog enqueue process
// if enabled, the over-all TXBacklog Size will be larger by MAX_PEERS*TxBacklogReservedCapacityPerPeer
EnableTxBacklogRateLimiting bool `version[27]:"false" version[30]:"true"`

Expand Down
2 changes: 2 additions & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ var defaultLocal = Local{
TxBacklogReservedCapacityPerPeer: 20,
TxBacklogServiceRateWindowSeconds: 10,
TxBacklogSize: 26000,
TxBacklogTxRate: 20,
TxBacklogTxRateLimiterMaxSize: 1048576,
TxIncomingFilterMaxSize: 500000,
TxIncomingFilteringFlags: 1,
TxPoolExponentialIncreaseFactor: 2,
Expand Down
249 changes: 249 additions & 0 deletions data/appRateLimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package data

import (
"encoding/binary"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
"golang.org/x/crypto/blake2b"

"github.com/algorand/go-deadlock"
)

const numBuckets = 128

type keyType [8]byte

// appRateLimiter implements a sliding window counter rate limiter for applications.
// It is a shared map with numBuckets of maps each protected by its own mutex.
// Bucket is selected by hashing the application index with a seed (see memhash64).
type appRateLimiter struct {
maxBucketSize uint64
serviceRatePerWindow uint64
serviceRateWindow time.Duration

// seed for hashing application index to bucket
seed uint64
// salt for hashing application index + origin address
salt [16]byte

buckets [numBuckets]map[keyType]*appRateLimiterEntry
mus [numBuckets]deadlock.RWMutex
lrus [numBuckets]*util.List[keyType]
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved

// evictions
// TODO: delete
evictions uint64
evictionTime uint64
}

type appRateLimiterEntry struct {
prev atomic.Uint64
cur atomic.Uint64
interval atomic.Int64 // numeric representation of the current interval value
lruElement *util.ListNode[keyType]
}

// makeAppRateLimiter creates a new appRateLimiter from the parameters:
// maxCacheSize is the maximum number of entries to keep in the cache to keep it memory bounded
// maxAppPeerRate is the maximum number of admitted apps per peer per second
// serviceRateWindow is the service window
func makeAppRateLimiter(maxCacheSize uint64, maxAppPeerRate uint64, serviceRateWindow time.Duration) *appRateLimiter {
// convert target per app rate to per window service rate
serviceRatePerWindow := maxAppPeerRate * uint64(serviceRateWindow/time.Second)
maxBucketSize := maxCacheSize / numBuckets
r := &appRateLimiter{
maxBucketSize: maxBucketSize,
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
serviceRatePerWindow: serviceRatePerWindow,
serviceRateWindow: serviceRateWindow,
seed: crypto.RandUint64(),
}
crypto.RandBytes(r.salt[:])

for i := 0; i < numBuckets; i++ {
r.buckets[i] = make(map[keyType]*appRateLimiterEntry)
r.lrus[i] = util.NewList[keyType]().AllocateFreeNodes(int(maxBucketSize))
}
return r
}

func (r *appRateLimiter) entry(b int, key keyType, curInt int64) (*appRateLimiterEntry, bool) {
r.mus[b].Lock()
defer r.mus[b].Unlock()

if len(r.buckets[b]) >= int(r.maxBucketSize) {
// evict the oldest entry
start := time.Now()
atomic.AddUint64(&r.evictions, 1)

el := r.lrus[b].Back()
delete(r.buckets[b], el.Value)
r.lrus[b].Remove(el)

atomic.AddUint64(&r.evictionTime, uint64(time.Since(start)))
}

entry, ok := r.buckets[b][key]
if ok {
el := entry.lruElement
r.lrus[b].MoveToFront(el)
} else {
el := r.lrus[b].PushFront(key)
entry = &appRateLimiterEntry{lruElement: el}
entry.cur.Store(1)
entry.interval.Store(curInt)
r.buckets[b][key] = entry
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}
return entry, ok
}

// interval calculates the interval numeric representation based on the given time
func (r *appRateLimiter) interval(now time.Time) int64 {
return now.UnixNano() / int64(r.serviceRateWindow)
}

// fraction calculates the fraction of the interval that is elapsed since the given time
func (r *appRateLimiter) fraction(now time.Time) float64 {
return float64(now.UnixNano()%int64(r.serviceRateWindow)) / float64(r.serviceRateWindow)
}

// shouldDrop returns true if the given transaction group should be dropped based on the
// on the rate for the applications in the group: the entire group is dropped if a single application
// exceeds the rate.
func (r *appRateLimiter) shouldDrop(txgroup []transactions.SignedTxn, origin []byte) bool {
return r.shouldDropInner(txgroup, origin, time.Now())
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// shouldDropInner is the same as shouldDrop but accepts the current time as a parameter
// in order to make it testable
func (r *appRateLimiter) shouldDropInner(txgroup []transactions.SignedTxn, origin []byte, now time.Time) bool {
buckets, keys := txgroupToKeys(txgroup, origin, r.seed, r.salt, numBuckets)
if len(keys) == 0 {
return false
}
return r.shouldDropKeys(buckets, keys, now)
}

func (r *appRateLimiter) shouldDropKeys(buckets []int, keys []keyType, now time.Time) bool {
curInt := r.interval(now)

for i := range keys {
key := keys[i]
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
bucket := buckets[i]
entry, has := r.entry(bucket, key, curInt)
if !has {
// new entry, defaults are provided by entry() function
continue
}

interval := entry.interval.Load()
if interval != curInt {
var val uint64 = 0
if interval == curInt-1 {
// there are continuous intervals, use the previous value
val = entry.cur.Load()
}
entry.prev.Store(val)
entry.cur.Store(1)
entry.interval.Store(curInt)
} else {
entry.cur.Add(1)
}

curFraction := r.fraction(now)
rate := uint64(float64(entry.prev.Load())*(1-curFraction)) + entry.cur.Load()

if rate > r.serviceRatePerWindow {
return true
}
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}

return false
}

// txgroupToKeys converts txgroup data to keys
func txgroupToKeys(txgroup []transactions.SignedTxn, origin []byte, seed uint64, salt [16]byte, numBuckets int) ([]int, []keyType) {
// there are max 16 * 8 = 128 apps (buckets, keys) per txgroup
// TODO: consider sync.Pool

var keys []keyType
var buckets []int
// since blake2 is a crypto hash function it seems OK to shrink 32 bytes digest down to 8.
// Rationale: we expect thousands of apps sent from thousands of peers,
// so required millions of unique pairs => 8 bytes should be enough.
// The 16 bytes salt makes it harder to find collisions if an adversary attempts to censor
// some app by finding a collision with some app and flood a network with such transactions:
// h(app + relay_ip) = h(app2 + relay_ip).
var buf [8 + 16 + 16]byte // uint64 + 16 bytes of salt + up to 16 bytes of address
txnToDigest := func(appIdx basics.AppIndex) keyType {
binary.LittleEndian.PutUint64(buf[:8], uint64(appIdx))
copy(buf[8:], salt[:])
copied := copy(buf[8+16:], origin)

h := blake2b.Sum256(buf[:8+16+copied])
var key keyType
copy(key[:], h[:len(key)])
return key
}
for i := range txgroup {
if txgroup[i].Txn.Type == protocol.ApplicationCallTx {
appIdx := txgroup[i].Txn.ApplicationID
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved
// hash appIdx into a bucket, do not use modulo since it could
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// assign two vanilla (and presumable, popular) apps to the same bucket.
buckets = append(buckets, int(memhash64(uint64(appIdx), seed)%uint64(numBuckets)))
keys = append(keys, txnToDigest(appIdx))
if len(txgroup[i].Txn.ForeignApps) > 0 {
for _, appIdx := range txgroup[i].Txn.ForeignApps {
buckets = append(buckets, int(memhash64(uint64(appIdx), seed)%uint64(numBuckets)))
keys = append(keys, txnToDigest(appIdx))
}
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return buckets, keys
}

const (
// Constants for multiplication: four random odd 64-bit numbers.
m1 = 16877499708836156737
m2 = 2820277070424839065
m3 = 9497967016996688599
m4 = 15839092249703872147
)

// memhash64 is uint64 hash function from go runtime
// https://go-review.googlesource.com/c/go/+/59352/4/src/runtime/hash64.go#96
func memhash64(val uint64, seed uint64) uint64 {
h := seed
h ^= val
h = rotl31(h*m1) * m2
h ^= h >> 29
h *= m3
h ^= h >> 32
return h
}

func rotl31(x uint64) uint64 {
return (x << 31) | (x >> (64 - 31))
}
Loading