Skip to content

Commit

Permalink
perf(listener): sync postage stamps in batches (#2641)
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler authored Nov 25, 2021
1 parent 42e3260 commit 6776715
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 26 deletions.
4 changes: 0 additions & 4 deletions .github/patches/listener.patch

This file was deleted.

3 changes: 0 additions & 3 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ jobs:
run: |
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve.patch
patch pkg/postage/postagecontract/contract.go .github/patches/postagecontract.patch
patch pkg/postage/listener/listener.go .github/patches/listener.patch
patch pkg/postage/service.go .github/patches/postageservice.patch
- name: Prepare local cluster
run: |
Expand Down Expand Up @@ -170,7 +169,6 @@ jobs:
run: |
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve.patch
patch pkg/postage/postagecontract/contract.go .github/patches/postagecontract.patch
patch pkg/postage/listener/listener.go .github/patches/listener.patch
patch pkg/postage/service.go .github/patches/postageservice.patch
- name: Prepare testing cluster (Node connection and clef enabled)
run: |
Expand Down Expand Up @@ -255,7 +253,6 @@ jobs:
run: |
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve.patch
patch pkg/postage/postagecontract/contract.go .github/patches/postagecontract.patch
patch pkg/postage/listener/listener.go .github/patches/listener.patch
patch pkg/postage/service.go .github/patches/postageservice.patch
patch pkg/postage/batchstore/reserve.go .github/patches/postagereserve_gc.patch
- name: Prepare testing cluster (storage incentives setup)
Expand Down
3 changes: 2 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ const (
lightRefreshRate = int64(450000)
basePrice = 10000
postageSyncingStallingTimeout = 10 * time.Minute
postageSyncingBackoffTimeout = 5 * time.Second
)

func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o *Options) (b *Bee, err error) {
Expand Down Expand Up @@ -469,7 +470,7 @@ func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, netwo
postageSyncStart = startBlock
}

eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b}, postageSyncingStallingTimeout)
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b}, postageSyncingStallingTimeout, postageSyncingBackoffTimeout)
b.listenerCloser = eventListener

batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
Expand Down
3 changes: 2 additions & 1 deletion pkg/postage/listener/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ var (
BatchDepthIncreaseTopic = batchDepthIncreaseTopic
PriceUpdateTopic = priceUpdateTopic

TailSize = tailSize
TailSize = tailSize
BatchFactor = batchFactor
)
30 changes: 25 additions & 5 deletions pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
)

const (
blockPage = 5000 // how many blocks to sync every time we page
tailSize = 4 // how many blocks to tail from the tip of the chain
blockPage = 5000 // how many blocks to sync every time we page
tailSize = 4 // how many blocks to tail from the tip of the chain
batchFactor = uint64(5) // minimal number of blocks to sync at once
)

var (
Expand Down Expand Up @@ -66,6 +67,7 @@ type listener struct {
metrics metrics
shutdowner Shutdowner
stallingTimeout time.Duration
backoffTime time.Duration
}

func New(
Expand All @@ -75,6 +77,7 @@ func New(
blockTime uint64,
shutdowner Shutdowner,
stallingTimeout time.Duration,
backoffTime time.Duration,
) postage.Listener {
return &listener{
logger: logger,
Expand All @@ -85,6 +88,7 @@ func New(
metrics: newMetrics(),
shutdowner: shutdowner,
stallingTimeout: stallingTimeout,
backoffTime: backoffTime,
}
}

Expand Down Expand Up @@ -174,14 +178,13 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
cancel()
}()

chainUpdateInterval := (time.Duration(l.blockTime) * time.Second) / 2

synced := make(chan struct{})
closeOnce := new(sync.Once)
paged := make(chan struct{}, 1)
paged <- struct{}{}

lastProgress := time.Now()
lastConfirmedBlock := uint64(0)

l.wg.Add(1)
listenf := func() error {
Expand All @@ -194,10 +197,21 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
return ErrPostageSyncingStalled
}

// if we have a last blocknumber from the backend we can make a good estimate on when we need to requery
// otherwise we just use the backoff time
var expectedWaitTime time.Duration
if lastConfirmedBlock != 0 {
nextExpectedBatchBlock := (lastConfirmedBlock/batchFactor + 1) * batchFactor
remainingBlocks := nextExpectedBatchBlock - lastConfirmedBlock
expectedWaitTime = time.Duration(l.blockTime*remainingBlocks) * time.Second
} else {
expectedWaitTime = l.backoffTime
}

select {
case <-paged:
// if we paged then it means there's more things to sync on
case <-time.After(chainUpdateInterval):
case <-time.After(expectedWaitTime):
case <-l.quit:
return nil
}
Expand All @@ -208,6 +222,7 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
if err != nil {
l.metrics.BackendErrors.Inc()
l.logger.Warningf("listener: could not get block number: %v", err)
lastConfirmedBlock = 0
continue
}

Expand All @@ -218,6 +233,10 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru

// consider to-tailSize as the "latest" block we need to sync to
to = to - tailSize
lastConfirmedBlock = to

// round down to the largest multiple of batchFactor
to = (to / batchFactor) * batchFactor

if to < from {
// if the blockNumber is actually less than what we already, it might mean the backend is not synced or some reorg scenario
Expand All @@ -237,6 +256,7 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
if err != nil {
l.metrics.BackendErrors.Inc()
l.logger.Warningf("listener: could not get logs: %v", err)
lastConfirmedBlock = 0
continue
}

Expand Down
31 changes: 19 additions & 12 deletions pkg/postage/listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ var addr common.Address = common.HexToAddress("abcdef")

var postageStampAddress common.Address = common.HexToAddress("eeee")

const stallingTimeout = 5 * time.Second
const (
stallingTimeout = 5 * time.Second
backoffTime = 5 * time.Second
)

func toBatchBlock(block uint64) uint64 {
return (block / listener.BatchFactor) * listener.BatchFactor
}

func TestListener(t *testing.T) {
logger := logging.New(io.Discard, 0)
Expand All @@ -50,7 +57,7 @@ func TestListener(t *testing.T) {
),
)

l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout, backoffTime)
l.Listen(0, ev)

select {
Expand Down Expand Up @@ -81,7 +88,7 @@ func TestListener(t *testing.T) {
topup.toLog(496),
),
)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout, backoffTime)
l.Listen(0, ev)

select {
Expand Down Expand Up @@ -112,7 +119,7 @@ func TestListener(t *testing.T) {
depthIncrease.toLog(496),
),
)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout, backoffTime)
l.Listen(0, ev)

select {
Expand Down Expand Up @@ -141,7 +148,7 @@ func TestListener(t *testing.T) {
priceUpdate.toLog(496),
),
)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout, backoffTime)
l.Listen(0, ev)
select {
case e := <-evC:
Expand Down Expand Up @@ -193,7 +200,7 @@ func TestListener(t *testing.T) {
),
WithBlockNumber(blockNumber),
)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, nil, stallingTimeout, backoffTime)
l.Listen(0, ev)

select {
Expand Down Expand Up @@ -250,7 +257,7 @@ func TestListener(t *testing.T) {

select {
case e := <-evC:
e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize)) // event args should be equal
e.(blockNumberCall).compare(t, toBatchBlock(blockNumber-uint64(listener.TailSize))) // event args should be equal
case <-time.After(timeout):
t.Fatal("timed out waiting for block number update")
}
Expand All @@ -263,12 +270,12 @@ func TestListener(t *testing.T) {
mf := newMockFilterer(
WithBlockNumberErrorOnce(errors.New("dummy error"), blockNumber),
)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, stallingTimeout, 0*time.Second)
l.Listen(0, ev)

select {
case e := <-evC:
e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize)) // event args should be equal
e.(blockNumberCall).compare(t, toBatchBlock(blockNumber-uint64(listener.TailSize))) // event args should be equal
case <-time.After(timeout):
t.Fatal("timed out waiting for block number update")
}
Expand All @@ -280,7 +287,7 @@ func TestListener(t *testing.T) {
mf := newMockFilterer(
WithBlockNumberError(errors.New("dummy error")),
)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, 50*time.Millisecond)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, 50*time.Millisecond, 0*time.Second)
l.Listen(0, ev)

start := time.Now()
Expand All @@ -302,12 +309,12 @@ func TestListener(t *testing.T) {
mf := newMockFilterer(
WithBlockNumber(blockNumber),
)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, stallingTimeout)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner, stallingTimeout, backoffTime)
l.Listen(0, ev)

select {
case e := <-evC:
e.(blockNumberCall).compare(t, blockNumber-uint64(listener.TailSize)) // event args should be equal
e.(blockNumberCall).compare(t, toBatchBlock(blockNumber-uint64(listener.TailSize))) // event args should be equal
case <-time.After(timeout):
t.Fatal("timed out waiting for block number update")
}
Expand Down

0 comments on commit 6776715

Please sign in to comment.