Skip to content

Commit

Permalink
les: separate peer into clientPeer and serverPeer (#19991)
Browse files Browse the repository at this point in the history
* les: separate peer into clientPeer and serverPeer

* les: address comments
  • Loading branch information
rjl493456442 authored Feb 26, 2020
1 parent fadf84a commit 4fabd9c
Show file tree
Hide file tree
Showing 25 changed files with 1,210 additions and 1,088 deletions.
53 changes: 26 additions & 27 deletions les/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type requestBenchmark interface {
// init initializes the generator for generating the given number of randomized requests
init(h *serverHandler, count int) error
// request initiates sending a single request to the given peer
request(peer *peer, index int) error
request(peer *serverPeer, index int) error
}

// benchmarkBlockHeaders implements requestBenchmark
Expand Down Expand Up @@ -72,11 +72,11 @@ func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkBlockHeaders) request(peer *peer, index int) error {
func (b *benchmarkBlockHeaders) request(peer *serverPeer, index int) error {
if b.byHash {
return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse)
return peer.requestHeadersByHash(0, b.hashes[index], b.amount, b.skip, b.reverse)
} else {
return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
return peer.requestHeadersByNumber(0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
}
}

Expand All @@ -95,11 +95,11 @@ func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error {
func (b *benchmarkBodiesOrReceipts) request(peer *serverPeer, index int) error {
if b.receipts {
return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]})
return peer.requestReceipts(0, []common.Hash{b.hashes[index]})
} else {
return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]})
return peer.requestBodies(0, []common.Hash{b.hashes[index]})
}
}

Expand All @@ -114,13 +114,13 @@ func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkProofsOrCode) request(peer *peer, index int) error {
func (b *benchmarkProofsOrCode) request(peer *serverPeer, index int) error {
key := make([]byte, 32)
rand.Read(key)
if b.code {
return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}})
return peer.requestCode(0, []CodeReq{{BHash: b.headHash, AccKey: key}})
} else {
return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}})
return peer.requestProofs(0, []ProofReq{{BHash: b.headHash, Key: key}})
}
}

Expand All @@ -144,7 +144,7 @@ func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
func (b *benchmarkHelperTrie) request(peer *serverPeer, index int) error {
reqs := make([]HelperTrieReq, b.reqCount)

if b.bloom {
Expand All @@ -163,7 +163,7 @@ func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
}
}

return peer.RequestHelperTrieProofs(0, 0, reqs)
return peer.requestHelperTrieProofs(0, reqs)
}

// benchmarkTxSend implements requestBenchmark
Expand All @@ -189,9 +189,9 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkTxSend) request(peer *peer, index int) error {
func (b *benchmarkTxSend) request(peer *serverPeer, index int) error {
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
return peer.SendTxs(0, 0, enc)
return peer.sendTxs(0, enc)
}

// benchmarkTxStatus implements requestBenchmark
Expand All @@ -201,10 +201,10 @@ func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
return nil
}

func (b *benchmarkTxStatus) request(peer *peer, index int) error {
func (b *benchmarkTxStatus) request(peer *serverPeer, index int) error {
var hash common.Hash
rand.Read(hash[:])
return peer.RequestTxStatus(0, 0, []common.Hash{hash})
return peer.requestTxStatus(0, []common.Hash{hash})
}

// benchmarkSetup stores measurement data for a single benchmark type
Expand Down Expand Up @@ -283,18 +283,17 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
var id enode.ID
rand.Read(id[:])

clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
serverPeer.sendQueue = newExecQueue(count)
serverPeer.announceType = announceTypeNone
serverPeer.fcCosts = make(requestCostTable)
peer1 := newServerPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
peer2 := newClientPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
peer2.announceType = announceTypeNone
peer2.fcCosts = make(requestCostTable)
c := &requestCosts{}
for code := range requests {
serverPeer.fcCosts[code] = c
peer2.fcCosts[code] = c
}
serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams)
defer serverPeer.fcClient.Disconnect()
peer2.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
peer2.fcClient = flowcontrol.NewClientNode(h.server.fcManager, peer2.fcParams)
defer peer2.fcClient.Disconnect()

if err := setup.req.init(h, count); err != nil {
return err
Expand All @@ -305,15 +304,15 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {

go func() {
for i := 0; i < count; i++ {
if err := setup.req.request(clientPeer, i); err != nil {
if err := setup.req.request(peer1, i); err != nil {
errCh <- err
return
}
}
}()
go func() {
for i := 0; i < count; i++ {
if err := h.handleMsg(serverPeer, &sync.WaitGroup{}); err != nil {
if err := h.handleMsg(peer2, &sync.WaitGroup{}); err != nil {
errCh <- err
return
}
Expand Down
9 changes: 5 additions & 4 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
Expand Down Expand Up @@ -80,17 +81,17 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}
log.Info("Initialised chain configuration", "config", chainConfig)

peers := newPeerSet()
peers := newServerPeerSet()
leth := &LightEthereum{
lesCommons: lesCommons{
genesis: genesisHash,
config: config,
chainConfig: chainConfig,
iConfig: light.DefaultClientIndexerConfig,
chainDb: chainDb,
peers: peers,
closeCh: make(chan struct{}),
},
peers: peers,
eventMux: ctx.EventMux,
reqDist: newRequestDistributor(peers, &mclock.System{}),
accountManager: ctx.AccountManager,
Expand Down Expand Up @@ -225,7 +226,7 @@ func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux
// network protocols to start.
func (s *LightEthereum) Protocols() []p2p.Protocol {
return s.makeProtocols(ClientProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} {
if p := s.peers.Peer(peerIdToString(id)); p != nil {
if p := s.peers.peer(peerIdToString(id)); p != nil {
return p.Info()
}
return nil
Expand Down Expand Up @@ -253,7 +254,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
// Ethereum protocol.
func (s *LightEthereum) Stop() error {
close(s.closeCh)
s.peers.Close()
s.peers.close()
s.reqDist.close()
s.odr.Stop()
s.relay.Stop()
Expand Down
61 changes: 31 additions & 30 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
}
handler.fetcher = newLightFetcher(handler)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.notify((*downloaderPeerNotify)(handler))
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
}

Expand All @@ -82,7 +82,8 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
if h.ulc != nil {
trusted = h.ulc.trusted(p.ID())
}
peer := newPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version)))
peer := newServerPeer(int(version), h.backend.config.NetworkId, trusted, p, newMeteredMsgWriter(rw, int(version)))
defer peer.close()
peer.poolEntry = h.backend.serverPool.connect(peer, peer.Node())
if peer.poolEntry == nil {
return p2p.DiscRequested
Expand All @@ -94,8 +95,8 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
return err
}

func (h *clientHandler) handle(p *peer) error {
if h.backend.peers.Len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted {
func (h *clientHandler) handle(p *serverPeer) error {
if h.backend.peers.len() >= h.backend.config.LightPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
Expand All @@ -112,20 +113,20 @@ func (h *clientHandler) handle(p *peer) error {
return err
}
// Register the peer locally
if err := h.backend.peers.Register(p); err != nil {
if err := h.backend.peers.register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}
serverConnectionGauge.Update(int64(h.backend.peers.Len()))
serverConnectionGauge.Update(int64(h.backend.peers.len()))

connectedAt := mclock.Now()
defer func() {
h.backend.peers.Unregister(p.id)
h.backend.peers.unregister(p.id)
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
serverConnectionGauge.Update(int64(h.backend.peers.Len()))
serverConnectionGauge.Update(int64(h.backend.peers.len()))
}()

h.fetcher.announce(p, p.headInfo)
h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td})

// pool entry can be nil during the unit test.
if p.poolEntry != nil {
Expand All @@ -143,7 +144,7 @@ func (h *clientHandler) handle(p *peer) error {

// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (h *clientHandler) handleMsg(p *peer) error {
func (h *clientHandler) handleMsg(p *serverPeer) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
Expand Down Expand Up @@ -297,7 +298,7 @@ func (h *clientHandler) handleMsg(p *peer) error {
Obj: resp.Status,
}
case StopMsg:
p.freezeServer(true)
p.freeze()
h.backend.retriever.frozen(p)
p.Log().Debug("Service stopped")
case ResumeMsg:
Expand All @@ -306,7 +307,7 @@ func (h *clientHandler) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ResumeFreeze(bv)
p.freezeServer(false)
p.unfreeze()
p.Log().Debug("Service resumed")
default:
p.Log().Trace("Received invalid message", "code", msg.Code)
Expand All @@ -315,8 +316,8 @@ func (h *clientHandler) handleMsg(p *peer) error {
// Deliver the received response to retriever.
if deliverMsg != nil {
if err := h.backend.retriever.deliver(p, deliverMsg); err != nil {
p.responseErrors++
if p.responseErrors > maxResponseErrors {
p.errCount++
if p.errCount > maxResponseErrors {
return err
}
}
Expand All @@ -325,12 +326,12 @@ func (h *clientHandler) handleMsg(p *peer) error {
}

func (h *clientHandler) removePeer(id string) {
h.backend.peers.Unregister(id)
h.backend.peers.unregister(id)
}

type peerConnection struct {
handler *clientHandler
peer *peer
peer *serverPeer
}

func (pc *peerConnection) Head() (common.Hash, *big.Int) {
Expand All @@ -340,18 +341,18 @@ func (pc *peerConnection) Head() (common.Hash, *big.Int) {
func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
return func() { peer.requestHeadersByHash(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
Expand All @@ -364,18 +365,18 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s
func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
return func() { peer.requestHeadersByNumber(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
Expand All @@ -388,7 +389,7 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
// downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify clientHandler

func (d *downloaderPeerNotify) registerPeer(p *peer) {
func (d *downloaderPeerNotify) registerPeer(p *serverPeer) {
h := (*clientHandler)(d)
pc := &peerConnection{
handler: h,
Expand All @@ -397,7 +398,7 @@ func (d *downloaderPeerNotify) registerPeer(p *peer) {
h.downloader.RegisterLightPeer(p.id, ethVersion, pc)
}

func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) {
h := (*clientHandler)(d)
h.downloader.UnregisterPeer(p.id)
}
Loading

0 comments on commit 4fabd9c

Please sign in to comment.