Skip to content

Commit

Permalink
Rename sentry.ControlServerImpl to sentry.MultyClient and sentry.Sent…
Browse files Browse the repository at this point in the history
…ryServerImpl to sentry.GrpcServer #444
  • Loading branch information
AskAlexSharov authored May 10, 2022
1 parent 3e4fb5d commit 2c26583
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 137 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)

br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
sentryControlServer, err := sentry.NewMultyClient(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/send_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestSendRawTransaction(t *testing.T) {
//TODO: make propagation easy to test - now race
//time.Sleep(time.Second)
//sent := m.SentMessage(0)
//require.Equal(eth.ToProto[m.SentryClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id)
//require.Equal(eth.ToProto[m.MultyClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id)
}

func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction {
Expand Down
10 changes: 5 additions & 5 deletions cmd/sentry/sentry/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
maxTxPacketSize = 100 * 1024
)

func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
func (cs *MultyClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
cs.lock.RLock()
defer cs.lock.RUnlock()
typedRequest := make(eth.NewBlockHashesPacket, len(announces))
Expand Down Expand Up @@ -72,7 +72,7 @@ func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announ
}
}

func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
func (cs *MultyClient) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
cs.lock.RLock()
defer cs.lock.RUnlock()
data, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Expand Down Expand Up @@ -116,7 +116,7 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types
}
}

func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) {
func (cs *MultyClient) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []
}
}

func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) {
func (cs *MultyClient) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs [
}
}

func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) {
func (cs *MultyClient) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) {
if len(txs) == 0 {
return
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/sentry/sentry/sentry_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Methods of sentry called by Core

func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) {
func (cs *MultyClient) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) {
cs.lock.Lock()
defer cs.lock.Unlock()
cs.headHeight = height
Expand All @@ -36,7 +36,7 @@ func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash
}
}

func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) {
func (cs *MultyClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) {
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownl
return [64]byte{}, false
}

func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) {
func (cs *MultyClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) {
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
Expand Down Expand Up @@ -130,7 +130,7 @@ func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerd
return [64]byte{}, false
}

func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) {
func (cs *MultyClient) randSentryIndex() (int, bool, func() (int, bool)) {
var i int
if len(cs.sentries) > 1 {
i = rand.Intn(len(cs.sentries) - 1)
Expand All @@ -143,7 +143,7 @@ func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) {
}

// sending list of penalties to all sentries
func (cs *ControlServerImpl) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) {
func (cs *MultyClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) {
for i := range penalties {
outreq := proto_sentry.PenalizePeerRequest{
PeerId: gointerfaces.ConvertHashToH512(penalties[i].PeerID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func runPeer(
}
}

func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerImpl, healthCheck bool) (*grpc.Server, error) {
func grpcSentryServer(ctx context.Context, sentryAddr string, ss *GrpcServer, healthCheck bool) (*grpc.Server, error) {
// STARTING GRPC SERVER
log.Info("Starting Sentry gRPC server", "on", sentryAddr)
listenConfig := net.ListenConfig{
Expand Down Expand Up @@ -510,8 +510,8 @@ func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerIm
return grpcServer, nil
}

func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *SentryServerImpl {
ss := &SentryServerImpl{
func NewGrpcServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *GrpcServer {
ss := &GrpcServer{
ctx: ctx,
p2p: cfg,
peersStreams: NewPeersStreams(),
Expand Down Expand Up @@ -576,7 +576,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
// Sentry creates and runs standalone sentry
func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS []string, cfg *p2p.Config, protocolVersion uint, healthCheck bool) error {
dir.MustExist(datadir)
sentryServer := NewSentryServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion)
sentryServer := NewGrpcServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion)
sentryServer.discoveryDNS = discoveryDNS

grpcServer, err := grpcSentryServer(ctx, sentryAddr, sentryServer, healthCheck)
Expand All @@ -590,7 +590,7 @@ func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS
return nil
}

type SentryServerImpl struct {
type GrpcServer struct {
proto_sentry.UnimplementedSentryServer
ctx context.Context
Protocol p2p.Protocol
Expand All @@ -607,7 +607,7 @@ type SentryServerImpl struct {
p2p *p2p.Config
}

func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) {
func (ss *GrpcServer) rangePeers(f func(peerInfo *PeerInfo) bool) {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
Expand All @@ -617,7 +617,7 @@ func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) {
})
}

func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
func (ss *GrpcServer) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
if value, ok := ss.GoodPeers.Load(peerID); ok {
peerInfo := value.(*PeerInfo)
if peerInfo != nil {
Expand All @@ -628,7 +628,7 @@ func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
return nil
}

func (ss *SentryServerImpl) removePeer(peerID [64]byte) {
func (ss *GrpcServer) removePeer(peerID [64]byte) {
if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok {
peerInfo := value.(*PeerInfo)
if peerInfo != nil {
Expand All @@ -637,7 +637,7 @@ func (ss *SentryServerImpl) removePeer(peerID [64]byte) {
}
}

func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
peerInfo.Async(func() {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
Expand All @@ -652,7 +652,7 @@ func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgc
})
}

func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error {
func (ss *GrpcServer) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error {
switch ss.Protocol.Version {
case eth.ETH66:
b, err := rlp.EncodeToBytes(&eth.GetBlockHeadersPacket66{
Expand Down Expand Up @@ -680,22 +680,22 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash,
return nil
}

func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) {
func (ss *GrpcServer) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) {
//log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId()))
peerID := ConvertH512ToPeerID(req.PeerId)
ss.removePeer(peerID)
return &emptypb.Empty{}, nil
}

func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) {
func (ss *GrpcServer) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) {
peerID := ConvertH512ToPeerID(req.PeerId)
if peerInfo := ss.getPeer(peerID); peerInfo != nil {
peerInfo.SetIncreasedHeight(req.MinBlock)
}
return &emptypb.Empty{}, nil
}

func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) {
func (ss *GrpcServer) findPeer(minBlock uint64) (*PeerInfo, bool) {
// Choose a peer that we can send this request to, with maximum number of permits
var foundPeerInfo *PeerInfo
var maxPermits int
Expand All @@ -717,7 +717,7 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) {
return foundPeerInfo, maxPermits > 0
}

func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg &&
Expand All @@ -738,7 +738,7 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
return reply, lastErr
}

func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg &&
Expand All @@ -765,7 +765,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return reply, nil
}

func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}

msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id]
Expand Down Expand Up @@ -798,7 +798,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
return reply, lastErr
}

func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}

msgcode := eth.FromProto[ss.Protocol.Version][req.Id]
Expand All @@ -817,7 +817,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
return reply, lastErr
}

func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) {
func (ss *GrpcServer) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) {
reply := &proto_sentry.HandShakeReply{}
switch ss.Protocol.Version {
case eth.ETH66:
Expand All @@ -826,7 +826,7 @@ func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_s
return reply, nil
}

func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
func (ss *GrpcServer) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
genesisHash := gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis)

ss.lock.Lock()
Expand Down Expand Up @@ -869,7 +869,7 @@ func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sen
return reply, nil
}

func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) {
func (ss *GrpcServer) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) {
if ss.P2pServer == nil {
return nil, errors.New("p2p server was not started")
}
Expand Down Expand Up @@ -898,15 +898,15 @@ func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_s
return &reply, nil
}

func (ss *SentryServerImpl) SimplePeerCount() (pc int) {
func (ss *GrpcServer) SimplePeerCount() (pc int) {
ss.rangePeers(func(peerInfo *PeerInfo) bool {
pc++
return true
})
return pc
}

func (ss *SentryServerImpl) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) {
func (ss *GrpcServer) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) {
return &proto_sentry.PeerCountReply{Count: uint64(ss.SimplePeerCount())}, nil
}

Expand All @@ -920,13 +920,13 @@ func setupDiscovery(urls []string) (enode.Iterator, error) {
return client.NewIterator(urls...)
}

func (ss *SentryServerImpl) GetStatus() *proto_sentry.StatusData {
func (ss *GrpcServer) GetStatus() *proto_sentry.StatusData {
ss.lock.RLock()
defer ss.lock.RUnlock()
return ss.statusData
}

func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) {
func (ss *GrpcServer) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) {
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
req := &proto_sentry.InboundMessage{
Expand All @@ -950,14 +950,14 @@ func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte,
}
}

func (ss *SentryServerImpl) hasSubscribers(msgID proto_sentry.MessageId) bool {
func (ss *GrpcServer) hasSubscribers(msgID proto_sentry.MessageId) bool {
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
return ss.messageStreams[msgID] != nil && len(ss.messageStreams[msgID]) > 0
// log.Error("Sending msg to core P2P failed", "msg", proto_sentry.MessageId_name[int32(streamMsg.msgId)], "err", err)
}

func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() {
func (ss *GrpcServer) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() {
ss.messageStreamsLock.Lock()
defer ss.messageStreamsLock.Unlock()
if ss.messageStreams == nil {
Expand Down Expand Up @@ -985,7 +985,7 @@ func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch c
}

const MessagesQueueSize = 1024 // one such queue per client of .Messages stream
func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error {
func (ss *GrpcServer) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error {
log.Trace("[Messages] new subscriber", "to", req.Ids)
ch := make(chan *proto_sentry.InboundMessage, MessagesQueueSize)
defer close(ch)
Expand All @@ -1008,25 +1008,25 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p
}

// Close performs cleanup operations for the sentry
func (ss *SentryServerImpl) Close() {
func (ss *GrpcServer) Close() {
if ss.P2pServer != nil {
ss.P2pServer.Stop()
}
}

func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H512) {
func (ss *GrpcServer) sendNewPeerToClients(peerID *proto_types.H512) {
if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Connect}); err != nil {
log.Warn("Sending new peer notice to core P2P failed", "err", err)
}
}

func (ss *SentryServerImpl) sendGonePeerToClients(peerID *proto_types.H512) {
func (ss *GrpcServer) sendGonePeerToClients(peerID *proto_types.H512) {
if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Disconnect}); err != nil {
log.Warn("Sending gone peer notice to core P2P failed", "err", err)
}
}

func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error {
func (ss *GrpcServer) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error {
clean := ss.peersStreams.Add(server)
defer clean()
select {
Expand All @@ -1037,7 +1037,7 @@ func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, serv
}
}

func (ss *SentryServerImpl) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) {
func (ss *GrpcServer) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) {
if ss.P2pServer == nil {
return nil, errors.New("p2p server was not started")
}
Expand Down
Loading

0 comments on commit 2c26583

Please sign in to comment.