diff --git a/blockchain/client.go b/blockchain/client.go index 45d180e..0d57eae 100644 --- a/blockchain/client.go +++ b/blockchain/client.go @@ -2,11 +2,16 @@ package blockchain import ( "context" + "errors" "fmt" "sync" "sync/atomic" + "time" + "github.com/cenkalti/backoff" gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry" + "github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/state" @@ -15,6 +20,8 @@ import ( "github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets" ) +var errCancelled = errors.New("cancelled") + type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) type Client struct { @@ -71,7 +78,14 @@ func (c *Client) StartEventsListening( return nil, nil, fmt.Errorf("subscribe new heads: %w", err) } - retriever, err := retriever.NewDefaultEventRetriever(state.NewEventProvider(c.RPC.State), c.RPC.State) + retriever, err := retriever.NewEventRetriever( + parser.NewEventParser(), + state.NewEventProvider(c.RPC.State), + c.RPC.State, + registry.NewFactory(), + exec.NewRetryableExecutor[*types.StorageDataRaw](exec.WithMaxRetryCount(0)), + exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)), + ) if err != nil { return nil, nil, fmt.Errorf("event retriever: %w", err) } @@ -92,25 +106,33 @@ func (c *Client) StartEventsListening( firstLiveHeader := <-live // the first live header is the last historical header - for block := beginBlock; block < firstLiveHeader.Number; block++ { - blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) + for block := beginBlock; block < firstLiveHeader.Number; { + var header *types.Header + err := retryUntilCancelled(func() error { + blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block)) + if err != nil { + c.errsListening <- fmt.Errorf("get historical block hash: %w", err) + return err + } + + header, err = c.RPC.Chain.GetHeader(blockHash) + if err != nil { + c.errsListening <- fmt.Errorf("get historical header: %w", err) + return err + } + + return nil + }, &cancelled) if err != nil { - c.errsListening <- fmt.Errorf("get historical block hash: %w", err) - return - } - - header, err := c.RPC.Chain.GetHeader(blockHash) - if err != nil { - c.errsListening <- fmt.Errorf("get historical header: %w", err) - return + if err == errCancelled { + return + } + continue } hist <- *header - // Graceful stop finishes with the block before exiting. - if cancelled.Load().(bool) { - return - } + block++ } hist <- firstLiveHeader @@ -123,12 +145,12 @@ func (c *Client) StartEventsListening( defer wg.Done() defer close(headersC) - for set := range hist { - headersC <- set + for header := range hist { + headersC <- header } - for set := range live { - headersC <- set + for header := range live { + headersC <- header } }(histHeadersC, liveHeadersC, headersC) @@ -144,15 +166,25 @@ func (c *Client) StartEventsListening( continue } - hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number)) - if err != nil { - c.errsListening <- fmt.Errorf("get block hash: %w", err) - return - } - - events, err := retriever.GetEvents(hash) + var hash types.Hash + var events []*parser.Event + err := retryUntilCancelled(func() error { + var err error + hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number)) + if err != nil { + c.errsListening <- fmt.Errorf("get block hash: %w", err) + return err + } + + events, err = retriever.GetEvents(hash) + if err != nil { + c.errsListening <- fmt.Errorf("events retriever: %w", err) + return err + } + + return nil + }, &cancelled) if err != nil { - c.errsListening <- fmt.Errorf("events retriever: %w", err) continue } @@ -165,7 +197,9 @@ func (c *Client) StartEventsListening( }(headersC, eventsC) // Invoke listeners. + wg.Add(1) go func(eventsC <-chan blockEvents) { + defer wg.Done() for blockEvents := range eventsC { for callback := range c.eventsListeners { (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash) @@ -212,3 +246,20 @@ type blockEvents struct { Hash types.Hash Number types.BlockNumber } + +func retryUntilCancelled(f func() error, cancelled *atomic.Value) error { + expbackoff := backoff.NewExponentialBackOff() + expbackoff.MaxElapsedTime = 0 // never stop + expbackoff.InitialInterval = 10 * time.Second + expbackoff.Multiplier = 2 + expbackoff.MaxInterval = 10 * time.Minute + + ff := func() error { + if cancelled.Load().(bool) { + return backoff.Permanent(errCancelled) + } + return f() + } + + return backoff.Retry(ff, expbackoff) +} diff --git a/blockchain/go.mod b/blockchain/go.mod index c939806..f75bd98 100644 --- a/blockchain/go.mod +++ b/blockchain/go.mod @@ -2,7 +2,10 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain go 1.18 -require github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 +require ( + github.com/cenkalti/backoff v2.2.1+incompatible + github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1 +) require ( github.com/ChainSafe/go-schnorrkel v1.1.0 // indirect diff --git a/blockchain/go.sum b/blockchain/go.sum index 8721e6f..43188f8 100644 --- a/blockchain/go.sum +++ b/blockchain/go.sum @@ -6,6 +6,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=