Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue events listening after blockchain RPC unavailability #99

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 78 additions & 27 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package blockchain

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/state"
Expand All @@ -15,6 +20,8 @@ import (
"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

var errCancelled = errors.New("cancelled")

type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash)

type Client struct {
Expand Down Expand Up @@ -71,7 +78,14 @@ func (c *Client) StartEventsListening(
return nil, nil, fmt.Errorf("subscribe new heads: %w", err)
}

retriever, err := retriever.NewDefaultEventRetriever(state.NewEventProvider(c.RPC.State), c.RPC.State)
retriever, err := retriever.NewEventRetriever(
parser.NewEventParser(),
state.NewEventProvider(c.RPC.State),
c.RPC.State,
registry.NewFactory(),
exec.NewRetryableExecutor[*types.StorageDataRaw](exec.WithMaxRetryCount(0)),
exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)),
)
if err != nil {
return nil, nil, fmt.Errorf("event retriever: %w", err)
}
Expand All @@ -92,25 +106,33 @@ func (c *Client) StartEventsListening(

firstLiveHeader := <-live // the first live header is the last historical header

for block := beginBlock; block < firstLiveHeader.Number; block++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
for block := beginBlock; block < firstLiveHeader.Number; {
var header *types.Header
err := retryUntilCancelled(func() error {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return err
}

header, err = c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return err
}

return nil
}, &cancelled)
if err != nil {
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return
}

header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return
if err == errCancelled {
return
}
continue
}

hist <- *header

// Graceful stop finishes with the block before exiting.
if cancelled.Load().(bool) {
return
}
block++
}

hist <- firstLiveHeader
Expand All @@ -123,12 +145,12 @@ func (c *Client) StartEventsListening(
defer wg.Done()
defer close(headersC)

for set := range hist {
headersC <- set
for header := range hist {
headersC <- header
}

for set := range live {
headersC <- set
for header := range live {
headersC <- header
}
}(histHeadersC, liveHeadersC, headersC)

Expand All @@ -144,15 +166,25 @@ func (c *Client) StartEventsListening(
continue
}

hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return
}

events, err := retriever.GetEvents(hash)
var hash types.Hash
var events []*parser.Event
err := retryUntilCancelled(func() error {
var err error
hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return err
}

events, err = retriever.GetEvents(hash)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
return err
}

return nil
}, &cancelled)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
continue
}

Expand All @@ -165,7 +197,9 @@ func (c *Client) StartEventsListening(
}(headersC, eventsC)

// Invoke listeners.
wg.Add(1)
go func(eventsC <-chan blockEvents) {
defer wg.Done()
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
Expand Down Expand Up @@ -212,3 +246,20 @@ type blockEvents struct {
Hash types.Hash
Number types.BlockNumber
}

func retryUntilCancelled(f func() error, cancelled *atomic.Value) error {
expbackoff := backoff.NewExponentialBackOff()
expbackoff.MaxElapsedTime = 0 // never stop
expbackoff.InitialInterval = 10 * time.Second
expbackoff.Multiplier = 2
expbackoff.MaxInterval = 10 * time.Minute

ff := func() error {
if cancelled.Load().(bool) {
return backoff.Permanent(errCancelled)
}
return f()
}

return backoff.Retry(ff, expbackoff)
}
5 changes: 4 additions & 1 deletion blockchain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain

go 1.18

require github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1
require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1
)

require (
github.com/ChainSafe/go-schnorrkel v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions blockchain/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY=
github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
Loading