Skip to content

Commit

Permalink
Merge pull request ethereum#45 from maticnetwork/dev-subscribe-deposits
Browse files Browse the repository at this point in the history
MAT-871: Subscribe to State Sync Events on Bor
  • Loading branch information
jdkanani authored Apr 6, 2020
2 parents c6881be + 7183e21 commit 86a8696
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 29 deletions.
3 changes: 3 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,9 @@ func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEve
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}
func (fb *filterBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
return fb.bc.SubscribeStateEvent(ch)
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
Expand Down
18 changes: 18 additions & 0 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/maticnetwork/bor/core/vm"
"github.com/maticnetwork/bor/crypto"
"github.com/maticnetwork/bor/ethdb"
"github.com/maticnetwork/bor/event"
"github.com/maticnetwork/bor/internal/ethapi"
"github.com/maticnetwork/bor/log"
"github.com/maticnetwork/bor/params"
Expand Down Expand Up @@ -245,6 +246,8 @@ type Bor struct {
stateReceiverABI abi.ABI
HeimdallClient IHeimdallClient

stateDataFeed event.Feed
scope event.SubscriptionScope
// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
}
Expand Down Expand Up @@ -1201,6 +1204,16 @@ func (c *Bor) CommitStates(
"txHash", eventRecord.TxHash,
"chainID", eventRecord.ChainID,
)
stateData := types.StateData{
Did: eventRecord.ID,
Contract: eventRecord.Contract,
Data: hex.EncodeToString(eventRecord.Data),
TxHash: eventRecord.TxHash,
}

go func() {
c.stateDataFeed.Send(core.NewStateChangeEvent{StateData: &stateData})
}()

recordBytes, err := rlp.EncodeToBytes(eventRecord)
if err != nil {
Expand All @@ -1226,6 +1239,11 @@ func (c *Bor) CommitStates(
return nil
}

// SubscribeStateEvent registers a subscription of ChainSideEvent.
func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
return c.scope.Track(c.stateDataFeed.Subscribe(ch))
}

func (c *Bor) SetHeimdallClient(h IHeimdallClient) {
c.HeimdallClient = h
}
Expand Down
8 changes: 7 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync/atomic"
"time"

"github.com/hashicorp/golang-lru"
"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/bor/common/mclock"
"github.com/maticnetwork/bor/common/prque"
Expand All @@ -42,7 +43,6 @@ import (
"github.com/maticnetwork/bor/params"
"github.com/maticnetwork/bor/rlp"
"github.com/maticnetwork/bor/trie"
"github.com/hashicorp/golang-lru"
)

var (
Expand Down Expand Up @@ -142,6 +142,7 @@ type BlockChain struct {
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
stateDataFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

Expand Down Expand Up @@ -2177,6 +2178,11 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
return bc.scope.Track(bc.logsFeed.Subscribe(ch))
}

// SubscribeStateEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeStateEvent(ch chan<- NewStateChangeEvent) event.Subscription {
return bc.scope.Track(bc.stateDataFeed.Subscribe(ch))
}

// SubscribeBlockProcessingEvent registers a subscription of bool where true means
// block processing has started while false means it has stopped.
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
Expand Down
4 changes: 4 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type ChainEvent struct {
Logs []*types.Log
}

type NewStateChangeEvent struct {
StateData *types.StateData
}

type ChainSideEvent struct {
Block *types.Block
}
Expand Down
8 changes: 8 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type txdata struct {
Hash *common.Hash `json:"hash" rlp:"-"`
}

// State represents state received from Ethereum Blockchain
type StateData struct {
Did uint64
Contract common.Address
Data string
TxHash common.Hash
}

type txdataMarshaling struct {
AccountNonce hexutil.Uint64
Price *hexutil.Big
Expand Down
6 changes: 6 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/maticnetwork/bor/accounts"
"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/bor/common/math"
"github.com/maticnetwork/bor/consensus/bor"
"github.com/maticnetwork/bor/core"
"github.com/maticnetwork/bor/core/bloombits"
"github.com/maticnetwork/bor/core/rawdb"
Expand Down Expand Up @@ -155,6 +156,11 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}

func (b *EthAPIBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
engine := b.eth.Engine()
return engine.(*bor.Bor).SubscribeStateEvent(ch)
}

func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
Expand Down
34 changes: 33 additions & 1 deletion eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package filters

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -215,7 +216,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)

for {
select {
case h := <-headers:
Expand All @@ -233,6 +233,38 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}

// NewDeposits send a notification each time a new deposit received from bridge.
func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.FilterState) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
go func() {
stateData := make(chan *types.StateData)
stateDataSub := api.events.SubscribeNewDeposits(stateData)

for {
select {
case h := <-stateData:
if crit.Did == h.Did || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 ||
(crit.Did == 0 && crit.Contract == common.Address{}) {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
stateDataSub.Unsubscribe()
return
case <-notifier.Closed():
stateDataSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down
1 change: 1 addition & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Backend interface {

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription

Expand Down
49 changes: 42 additions & 7 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription

//StateSubscription to listen main chain state
StateSubscription

// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -68,6 +72,8 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// stateEvChanSize is the size of channel listening to ChainEvent.
stateEvChanSize = 10
)

var (
Expand All @@ -82,6 +88,7 @@ type subscription struct {
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
stateData chan *types.StateData
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
Expand All @@ -99,15 +106,18 @@ type EventSystem struct {
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
stateSub event.Subscription // Subscription for new state change event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
stateCh chan core.NewStateChangeEvent // Channel to receive deposit state change event

}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -127,19 +137,21 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
stateCh: make(chan core.NewStateChangeEvent, stateEvChanSize),
}

// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.stateSub = m.backend.SubscribeStateEvent(m.stateCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
m.stateSub == nil || m.pendingLogSub.Closed() {
log.Crit("Subscribe for event system failed")
}

Expand Down Expand Up @@ -292,6 +304,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
stateData: make(chan *types.StateData),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewDeposits creates a subscription that writes details about the new state sync events (from mainchain to Bor)
func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: StateSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateData: stateData,
installed: make(chan struct{}),
err: make(chan error),
}
Expand Down Expand Up @@ -355,6 +384,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
case core.NewStateChangeEvent:
for _, f := range filters[StateSubscription] {
f.stateData <- e.StateData
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
f.headers <- e.Block.Header()
Expand Down Expand Up @@ -471,6 +504,8 @@ func (es *EventSystem) eventLoop() {
es.broadcast(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev := <-es.stateCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
Expand Down
7 changes: 6 additions & 1 deletion ethclient/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"fmt"
"math/big"

"github.com/maticnetwork/bor"
ethereum "github.com/maticnetwork/bor"
"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/bor/common/hexutil"
"github.com/maticnetwork/bor/core/types"
Expand Down Expand Up @@ -324,6 +324,11 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header)
return ec.c.EthSubscribe(ctx, ch, "newHeads")
}

// SubscribeNewDeposit subscribes to new state sync events
func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.StateData) (ethereum.Subscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newDeposits", nil)
}

// State Access

// NetworkID returns the network ID (also known as the chain ID) for this chain.
Expand Down
Loading

0 comments on commit 86a8696

Please sign in to comment.