From 2c26583f6f6a3aaf1132cc89d22d62c3685c91ca Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 10 May 2022 12:17:44 +0700 Subject: [PATCH] Rename sentry.ControlServerImpl to sentry.MultyClient and sentry.SentryServerImpl to sentry.GrpcServer #444 --- ...pc_server.go => downloader_grpc_server.go} | 0 cmd/integration/commands/stages.go | 2 +- .../commands/send_transaction_test.go | 2 +- cmd/sentry/sentry/broadcast.go | 10 +-- cmd/sentry/sentry/sentry_api.go | 10 +-- .../{sentry.go => sentry_grpc_server.go} | 64 ++++++++--------- ...try_test.go => sentry_grpc_server_test.go} | 8 +-- .../{downloader.go => sentry_multy_client.go} | 42 ++++++----- eth/backend.go | 62 ++++++++-------- ethstats/ethstats.go | 4 +- turbo/stages/mock_sentry.go | 70 +++++++++---------- turbo/stages/stageloop.go | 4 +- 12 files changed, 141 insertions(+), 137 deletions(-) rename cmd/downloader/downloader/{grpc_server.go => downloader_grpc_server.go} (100%) rename cmd/sentry/sentry/{sentry.go => sentry_grpc_server.go} (90%) rename cmd/sentry/sentry/{sentry_test.go => sentry_grpc_server_test.go} (97%) rename cmd/sentry/sentry/{downloader.go => sentry_multy_client.go} (92%) diff --git a/cmd/downloader/downloader/grpc_server.go b/cmd/downloader/downloader/downloader_grpc_server.go similarity index 100% rename from cmd/downloader/downloader/grpc_server.go rename to cmd/downloader/downloader/downloader_grpc_server.go diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 9814a0a5f25..b140819ccf2 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1186,7 +1186,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) br := getBlockReader(chainConfig) blockDownloaderWindow := 65536 - sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br) + sentryControlServer, err := sentry.NewMultyClient(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br) if err != nil { panic(err) } diff --git a/cmd/rpcdaemon/commands/send_transaction_test.go b/cmd/rpcdaemon/commands/send_transaction_test.go index 74f29460c94..188a881263a 100644 --- a/cmd/rpcdaemon/commands/send_transaction_test.go +++ b/cmd/rpcdaemon/commands/send_transaction_test.go @@ -98,7 +98,7 @@ func TestSendRawTransaction(t *testing.T) { //TODO: make propagation easy to test - now race //time.Sleep(time.Second) //sent := m.SentMessage(0) - //require.Equal(eth.ToProto[m.SentryClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id) + //require.Equal(eth.ToProto[m.MultyClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id) } func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction { diff --git a/cmd/sentry/sentry/broadcast.go b/cmd/sentry/sentry/broadcast.go index 060fd65bdec..039d0d6f7a1 100644 --- a/cmd/sentry/sentry/broadcast.go +++ b/cmd/sentry/sentry/broadcast.go @@ -28,7 +28,7 @@ const ( maxTxPacketSize = 100 * 1024 ) -func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) { +func (cs *MultyClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) { cs.lock.RLock() defer cs.lock.RUnlock() typedRequest := make(eth.NewBlockHashesPacket, len(announces)) @@ -72,7 +72,7 @@ func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announ } } -func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) { +func (cs *MultyClient) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) { cs.lock.RLock() defer cs.lock.RUnlock() data, err := rlp.EncodeToBytes(ð.NewBlockPacket{ @@ -116,7 +116,7 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types } } -func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) { +func (cs *MultyClient) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) { if len(txs) == 0 { return } @@ -179,7 +179,7 @@ func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs [] } } -func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) { +func (cs *MultyClient) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) { if len(txs) == 0 { return } @@ -235,7 +235,7 @@ func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs [ } } -func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) { +func (cs *MultyClient) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) { if len(txs) == 0 { return } diff --git a/cmd/sentry/sentry/sentry_api.go b/cmd/sentry/sentry/sentry_api.go index 9ba97e3661c..dd5a3a5b1fe 100644 --- a/cmd/sentry/sentry/sentry_api.go +++ b/cmd/sentry/sentry/sentry_api.go @@ -18,7 +18,7 @@ import ( // Methods of sentry called by Core -func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) { +func (cs *MultyClient) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) { cs.lock.Lock() defer cs.lock.Unlock() cs.headHeight = height @@ -36,7 +36,7 @@ func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash } } -func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) { +func (cs *MultyClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) { // if sentry not found peers to send such message, try next one. stop if found. for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() { if !cs.sentries[i].Ready() { @@ -78,7 +78,7 @@ func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownl return [64]byte{}, false } -func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) { +func (cs *MultyClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) { // if sentry not found peers to send such message, try next one. stop if found. for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() { if !cs.sentries[i].Ready() { @@ -130,7 +130,7 @@ func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerd return [64]byte{}, false } -func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) { +func (cs *MultyClient) randSentryIndex() (int, bool, func() (int, bool)) { var i int if len(cs.sentries) > 1 { i = rand.Intn(len(cs.sentries) - 1) @@ -143,7 +143,7 @@ func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) { } // sending list of penalties to all sentries -func (cs *ControlServerImpl) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) { +func (cs *MultyClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) { for i := range penalties { outreq := proto_sentry.PenalizePeerRequest{ PeerId: gointerfaces.ConvertHashToH512(penalties[i].PeerID), diff --git a/cmd/sentry/sentry/sentry.go b/cmd/sentry/sentry/sentry_grpc_server.go similarity index 90% rename from cmd/sentry/sentry/sentry.go rename to cmd/sentry/sentry/sentry_grpc_server.go index fd46ff2a7ab..e3e1c39b7f8 100644 --- a/cmd/sentry/sentry/sentry.go +++ b/cmd/sentry/sentry/sentry_grpc_server.go @@ -478,7 +478,7 @@ func runPeer( } } -func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerImpl, healthCheck bool) (*grpc.Server, error) { +func grpcSentryServer(ctx context.Context, sentryAddr string, ss *GrpcServer, healthCheck bool) (*grpc.Server, error) { // STARTING GRPC SERVER log.Info("Starting Sentry gRPC server", "on", sentryAddr) listenConfig := net.ListenConfig{ @@ -510,8 +510,8 @@ func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerIm return grpcServer, nil } -func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *SentryServerImpl { - ss := &SentryServerImpl{ +func NewGrpcServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *GrpcServer { + ss := &GrpcServer{ ctx: ctx, p2p: cfg, peersStreams: NewPeersStreams(), @@ -576,7 +576,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod // Sentry creates and runs standalone sentry func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS []string, cfg *p2p.Config, protocolVersion uint, healthCheck bool) error { dir.MustExist(datadir) - sentryServer := NewSentryServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion) + sentryServer := NewGrpcServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion) sentryServer.discoveryDNS = discoveryDNS grpcServer, err := grpcSentryServer(ctx, sentryAddr, sentryServer, healthCheck) @@ -590,7 +590,7 @@ func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS return nil } -type SentryServerImpl struct { +type GrpcServer struct { proto_sentry.UnimplementedSentryServer ctx context.Context Protocol p2p.Protocol @@ -607,7 +607,7 @@ type SentryServerImpl struct { p2p *p2p.Config } -func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) { +func (ss *GrpcServer) rangePeers(f func(peerInfo *PeerInfo) bool) { ss.GoodPeers.Range(func(key, value interface{}) bool { peerInfo, _ := value.(*PeerInfo) if peerInfo == nil { @@ -617,7 +617,7 @@ func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) { }) } -func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) { +func (ss *GrpcServer) getPeer(peerID [64]byte) (peerInfo *PeerInfo) { if value, ok := ss.GoodPeers.Load(peerID); ok { peerInfo := value.(*PeerInfo) if peerInfo != nil { @@ -628,7 +628,7 @@ func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) { return nil } -func (ss *SentryServerImpl) removePeer(peerID [64]byte) { +func (ss *GrpcServer) removePeer(peerID [64]byte) { if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok { peerInfo := value.(*PeerInfo) if peerInfo != nil { @@ -637,7 +637,7 @@ func (ss *SentryServerImpl) removePeer(peerID [64]byte) { } } -func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) { +func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) { peerInfo.Async(func() { err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) if err != nil { @@ -652,7 +652,7 @@ func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgc }) } -func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error { +func (ss *GrpcServer) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error { switch ss.Protocol.Version { case eth.ETH66: b, err := rlp.EncodeToBytes(ð.GetBlockHeadersPacket66{ @@ -680,14 +680,14 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, return nil } -func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) { +func (ss *GrpcServer) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) { //log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId())) peerID := ConvertH512ToPeerID(req.PeerId) ss.removePeer(peerID) return &emptypb.Empty{}, nil } -func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) { +func (ss *GrpcServer) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) { peerID := ConvertH512ToPeerID(req.PeerId) if peerInfo := ss.getPeer(peerID); peerInfo != nil { peerInfo.SetIncreasedHeight(req.MinBlock) @@ -695,7 +695,7 @@ func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.Pe return &emptypb.Empty{}, nil } -func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) { +func (ss *GrpcServer) findPeer(minBlock uint64) (*PeerInfo, bool) { // Choose a peer that we can send this request to, with maximum number of permits var foundPeerInfo *PeerInfo var maxPermits int @@ -717,7 +717,7 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) { return foundPeerInfo, maxPermits > 0 } -func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) { +func (ss *GrpcServer) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] if msgcode != eth.GetBlockHeadersMsg && @@ -738,7 +738,7 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot return reply, lastErr } -func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { +func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] if msgcode != eth.GetBlockHeadersMsg && @@ -765,7 +765,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent return reply, nil } -func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) { +func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id] @@ -798,7 +798,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p return reply, lastErr } -func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { +func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} msgcode := eth.FromProto[ss.Protocol.Version][req.Id] @@ -817,7 +817,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen return reply, lastErr } -func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) { +func (ss *GrpcServer) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) { reply := &proto_sentry.HandShakeReply{} switch ss.Protocol.Version { case eth.ETH66: @@ -826,7 +826,7 @@ func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_s return reply, nil } -func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) { +func (ss *GrpcServer) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) { genesisHash := gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis) ss.lock.Lock() @@ -869,7 +869,7 @@ func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sen return reply, nil } -func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) { +func (ss *GrpcServer) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) { if ss.P2pServer == nil { return nil, errors.New("p2p server was not started") } @@ -898,7 +898,7 @@ func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_s return &reply, nil } -func (ss *SentryServerImpl) SimplePeerCount() (pc int) { +func (ss *GrpcServer) SimplePeerCount() (pc int) { ss.rangePeers(func(peerInfo *PeerInfo) bool { pc++ return true @@ -906,7 +906,7 @@ func (ss *SentryServerImpl) SimplePeerCount() (pc int) { return pc } -func (ss *SentryServerImpl) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) { +func (ss *GrpcServer) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) { return &proto_sentry.PeerCountReply{Count: uint64(ss.SimplePeerCount())}, nil } @@ -920,13 +920,13 @@ func setupDiscovery(urls []string) (enode.Iterator, error) { return client.NewIterator(urls...) } -func (ss *SentryServerImpl) GetStatus() *proto_sentry.StatusData { +func (ss *GrpcServer) GetStatus() *proto_sentry.StatusData { ss.lock.RLock() defer ss.lock.RUnlock() return ss.statusData } -func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) { +func (ss *GrpcServer) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) { ss.messageStreamsLock.RLock() defer ss.messageStreamsLock.RUnlock() req := &proto_sentry.InboundMessage{ @@ -950,14 +950,14 @@ func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte, } } -func (ss *SentryServerImpl) hasSubscribers(msgID proto_sentry.MessageId) bool { +func (ss *GrpcServer) hasSubscribers(msgID proto_sentry.MessageId) bool { ss.messageStreamsLock.RLock() defer ss.messageStreamsLock.RUnlock() return ss.messageStreams[msgID] != nil && len(ss.messageStreams[msgID]) > 0 // log.Error("Sending msg to core P2P failed", "msg", proto_sentry.MessageId_name[int32(streamMsg.msgId)], "err", err) } -func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() { +func (ss *GrpcServer) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() { ss.messageStreamsLock.Lock() defer ss.messageStreamsLock.Unlock() if ss.messageStreams == nil { @@ -985,7 +985,7 @@ func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch c } const MessagesQueueSize = 1024 // one such queue per client of .Messages stream -func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error { +func (ss *GrpcServer) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error { log.Trace("[Messages] new subscriber", "to", req.Ids) ch := make(chan *proto_sentry.InboundMessage, MessagesQueueSize) defer close(ch) @@ -1008,25 +1008,25 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p } // Close performs cleanup operations for the sentry -func (ss *SentryServerImpl) Close() { +func (ss *GrpcServer) Close() { if ss.P2pServer != nil { ss.P2pServer.Stop() } } -func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H512) { +func (ss *GrpcServer) sendNewPeerToClients(peerID *proto_types.H512) { if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Connect}); err != nil { log.Warn("Sending new peer notice to core P2P failed", "err", err) } } -func (ss *SentryServerImpl) sendGonePeerToClients(peerID *proto_types.H512) { +func (ss *GrpcServer) sendGonePeerToClients(peerID *proto_types.H512) { if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Disconnect}); err != nil { log.Warn("Sending gone peer notice to core P2P failed", "err", err) } } -func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error { +func (ss *GrpcServer) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error { clean := ss.peersStreams.Add(server) defer clean() select { @@ -1037,7 +1037,7 @@ func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, serv } } -func (ss *SentryServerImpl) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) { +func (ss *GrpcServer) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) { if ss.P2pServer == nil { return nil, errors.New("p2p server was not started") } diff --git a/cmd/sentry/sentry/sentry_test.go b/cmd/sentry/sentry/sentry_grpc_server_test.go similarity index 97% rename from cmd/sentry/sentry/sentry_test.go rename to cmd/sentry/sentry/sentry_grpc_server_test.go index 5cdd926c808..5e397abb237 100644 --- a/cmd/sentry/sentry/sentry_test.go +++ b/cmd/sentry/sentry/sentry_grpc_server_test.go @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/require" ) -func testSentryServer(db kv.Getter, genesis *core.Genesis, genesisHash common.Hash) *SentryServerImpl { - s := &SentryServerImpl{ +func testSentryServer(db kv.Getter, genesis *core.Genesis, genesisHash common.Hash) *GrpcServer { + s := &GrpcServer{ ctx: context.Background(), } @@ -74,7 +74,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { genesisProFork = gspecProFork.MustCommit(dbProFork) ) - var s1, s2 *SentryServerImpl + var s1, s2 *GrpcServer err := dbNoFork.Update(context.Background(), func(tx kv.RwTx) error { s1 = testSentryServer(tx, gspecNoFork, genesisNoFork.Hash()) @@ -160,7 +160,7 @@ func TestSentryServerImpl_SetStatusInitPanic(t *testing.T) { dbNoFork := memdb.NewTestDB(t) gspecNoFork := &core.Genesis{Config: configNoFork} genesisNoFork := gspecNoFork.MustCommit(dbNoFork) - ss := &SentryServerImpl{p2p: &p2p.Config{}} + ss := &GrpcServer{p2p: &p2p.Config{}} _, err := ss.SetStatus(context.Background(), &proto_sentry.StatusData{ ForkData: &proto_sentry.Forks{Genesis: gointerfaces.ConvertHashToH256(genesisNoFork.Hash())}, diff --git a/cmd/sentry/sentry/downloader.go b/cmd/sentry/sentry/sentry_multy_client.go similarity index 92% rename from cmd/sentry/sentry/downloader.go rename to cmd/sentry/sentry/sentry_multy_client.go index cc2296bd1cf..55f0bd9a7c3 100644 --- a/cmd/sentry/sentry/downloader.go +++ b/cmd/sentry/sentry/sentry_multy_client.go @@ -41,7 +41,7 @@ import ( func RecvUploadMessageLoop(ctx context.Context, sentry direct.SentryClient, - cs *ControlServerImpl, + cs *MultyClient, wg *sync.WaitGroup, ) { for { @@ -129,7 +129,7 @@ func RecvUploadMessage(ctx context.Context, func RecvUploadHeadersMessageLoop(ctx context.Context, sentry direct.SentryClient, - cs *ControlServerImpl, + cs *MultyClient, wg *sync.WaitGroup, ) { for { @@ -215,7 +215,7 @@ func RecvUploadHeadersMessage(ctx context.Context, func RecvMessageLoop(ctx context.Context, sentry direct.SentryClient, - cs *ControlServerImpl, + cs *MultyClient, wg *sync.WaitGroup, ) { for { @@ -320,12 +320,14 @@ func RecvMessage( } } -func SentrySetStatus(ctx context.Context, sentry direct.SentryClient, controlServer *ControlServerImpl) error { +func SentrySetStatus(ctx context.Context, sentry direct.SentryClient, controlServer *MultyClient) error { _, err := sentry.SetStatus(ctx, makeStatusData(controlServer)) return err } -type ControlServerImpl struct { +// MultyClient - does handle request/response/subscriptions to multiple sentries +// each sentry may support same or different p2p protocol +type MultyClient struct { lock sync.RWMutex Hd *headerdownload.HeaderDownload Bd *bodydownload.BodyDownload @@ -343,9 +345,9 @@ type ControlServerImpl struct { blockReader interfaces.HeaderAndCanonicalReader } -func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig, +func NewMultyClient(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig, genesisHash common.Hash, engine consensus.Engine, networkID uint64, sentries []direct.SentryClient, - window int, blockReader interfaces.HeaderAndCanonicalReader) (*ControlServerImpl, error) { + window int, blockReader interfaces.HeaderAndCanonicalReader) (*MultyClient, error) { hd := headerdownload.NewHeaderDownload( 512, /* anchorLimit */ 1024*1024, /* linkLimit */ @@ -358,7 +360,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf } bd := bodydownload.NewBodyDownload(window /* outstandingLimit */, engine) - cs := &ControlServerImpl{ + cs := &MultyClient{ nodeName: nodeName, Hd: hd, Bd: bd, @@ -379,7 +381,9 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf return cs, err } -func (cs *ControlServerImpl) newBlockHashes66(ctx context.Context, req *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) Sentries() []direct.SentryClient { return cs.sentries } + +func (cs *MultyClient) newBlockHashes66(ctx context.Context, req *proto_sentry.InboundMessage, sentry direct.SentryClient) error { if !cs.Hd.RequestChaining() && !cs.Hd.FetchingNew() { return nil } @@ -424,7 +428,7 @@ func (cs *ControlServerImpl) newBlockHashes66(ctx context.Context, req *proto_se return nil } -func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) blockHeaders66(ctx context.Context, in *proto_sentry.InboundMessage, sentry direct.SentryClient) error { // Parse the entire packet from scratch var pkt eth.BlockHeadersPacket66 if err := rlp.DecodeBytes(in.Data, &pkt); err != nil { @@ -444,7 +448,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr return cs.blockHeaders(ctx, pkt.BlockHeadersPacket, rlpStream, in.PeerId, sentry) } -func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H512, sentry direct.SentryClient) error { +func (cs *MultyClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H512, sentry direct.SentryClient) error { // Stream is at the BlockHeadersPacket, which is list of headers if _, err := rlpStream.List(); err != nil { return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err) @@ -532,7 +536,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead return nil } -func (cs *ControlServerImpl) newBlock66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) newBlock66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { // Extract header from the block rlpStream := rlp.NewStream(bytes.NewReader(inreq.Data), uint64(len(inreq.Data))) _, err := rlpStream.List() // Now stream is at the beginning of the block record @@ -588,7 +592,7 @@ func (cs *ControlServerImpl) newBlock66(ctx context.Context, inreq *proto_sentry return nil } -func (cs *ControlServerImpl) blockBodies66(inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) blockBodies66(inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { var request eth.BlockRawBodiesPacket66 if err := rlp.DecodeBytes(inreq.Data, &request); err != nil { return fmt.Errorf("decode BlockBodiesPacket66: %w", err) @@ -598,11 +602,11 @@ func (cs *ControlServerImpl) blockBodies66(inreq *proto_sentry.InboundMessage, s return nil } -func (cs *ControlServerImpl) receipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) receipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { return nil } -func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) getBlockHeaders66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { var query eth.GetBlockHeadersPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getBlockHeaders66: %w, data: %x", err, inreq.Data) @@ -643,7 +647,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto return nil } -func (cs *ControlServerImpl) getBlockBodies66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { var query eth.GetBlockBodiesPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getBlockBodies66: %w, data: %x", err, inreq.Data) @@ -680,7 +684,7 @@ func (cs *ControlServerImpl) getBlockBodies66(ctx context.Context, inreq *proto_ return nil } -func (cs *ControlServerImpl) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { var query eth.GetReceiptsPacket66 if err := rlp.DecodeBytes(inreq.Data, &query); err != nil { return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data) @@ -720,7 +724,7 @@ func (cs *ControlServerImpl) getReceipts66(ctx context.Context, inreq *proto_sen return nil } -func (cs *ControlServerImpl) HandleInboundMessage(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { +func (cs *MultyClient) HandleInboundMessage(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error { switch inreq.Id { // ========= eth 66 ========== @@ -745,7 +749,7 @@ func (cs *ControlServerImpl) HandleInboundMessage(ctx context.Context, inreq *pr } } -func makeStatusData(s *ControlServerImpl) *proto_sentry.StatusData { +func makeStatusData(s *MultyClient) *proto_sentry.StatusData { return &proto_sentry.StatusData{ NetworkId: s.networkId, TotalDifficulty: gointerfaces.ConvertUint256IntToH256(s.headTd), diff --git a/eth/backend.go b/eth/backend.go index 5a1b7ff9604..1c2bd5b9389 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -116,11 +116,10 @@ type Ethereum struct { minedBlocks chan *types.Block // downloader fields - sentryCtx context.Context - sentryCancel context.CancelFunc - sentryControlServer *sentry.ControlServerImpl - sentryServers []*sentry.SentryServerImpl - sentries []direct.SentryClient + sentryCtx context.Context + sentryCancel context.CancelFunc + sentriesClient *sentry.MultyClient + sentryServers []*sentry.GrpcServer stagedSync *stagedsync.Sync @@ -209,7 +208,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l genesisHash: genesis.Hash(), waitForStageLoopStop: make(chan struct{}), waitForMiningStop: make(chan struct{}), - sentries: []direct.SentryClient{}, notifications: &stagedsync.Notifications{ Events: privateapi.NewEvents(), Accumulator: shards.NewAccumulator(chainConfig), @@ -222,13 +220,14 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } + var sentries []direct.SentryClient if len(stack.Config().P2P.SentryAddr) > 0 { for _, addr := range stack.Config().P2P.SentryAddr { sentryClient, err := sentry.GrpcClient(backend.sentryCtx, addr) if err != nil { return nil, err } - backend.sentries = append(backend.sentries, sentryClient) + sentries = append(sentries, sentryClient) } } else { var readNodeInfo = func() *eth.NodeInfo { @@ -248,9 +247,9 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l cfg66 := stack.Config().P2P cfg66.NodeDatabase = filepath.Join(stack.Config().DataDir, "nodes", "eth66") - server66 := sentry.NewSentryServer(backend.sentryCtx, d66, readNodeInfo, &cfg66, eth.ETH66) + server66 := sentry.NewGrpcServer(backend.sentryCtx, d66, readNodeInfo, &cfg66, eth.ETH66) backend.sentryServers = append(backend.sentryServers, server66) - backend.sentries = []direct.SentryClient{direct.NewSentryClientDirect(eth.ETH66, server66)} + sentries = []direct.SentryClient{direct.NewSentryClientDirect(eth.ETH66, server66)} go func() { logEvery := time.NewTicker(120 * time.Second) @@ -341,7 +340,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l return nil, err } - backend.sentryControlServer, err = sentry.NewControlServer(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow, blockReader) + backend.sentriesClient, err = sentry.NewMultyClient(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, sentries, config.BlockDownloaderWindow, blockReader) if err != nil { return nil, err } @@ -358,7 +357,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l backend.newTxs2 = make(chan types2.Hashes, 1024) //defer close(newTxs) backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents( - ctx, txpoolCfg, kvcache.NewDummy(), backend.newTxs2, backend.chainDB, backend.sentries, stateDiffClient, + ctx, txpoolCfg, kvcache.NewDummy(), backend.newTxs2, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, ) if err != nil { return nil, err @@ -411,7 +410,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l // Initialize ethbackend ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events, - blockReader, chainConfig, backend.sentryControlServer.Hd.BeaconRequestList, backend.sentryControlServer.Hd.PayloadStatusCh, + blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh, assembleBlockPOS, config.Miner.EnabledPOS) miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi) // If we enabled the proposer flag we initiates the block proposing thread @@ -463,15 +462,15 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l select { case b := <-backend.minedBlocks: //p2p - //backend.sentryControlServer.BroadcastNewBlock(context.Background(), b, b.Difficulty()) + //backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty()) //rpcdaemon if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil { log.Error("txpool rpc mined block broadcast", "err", err) } - if err := backend.sentryControlServer.Hd.AddMinedHeader(b.Header()); err != nil { + if err := backend.sentriesClient.Hd.AddMinedHeader(b.Header()); err != nil { log.Error("add mined block to header downloader", "err", err) } - if err := backend.sentryControlServer.Bd.AddMinedBlock(b); err != nil { + if err := backend.sentriesClient.Bd.AddMinedBlock(b); err != nil { log.Error("add mined block to body downloader", "err", err) } @@ -496,13 +495,13 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB, stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty, - backend.sentryControlServer, tmpdir, backend.notifications, + backend.sentriesClient, tmpdir, backend.notifications, backend.downloaderClient, allSnapshots, config.SnapshotDir, headCh) if err != nil { return nil, err } - backend.sentryControlServer.Hd.StartPoSDownloader(backend.sentryCtx, backend.sentryControlServer.SendHeaderRequest, backend.sentryControlServer.Penalize) + backend.sentriesClient.Hd.StartPoSDownloader(backend.sentryCtx, backend.sentriesClient.SendHeaderRequest, backend.sentriesClient.Penalize) emptyBadHash := config.BadBlockHash == common.Hash{} if !emptyBadHash { @@ -658,7 +657,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy defer skipCycleEvery.Stop() for range skipCycleEvery.C { select { - case s.sentryControlServer.Hd.SkipCycleHack <- struct{}{}: + case s.sentriesClient.Hd.SkipCycleHack <- struct{}{}: default: } } @@ -735,7 +734,7 @@ func (s *Ethereum) NetPeerCount() (uint64, error) { var sentryPc uint64 = 0 log.Trace("sentry", "peer count", sentryPc) - for _, sc := range s.sentries { + for _, sc := range s.sentriesClient.Sentries() { ctx := context.Background() reply, err := sc.PeerCount(ctx, &proto_sentry.PeerCountRequest{}) if err != nil { @@ -749,13 +748,13 @@ func (s *Ethereum) NetPeerCount() (uint64, error) { } func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { - if limit == 0 || limit > len(s.sentries) { - limit = len(s.sentries) + if limit == 0 || limit > len(s.sentriesClient.Sentries()) { + limit = len(s.sentriesClient.Sentries()) } nodes := make([]*prototypes.NodeInfoReply, 0, limit) for i := 0; i < limit; i++ { - sc := s.sentries[i] + sc := s.sentriesClient.Sentries()[i] nodeInfo, err := sc.NodeInfo(context.Background(), nil) if err != nil { @@ -773,10 +772,10 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) { func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) { var reply remote.PeersReply - for _, sentryClient := range s.sentries { + for _, sentryClient := range s.sentriesClient.Sentries() { peers, err := sentryClient.Peers(ctx, &emptypb.Empty{}) if err != nil { - return nil, fmt.Errorf("Ethereum backend SentryClient.Peers error: %w", err) + return nil, fmt.Errorf("Ethereum backend MultyClient.Peers error: %w", err) } reply.Peers = append(reply.Peers, peers.Peers...) } @@ -796,20 +795,21 @@ func (s *Ethereum) Protocols() []p2p.Protocol { // Start implements node.Lifecycle, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start() error { - for i := range s.sentries { + sentries := s.sentriesClient.Sentries() + for i := range sentries { go func(i int) { - sentry.RecvMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil) + sentry.RecvMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil) }(i) go func(i int) { - sentry.RecvUploadMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil) + sentry.RecvUploadMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil) }(i) go func(i int) { - sentry.RecvUploadHeadersMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil) + sentry.RecvUploadHeadersMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil) }(i) } time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion - go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentryControlServer.Hd, s.notifications, s.sentryControlServer.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle) + go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.notifications, s.sentriesClient.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle) return nil } @@ -870,6 +870,6 @@ func (s *Ethereum) SentryCtx() context.Context { return s.sentryCtx } -func (s *Ethereum) SentryControlServer() *sentry.ControlServerImpl { - return s.sentryControlServer +func (s *Ethereum) SentryControlServer() *sentry.MultyClient { + return s.sentriesClient } diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index fa0dc8a7241..bc440e34238 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -52,7 +52,7 @@ const ( // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. type Service struct { - servers []*sentry.SentryServerImpl // Peer-to-peer server to retrieve networking infos + servers []*sentry.GrpcServer // Peer-to-peer server to retrieve networking infos chaindb kv.RoDB networkid uint64 engine consensus.Engine // Consensus engine to retrieve variadic block fields @@ -118,7 +118,7 @@ func (w *connWrapper) Close() error { } // New returns a monitoring service ready for stats reporting. -func New(node *node.Node, servers []*sentry.SentryServerImpl, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error { +func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error { // Parse the netstats connection url re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)") parts := re.FindStringSubmatch(url) diff --git a/turbo/stages/mock_sentry.go b/turbo/stages/mock_sentry.go index 618b2320ee9..1b9330dd8bb 100644 --- a/turbo/stages/mock_sentry.go +++ b/turbo/stages/mock_sentry.go @@ -51,30 +51,30 @@ import ( type MockSentry struct { proto_sentry.UnimplementedSentryServer - Ctx context.Context - Log log.Logger - t *testing.T - cancel context.CancelFunc - DB kv.RwDB - tmpdir string - snapshotDir string - Engine consensus.Engine - ChainConfig *params.ChainConfig - Sync *stagedsync.Sync - MiningSync *stagedsync.Sync - PendingBlocks chan *types.Block - MinedBlocks chan *types.Block - downloader *sentry.ControlServerImpl - Key *ecdsa.PrivateKey - Genesis *types.Block - SentryClient direct.SentryClient - PeerId *ptypes.H512 - UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) - streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer - sentMessages []*proto_sentry.OutboundMessageData - StreamWg sync.WaitGroup - ReceiveWg sync.WaitGroup - Address common.Address + Ctx context.Context + Log log.Logger + t *testing.T + cancel context.CancelFunc + DB kv.RwDB + tmpdir string + snapshotDir string + Engine consensus.Engine + ChainConfig *params.ChainConfig + Sync *stagedsync.Sync + MiningSync *stagedsync.Sync + PendingBlocks chan *types.Block + MinedBlocks chan *types.Block + sentriesClient *sentry.MultyClient + Key *ecdsa.PrivateKey + Genesis *types.Block + SentryClient direct.SentryClient + PeerId *ptypes.H512 + UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int) + streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer + sentMessages []*proto_sentry.OutboundMessageData + StreamWg sync.WaitGroup + ReceiveWg sync.WaitGroup + Address common.Address Notifications *stagedsync.Notifications @@ -267,7 +267,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey blockDownloaderWindow := 65536 networkID := uint64(1) - mock.downloader, err = sentry.NewControlServer(mock.DB, "mock", mock.ChainConfig, mock.Genesis.Hash(), mock.Engine, networkID, sentries, blockDownloaderWindow, blockReader) + mock.sentriesClient, err = sentry.NewMultyClient(mock.DB, "mock", mock.ChainConfig, mock.Genesis.Hash(), mock.Engine, networkID, sentries, blockDownloaderWindow, blockReader) if err != nil { if t != nil { t.Fatal(err) @@ -283,12 +283,12 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey mock.Sync = stagedsync.New( stagedsync.DefaultStages(mock.Ctx, prune, - stagedsync.StageHeadersCfg(mock.DB, mock.downloader.Hd, mock.downloader.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events), + stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events), stagedsync.StageCumulativeIndexCfg(mock.DB), stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig), stagedsync.StageBodiesCfg( mock.DB, - mock.downloader.Bd, + mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, @@ -325,7 +325,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey stagedsync.DefaultPruneOrder, ) - mock.downloader.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize) + mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize) miningConfig := cfg.Miner miningConfig.Enabled = true @@ -349,13 +349,13 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey ) mock.StreamWg.Add(1) - go sentry.RecvMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg) + go sentry.RecvMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg) mock.StreamWg.Wait() mock.StreamWg.Add(1) - go sentry.RecvUploadMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg) + go sentry.RecvUploadMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg) mock.StreamWg.Wait() mock.StreamWg.Add(1) - go sentry.RecvUploadHeadersMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg) + go sentry.RecvUploadHeadersMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg) mock.StreamWg.Wait() return mock @@ -488,20 +488,20 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error { }); err != nil { return err } - if ms.downloader.Hd.IsBadHeader(chain.TopBlock.Hash()) { + if ms.sentriesClient.Hd.IsBadHeader(chain.TopBlock.Hash()) { return fmt.Errorf("block %d %x was invalid", chain.TopBlock.NumberU64(), chain.TopBlock.Hash()) } return nil } func (ms *MockSentry) SendPayloadRequest(message *engineapi.PayloadMessage) { - ms.downloader.Hd.BeaconRequestList.AddPayloadRequest(message) + ms.sentriesClient.Hd.BeaconRequestList.AddPayloadRequest(message) } func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage) { - ms.downloader.Hd.BeaconRequestList.AddForkChoiceRequest(message) + ms.sentriesClient.Hd.BeaconRequestList.AddForkChoiceRequest(message) } func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus { - return <-ms.downloader.Hd.PayloadStatusCh + return <-ms.sentriesClient.Hd.PayloadStatusCh } diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 49bd1084767..ad89b0386ce 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -84,7 +84,7 @@ func StageLoop( log.Error("Staged Sync", "err", err) if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil { - log.Error("Failed to recover header downloader", "err", recoveryErr) + log.Error("Failed to recover header sentriesClient", "err", recoveryErr) } time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs continue @@ -247,7 +247,7 @@ func NewStagedSync( p2pCfg p2p.Config, cfg ethconfig.Config, terminalTotalDifficulty *big.Int, - controlServer *sentry.ControlServerImpl, + controlServer *sentry.MultyClient, tmpdir string, notifications *stagedsync.Notifications, snapshotDownloader proto_downloader.DownloaderClient,