Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events listener error #100

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 89 additions & 126 deletions blockchain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,28 @@ package blockchain

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
gsrpc "github.com/centrifuge/go-substrate-rpc-client/v4"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/exec"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/state"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"golang.org/x/sync/errgroup"

"github.com/cerebellum-network/cere-ddc-sdk-go/blockchain/pallets"
)

var errCancelled = errors.New("cancelled")

type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash)
type EventsListener func(events []*parser.Event, blockNumber types.BlockNumber, blockHash types.Hash) error

type Client struct {
*gsrpc.SubstrateAPI

mu sync.Mutex
eventsListeners map[*EventsListener]struct{}

isListening uint32
cancelListening func()
errsListening chan error

DdcClusters pallets.DdcClustersApi
DdcCustomers pallets.DdcCustomersApi
DdcNodes pallets.DdcNodesApi
Expand All @@ -60,22 +50,20 @@ func NewClient(url string) (*Client, error) {
}, nil
}

// StartEventsListening subscribes for blockchain events and passes events starting from the
// 'begin' block to registered events listeners. Listeners registered after this call will only
// receive live events meaning all listeners which need historical events from 'begin' block
// should be registered at the moment of calling this function. The 'afterBlock' callback is
// invoked after all registered events listeners are already invoked.
func (c *Client) StartEventsListening(
// ListenEvents listens for blockchain events and sequentially calls registered events listeners to
// process incoming events. It starts from the block begin and calls callback after when all events
// listeners already called on a block events.
//
// ListenEvents always returns a non-nil error from a registered events listener or a callback
// after.
func (c *Client) ListenEvents(
ctx context.Context,
begin types.BlockNumber,
after func(blockNumber types.BlockNumber, blockHash types.Hash),
) (context.CancelFunc, <-chan error, error) {
if !atomic.CompareAndSwapUint32(&c.isListening, 0, 1) {
return c.cancelListening, c.errsListening, nil
}

after func(blockNumber types.BlockNumber, blockHash types.Hash) error,
) error {
sub, err := c.RPC.Chain.SubscribeNewHeads()
if err != nil {
return nil, nil, fmt.Errorf("subscribe new heads: %w", err)
return err
}

retriever, err := retriever.NewEventRetriever(
Expand All @@ -87,142 +75,134 @@ func (c *Client) StartEventsListening(
exec.NewRetryableExecutor[[]*parser.Event](exec.WithMaxRetryCount(0)),
)
if err != nil {
return nil, nil, fmt.Errorf("event retriever: %w", err)
return err
}

c.errsListening = make(chan error)
g, ctx := errgroup.WithContext(ctx)

liveHeadersC := sub.Chan()
histHeadersC := make(chan types.Header)
var wg sync.WaitGroup
go func() {
<-ctx.Done()
sub.Unsubscribe()
}()

// Query historical headers.
var cancelled atomic.Value
cancelled.Store(false)
wg.Add(1)
go func(beginBlock types.BlockNumber, live <-chan types.Header, hist chan types.Header) {
defer wg.Done()
defer close(hist)

firstLiveHeader := <-live // the first live header is the last historical header

for block := beginBlock; block < firstLiveHeader.Number; {
var header *types.Header
err := retryUntilCancelled(func() error {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
c.errsListening <- fmt.Errorf("get historical block hash: %w", err)
return err
}
histHeadersC := make(chan types.Header)
g.Go(func() error {
defer close(histHeadersC)

header, err = c.RPC.Chain.GetHeader(blockHash)
if err != nil {
c.errsListening <- fmt.Errorf("get historical header: %w", err)
return err
}
firstLiveHeader, ok := <-liveHeadersC // first live header will be the last historical
if !ok {
return ctx.Err()
}

return nil
}, &cancelled)
for block := begin; block < firstLiveHeader.Number; block++ {
blockHash, err := c.RPC.Chain.GetBlockHash(uint64(block))
if err != nil {
if err == errCancelled {
return
}
continue
return err
}

header, err := c.RPC.Chain.GetHeader(blockHash)
if err != nil {
return err
}

hist <- *header
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- *header:
}
}

block++
select {
case <-ctx.Done():
return ctx.Err()
case histHeadersC <- firstLiveHeader:
}

hist <- firstLiveHeader
}(begin, liveHeadersC, histHeadersC)
return nil
})

// Sequence historical and live headers.
headersC := make(chan types.Header)
wg.Add(1)
go func(hist, live <-chan types.Header, headersC chan types.Header) {
defer wg.Done()
g.Go(func() error {
defer close(headersC)

for header := range hist {
headersC <- header
for header := range histHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}

for header := range live {
headersC <- header
for header := range liveHeadersC {
select {
case <-ctx.Done():
return ctx.Err()
case headersC <- header:
}
}
}(histHeadersC, liveHeadersC, headersC)

return nil
})

// Retrieve events skipping blocks before 'begin'.
eventsC := make(chan blockEvents)
wg.Add(1)
go func(headersC <-chan types.Header, eventsC chan blockEvents) {
defer wg.Done()
g.Go(func() error {
defer close(eventsC)

for header := range headersC {
if header.Number < begin {
continue
}

var hash types.Hash
var events []*parser.Event
err := retryUntilCancelled(func() error {
var err error
hash, err = c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
c.errsListening <- fmt.Errorf("get block hash: %w", err)
return err
}

events, err = retriever.GetEvents(hash)
if err != nil {
c.errsListening <- fmt.Errorf("events retriever: %w", err)
return err
}
hash, err := c.RPC.Chain.GetBlockHash(uint64(header.Number))
if err != nil {
return err
}

return nil
}, &cancelled)
events, err := retriever.GetEvents(hash)
if err != nil {
continue
return err
}

eventsC <- blockEvents{
select {
case <-ctx.Done():
return ctx.Err()
case eventsC <- blockEvents{
Events: events,
Hash: hash,
Number: header.Number,
}:
}
}
}(headersC, eventsC)

return nil
})

// Invoke listeners.
wg.Add(1)
go func(eventsC <-chan blockEvents) {
defer wg.Done()
g.Go(func() error {
for blockEvents := range eventsC {
for callback := range c.eventsListeners {
(*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
err := (*callback)(blockEvents.Events, blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}

if after != nil {
after(blockEvents.Number, blockEvents.Hash)
err := after(blockEvents.Number, blockEvents.Hash)
if err != nil {
return err
}
}
}
}(eventsC)

once := sync.Once{}
c.cancelListening = func() {
once.Do(func() {
sub.Unsubscribe()
cancelled.Store(true)
wg.Wait()
close(c.errsListening)
c.isListening = 0
})
}
return ctx.Err()
})

return c.cancelListening, c.errsListening, nil
return g.Wait()
}

// RegisterEventsListener subscribes given callback to blockchain events.
Expand All @@ -246,20 +226,3 @@ type blockEvents struct {
Hash types.Hash
Number types.BlockNumber
}

func retryUntilCancelled(f func() error, cancelled *atomic.Value) error {
expbackoff := backoff.NewExponentialBackOff()
expbackoff.MaxElapsedTime = 0 // never stop
expbackoff.InitialInterval = 10 * time.Second
expbackoff.Multiplier = 2
expbackoff.MaxInterval = 10 * time.Minute

ff := func() error {
if cancelled.Load().(bool) {
return backoff.Permanent(errCancelled)
}
return f()
}

return backoff.Retry(ff, expbackoff)
}
2 changes: 1 addition & 1 deletion blockchain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/cerebellum-network/cere-ddc-sdk-go/blockchain
go 1.18

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1
golang.org/x/sync v0.7.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions blockchain/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY=
github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -64,6 +62,8 @@ golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
Expand Down
Loading