les: wait all task routines before drop the peer #20010

Merged: 3 commits, Aug 27, 2019
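In summary, the PR makes serverHandler.handleMsg register every background task routine it spawns on a sync.WaitGroup, and serverHandler.handle waits on that group in its deferred cleanup before unregistering and disconnecting the peer, so no task routine can outlive its peer. It also removes the peerSetNotify plumbing: clientPool.connect now returns a plain bool, and request costs are fed through clientPool.requestCost. Below is a minimal, self-contained sketch of the wait-group pattern only; handlePeer, the string request argument, and the request names are illustrative stand-ins, not the actual les code.

package main

import (
	"fmt"
	"sync"
)

// handleMsg registers each background task routine on the wait group before
// spawning it, mirroring the wg.Add(1) / defer wg.Done() pairs in the diff.
func handleMsg(req string, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("served", req)
	}()
}

// handlePeer drives the message loop for one peer and only drops the peer
// after every spawned task routine has finished.
func handlePeer(requests []string) {
	var wg sync.WaitGroup // tracks all in-flight task routines
	defer func() {
		wg.Wait() // ensure all background task routines have exited
		fmt.Println("peer dropped")
	}()
	for _, req := range requests {
		handleMsg(req, &wg)
	}
}

func main() {
	handlePeer([]string{"GetBlockHeaders", "GetReceipts"})
}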
les/benchmark.go: 2 additions, 1 deletion
@@ -21,6 +21,7 @@ import (
"fmt"
"math/big"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -312,7 +313,7 @@ func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
}()
go func() {
for i := 0; i < count; i++ {
if err := h.handleMsg(serverPeer); err != nil {
if err := h.handleMsg(serverPeer, &sync.WaitGroup{}); err != nil {
errCh <- err
return
}
les/clientpool.go: 38 additions, 30 deletions
@@ -181,52 +181,53 @@ func (f *clientPool) stop() {
f.lock.Unlock()
}

// registerPeer implements peerSetNotify
func (f *clientPool) registerPeer(p *peer) {
c := f.connect(p, 0)
if c != nil {
p.balanceTracker = &c.balanceTracker
}
}

// connect should be called after a successful handshake. If the connection was
// rejected, there is no need to call disconnect.
func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
func (f *clientPool) connect(peer clientPeer, capacity uint64) bool {
f.lock.Lock()
defer f.lock.Unlock()

// Short circuit if clientPool is already closed.
if f.closed {
return nil
return false
}
address := peer.freeClientId()
id := peer.ID()
idStr := peerIdToString(id)
// Dedup connected peers.
id, freeID := peer.ID(), peer.freeClientId()
if _, ok := f.connectedMap[id]; ok {
clientRejectedMeter.Mark(1)
log.Debug("Client already connected", "address", address, "id", idStr)
return nil
log.Debug("Client already connected", "address", freeID, "id", peerIdToString(id))
return false
}
// Create a clientInfo but do not add it yet
now := f.clock.Now()
// create a clientInfo but do not add it yet
e := &clientInfo{pool: f, peer: peer, address: address, queueIndex: -1, id: id}
posBalance := f.getPosBalance(id).value
e.priority = posBalance != 0
e := &clientInfo{pool: f, peer: peer, address: freeID, queueIndex: -1, id: id, priority: posBalance != 0}

var negBalance uint64
nb := f.negBalanceMap[address]
nb := f.negBalanceMap[freeID]
if nb != nil {
negBalance = uint64(math.Exp(float64(nb.logValue-f.logOffset(now)) / fixedPointMultiplier))
}
// If the client is a free client, assign with a low free capacity,
// Otherwise assign with the given value(priority client)
if !e.priority {
capacity = f.freeClientCap
}
// check whether it fits into connectedQueue
// Ensure the capacity will never be lower than the free capacity.
if capacity < f.freeClientCap {
capacity = f.freeClientCap
}
e.capacity = capacity

e.balanceTracker.init(f.clock, capacity)
e.balanceTracker.setBalance(posBalance, negBalance)
f.setClientPriceFactors(e)

// If the number of clients already connected in the clientpool exceeds its
// capacity, evict some clients with lowest priority.
//
// If the priority of the newly added client is lower than the priority of
// all connected clients, the client is rejected.
newCapacity := f.connectedCapacity + capacity
newCount := f.connectedQueue.Size() + 1
if newCapacity > f.capacityLimit || newCount > f.countLimit {
@@ -248,8 +249,8 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
f.connectedQueue.Push(c)
}
clientRejectedMeter.Mark(1)
log.Debug("Client rejected", "address", address, "id", idStr)
return nil
log.Debug("Client rejected", "address", freeID, "id", peerIdToString(id))
return false
}
// accept new client, drop old ones
for _, c := range kickList {
@@ -258,7 +259,7 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
}
// client accepted, finish setting it up
if nb != nil {
delete(f.negBalanceMap, address)
delete(f.negBalanceMap, freeID)
f.negBalanceQueue.Remove(nb.queueIndex)
}
if e.priority {
@@ -272,13 +273,8 @@ func (f *clientPool) connect(peer clientPeer, capacity uint64) *clientInfo {
e.peer.updateCapacity(e.capacity)
}
clientConnectedMeter.Mark(1)
log.Debug("Client accepted", "address", address)
return e
}

// unregisterPeer implements peerSetNotify
func (f *clientPool) unregisterPeer(p *peer) {
f.disconnect(p)
log.Debug("Client accepted", "address", freeID)
return true
}

// disconnect should be called when a connection is terminated. If the disconnection
@@ -378,6 +374,18 @@ func (f *clientPool) setLimits(count int, totalCap uint64) {
})
}

// requestCost feeds request cost after serving a request from the given peer.
func (f *clientPool) requestCost(p *peer, cost uint64) {
Review comment (Contributor): Why did you remove the balanceTracker link from peer and use this function instead? We can do this if there is a good reason, I'm just curious why you needed this extra function and an extra map lookup every time the stats are updated.

Reply (Member, author): I think balanceTracker just collects stats for the clientpool, so components other than the clientpool should not know about or maintain it. Although we introduce another map lookup, it's really trivial, and I think this is easier to maintain.

f.lock.Lock()
defer f.lock.Unlock()

info, exist := f.connectedMap[p.ID()]
if !exist || f.closed {
return
}
info.balanceTracker.requestCost(cost)
}

// logOffset calculates the time-dependent offset for the logarithmic
// representation of negative balance
func (f *clientPool) logOffset(now mclock.AbsTime) int64 {
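For context, here is a self-contained toy sketch of the reshaped clientPool surface shown above: connect reports acceptance with a bool instead of returning a *clientInfo, and request costs are charged through the pool (one map lookup per request) rather than via a balanceTracker pointer stored on the peer. All types and names below (toyPool, toyTracker, string IDs) are illustrative, not the real les implementation.

package main

import (
	"fmt"
	"sync"
)

// toyTracker stands in for balanceTracker: it only records how much priority
// balance a connected client has left.
type toyTracker struct{ balance uint64 }

func (t *toyTracker) requestCost(cost uint64) {
	if t.balance >= cost {
		t.balance -= cost
		return
	}
	t.balance = 0
}

// toyPool mirrors the new API shape: it owns the trackers and exposes
// connect (bool result) and requestCost (per-request map lookup).
type toyPool struct {
	lock      sync.Mutex
	connected map[string]*toyTracker
	limit     int
}

// connect returns false when the peer is already known or the pool is full.
func (p *toyPool) connect(id string, balance uint64) bool {
	p.lock.Lock()
	defer p.lock.Unlock()
	if _, ok := p.connected[id]; ok || len(p.connected) >= p.limit {
		return false
	}
	p.connected[id] = &toyTracker{balance: balance}
	return true
}

// requestCost charges a served request to the peer's tracker; the tracker is
// never handed out to other components.
func (p *toyPool) requestCost(id string, cost uint64) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if t, ok := p.connected[id]; ok {
		t.requestCost(cost)
	}
}

func main() {
	pool := &toyPool{connected: make(map[string]*toyTracker), limit: 1}
	fmt.Println(pool.connect("peer-1", 1000)) // true: accepted
	fmt.Println(pool.connect("peer-2", 1000)) // false: pool is full
	pool.requestCost("peer-1", 250)
	fmt.Println(pool.connected["peer-1"].balance) // 750
}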
les/clientpool_test.go: 5 additions, 5 deletions
@@ -83,14 +83,14 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD

// pool should accept new peers up to its connected limit
for i := 0; i < connLimit; i++ {
if pool.connect(poolTestPeer(i), 0) != nil {
if pool.connect(poolTestPeer(i), 0) {
connected[i] = true
} else {
t.Fatalf("Test peer #%d rejected", i)
}
}
// since all accepted peers are new and should not be kicked out, the next one should be rejected
if pool.connect(poolTestPeer(connLimit), 0) != nil {
if pool.connect(poolTestPeer(connLimit), 0) {
connected[connLimit] = true
t.Fatalf("Peer accepted over connected limit")
}
@@ -116,7 +116,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD
connTicks[i] += tickCounter
}
} else {
if pool.connect(poolTestPeer(i), 0) != nil {
if pool.connect(poolTestPeer(i), 0) {
connected[i] = true
connTicks[i] -= tickCounter
}
@@ -159,7 +159,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD
}

// a previously unknown peer should be accepted now
if pool.connect(poolTestPeer(54321), 0) == nil {
if !pool.connect(poolTestPeer(54321), 0) {
t.Fatalf("Previously unknown peer rejected")
}

@@ -173,7 +173,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD
pool.connect(poolTestPeer(i), 0)
}
// expect pool to remember known nodes and kick out one of them to accept a new one
if pool.connect(poolTestPeer(54322), 0) == nil {
if !pool.connect(poolTestPeer(54322), 0) {
t.Errorf("Previously unknown peer rejected after restarting pool")
}
pool.stop()
les/peer.go: 5 additions, 5 deletions
@@ -94,6 +94,7 @@ type peer struct {
sendQueue *execQueue

errCh chan error

// responseLock ensures that responses are queued in the same order as
// RequestProcessed is called
responseLock sync.Mutex
@@ -107,11 +108,10 @@ type peer struct {
updateTime mclock.AbsTime
frozen uint32 // 1 if client is in frozen state

fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcParams flowcontrol.ServerParams
fcCosts requestCostTable
balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler.
fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only
fcParams flowcontrol.ServerParams
fcCosts requestCostTable

trusted bool
onlyAnnounce bool
les/server.go: 0 additions, 2 deletions
@@ -112,9 +112,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) {
maxCapacity = totalRecharge
}
srv.fcManager.SetCapacityLimits(srv.freeCapacity, maxCapacity, srv.freeCapacity*2)

srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, 10000, mclock.System{}, func(id enode.ID) { go srv.peers.Unregister(peerIdToString(id)) })
srv.peers.notify(srv.clientPool)

checkpoint := srv.latestLocalCheckpoint()
if !checkpoint.Empty() {
les/server_handler.go: 32 additions, 10 deletions
@@ -54,7 +54,10 @@ const (
MaxTxStatus = 256 // Amount of transactions to be queried per request
)

var errTooManyInvalidRequest = errors.New("too many invalid requests made")
var (
errTooManyInvalidRequest = errors.New("too many invalid requests made")
errFullClientPool = errors.New("client pool is full")
)

// serverHandler is responsible for serving light clients and processing
// all incoming light requests.
@@ -124,23 +127,26 @@ func (h *serverHandler) handle(p *peer) error {
}
defer p.fcClient.Disconnect()

// Disconnect the inbound peer if it's rejected by clientPool
if !h.server.clientPool.connect(p, 0) {
p.Log().Debug("Light Ethereum peer registration failed", "err", errFullClientPool)
return errFullClientPool
}
// Register the peer locally
if err := h.server.peers.Register(p); err != nil {
h.server.clientPool.disconnect(p)
p.Log().Error("Light Ethereum peer registration failed", "err", err)
return err
}
clientConnectionGauge.Update(int64(h.server.peers.Len()))

// add dummy balance tracker for tests
if p.balanceTracker == nil {
p.balanceTracker = &balanceTracker{}
p.balanceTracker.init(&mclock.System{}, 1)
}
var wg sync.WaitGroup // Wait group used to track all in-flight task routines.

connectedAt := mclock.Now()
defer func() {
p.balanceTracker = nil
wg.Wait() // Ensure all background task routines have exited.
h.server.peers.Unregister(p.id)
h.server.clientPool.disconnect(p)
clientConnectionGauge.Update(int64(h.server.peers.Len()))
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
}()
@@ -153,7 +159,7 @@ func (h *serverHandler) handle(p *peer) error {
return err
default:
}
if err := h.handleMsg(p); err != nil {
if err := h.handleMsg(p, &wg); err != nil {
p.Log().Debug("Light Ethereum message handling failed", "err", err)
return err
}
Expand All @@ -162,7 +168,7 @@ func (h *serverHandler) handle(p *peer) error {

// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func (h *serverHandler) handleMsg(p *peer) error {
func (h *serverHandler) handleMsg(p *peer, wg *sync.WaitGroup) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
@@ -243,7 +249,7 @@ func (h *serverHandler) handleMsg(p *peer) error {
// Feed cost tracker request serving statistic.
h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
// Reduce priority "balance" for the specific peer.
p.balanceTracker.requestCost(realCost)
h.server.clientPool.requestCost(p, realCost)
}
if reply != nil {
p.queueSend(func() {
@@ -273,7 +279,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
}
query := req.Query
if accept(req.ReqID, query.Amount, MaxHeaderFetch) {
wg.Add(1)
go func() {
defer wg.Done()
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
@@ -387,7 +395,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
)
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) {
wg.Add(1)
go func() {
defer wg.Done()
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
@@ -433,7 +443,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) {
wg.Add(1)
go func() {
defer wg.Done()
for i, request := range req.Reqs {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
@@ -502,7 +514,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
)
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) {
wg.Add(1)
go func() {
defer wg.Done()
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
sendResponse(req.ReqID, 0, nil, task.servingTime)
@@ -557,7 +571,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) {
wg.Add(1)
go func() {
defer wg.Done()
nodes := light.NewNodeSet()

for i, request := range req.Reqs {
@@ -658,7 +674,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
)
reqCnt := len(req.Reqs)
if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) {
wg.Add(1)
go func() {
defer wg.Done()
var (
lastIdx uint64
lastType uint
@@ -725,7 +743,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
}
reqCnt := len(req.Txs)
if accept(req.ReqID, uint64(reqCnt), MaxTxSend) {
wg.Add(1)
go func() {
defer wg.Done()
stats := make([]light.TxStatus, len(req.Txs))
for i, tx := range req.Txs {
if i != 0 && !task.waitOrStop() {
@@ -771,7 +791,9 @@ func (h *serverHandler) handleMsg(p *peer) error {
}
reqCnt := len(req.Hashes)
if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) {
wg.Add(1)
go func() {
defer wg.Done()
stats := make([]light.TxStatus, len(req.Hashes))
for i, hash := range req.Hashes {
if i != 0 && !task.waitOrStop() {
les/test_helper.go: 2 additions, 0 deletions
@@ -280,6 +280,8 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
}
server.costTracker, server.freeCapacity = newCostTracker(db, server.config)
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.clientPool = newClientPool(db, 1, 10000, clock, nil)
server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil {
server.oracle.start(simulation)