Skip to content

Commit

Permalink
Connected chain2head protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
0xsharma committed Dec 2, 2021
1 parent e6235a8 commit 4488927
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 131 deletions.
207 changes: 108 additions & 99 deletions command/server/proto/server.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions command/server/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message ChainWatchRequest {
message ChainWatchResponse {
repeated BlockStub oldchain = 1;
repeated BlockStub newchain = 2;
string type = 3;
}

message BlockStub {
Expand Down
2 changes: 2 additions & 0 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethstats"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/graphql"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand All @@ -38,6 +39,7 @@ type Server struct {
backend *eth.Ethereum
grpcServer *grpc.Server
tracer *sdktrace.TracerProvider
headSub event.Subscription
}

func NewServer(config *Config) (*Server, error) {
Expand Down
32 changes: 26 additions & 6 deletions command/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/command/server/pprof"
"github.com/ethereum/go-ethereum/command/server/proto"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
)
Expand Down Expand Up @@ -109,17 +110,36 @@ func (s *Server) ChainSetHead(ctx context.Context, req *proto.ChainSetHeadReques
return &proto.ChainSetHeadResponse{}, nil
}

func ConvertBlockToBlockStub(blocks []*types.Block) []*proto.BlockStub {

var blockStubs []*proto.BlockStub

for _, block := range blocks {
blockStub := &proto.BlockStub{
Hash: block.Hash().String(),
Number: block.NumberU64(),
}
blockStubs = append(blockStubs, blockStub)
}

return blockStubs
}

func (s *Server) ChainWatch(req *proto.ChainWatchRequest, reply proto.Bor_ChainWatchServer) error {
// 1. start the feed to the blcokchain events
// 2. for each event send a proto.ChainWatchResponse

chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh)
chain2HeadChanSize := 10

chain2HeadCh := make(chan core.Chain2HeadEvent, chain2HeadChanSize)
s.headSub = s.backend.APIBackend.SubscribeChain2HeadEvent(chain2HeadCh)

for {
msg := <-chainHeadCh
reply.Send(&proto.ChainWatchResponse{})
msg := <-chain2HeadCh
fmt.Print(msg)
reply.Send(&proto.ChainWatchResponse{Type: msg.Type,
Newchain: ConvertBlockToBlockStub(msg.NewChain),
Oldchain: ConvertBlockToBlockStub(msg.OldChain),
})
}

return nil
}
53 changes: 28 additions & 25 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,16 @@ type BlockChain struct {
// * nil: disable tx reindexer/deleter, but still index new blocks
txLookupLimit uint64

hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chain2HeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block

// This mutex synchronizes chain write operations.
// Readers don't need to take it, they can just read the database.
Expand Down Expand Up @@ -1641,22 +1642,20 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
}

// ...
// bc.chain2HeadFeed.Send(ChainHeadEvent2{
// Type: "head",
// NewChain: []{block}
// })
bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: "head",
NewChain: []*types.Block{block},
})

// BOR
}
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})

// ...
// bc.chain2HeadFeed.Send(ChainHeadEvent2{
// Type: "fork",
// NewChain: []{block}
// })
bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: "fork",
NewChain: []*types.Block{block},
})
}
return status, nil
}
Expand Down Expand Up @@ -2276,12 +2275,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {

// ...
// bc.chain2HeadFeed.Send(ChainHeadEvent2{
// Type: "reorg",
// NewChain: newChain,
// OldChain: oldChain,
// })
bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: "reorg",
NewChain: newChain,
OldChain: oldChain,
})

logFn := log.Info
msg := "Chain reorg detected"
Expand Down Expand Up @@ -2591,6 +2589,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}

// SubscribeChain2HeadEvent registers a subscription of ChainHeadEvent. ()
func (bc *BlockChain) SubscribeChain2HeadEvent(ch chan<- Chain2HeadEvent) event.Subscription {
return bc.scope.Track(bc.chain2HeadFeed.Subscribe(ch))
}

// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
Expand Down
6 changes: 6 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

type Chain2HeadEvent struct {
NewChain []*types.Block
OldChain []*types.Block
Type string
}
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChain2HeadEvent(ch chan<- core.Chain2HeadEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChain2HeadEvent(ch)
}

func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ github.com/Azure/go-autorest/autorest/date v0.2.0 h1:yW+Zlqf26583pE43KhfnhFcdmSW
github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g=
github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0=
github.com/Azure/go-autorest/autorest/mocks v0.3.0 h1:qJumjCaCudz+OcqE9/XtEPfvtOjOmKaui4EOpFI6zZc=
github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM=
github.com/Azure/go-autorest/logger v0.1.0 h1:ruG4BSDXONFRrZZJ2GUXDiUyVpayPmb1GnWeHDdaNKY=
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
Expand Down

0 comments on commit 4488927

Please sign in to comment.