-
Notifications
You must be signed in to change notification settings - Fork 20.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
les: wait for all task routines before dropping the peer #20010
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,6 +94,8 @@ type peer struct { | |
sendQueue *execQueue | ||
|
||
errCh chan error | ||
wg sync.WaitGroup // Wait group used to track all in-flight task routines. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually this does not need to be a part of the peer struct because it is only used in the handler. |
||
|
||
// responseLock ensures that responses are queued in the same order as | ||
// RequestProcessed is called | ||
responseLock sync.Mutex | ||
|
@@ -107,11 +109,10 @@ type peer struct { | |
updateTime mclock.AbsTime | ||
frozen uint32 // 1 if client is in frozen state | ||
|
||
fcClient *flowcontrol.ClientNode // nil if the peer is server only | ||
fcServer *flowcontrol.ServerNode // nil if the peer is client only | ||
fcParams flowcontrol.ServerParams | ||
fcCosts requestCostTable | ||
balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler. | ||
fcClient *flowcontrol.ClientNode // nil if the peer is server only | ||
fcServer *flowcontrol.ServerNode // nil if the peer is client only | ||
fcParams flowcontrol.ServerParams | ||
fcCosts requestCostTable | ||
|
||
trusted bool | ||
onlyAnnounce bool | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,7 +54,10 @@ const ( | |
MaxTxStatus = 256 // Amount of transactions to queried per request | ||
) | ||
|
||
var errTooManyInvalidRequest = errors.New("too many invalid requests made") | ||
var ( | ||
errTooManyInvalidRequest = errors.New("too many invalid requests made") | ||
errFullClientPool = errors.New("client pool is full") | ||
) | ||
|
||
// serverHandler is responsible for serving light client and process | ||
// all incoming light requests. | ||
|
@@ -129,17 +132,17 @@ func (h *serverHandler) handle(p *peer) error { | |
p.Log().Error("Light Ethereum peer registration failed", "err", err) | ||
return err | ||
} | ||
clientConnectionGauge.Update(int64(h.server.peers.Len())) | ||
|
||
// add dummy balance tracker for tests | ||
if p.balanceTracker == nil { | ||
p.balanceTracker = &balanceTracker{} | ||
p.balanceTracker.init(&mclock.System{}, 1) | ||
// Disconnect the inbound peer if it's rejected by clientPool | ||
if h.server.clientPool.connect(p, 0) { | ||
p.Log().Debug("Light Ethereum peer registration failed", "err", errFullClientPool) | ||
return errFullClientPool | ||
} | ||
clientConnectionGauge.Update(int64(h.server.peers.Len())) | ||
|
||
connectedAt := mclock.Now() | ||
defer func() { | ||
p.balanceTracker = nil | ||
p.wg.Wait() // Ensure all background task routines have exited. | ||
h.server.clientPool.disconnect(p) | ||
h.server.peers.Unregister(p.id) | ||
clientConnectionGauge.Update(int64(h.server.peers.Len())) | ||
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) | ||
|
@@ -243,7 +246,7 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
// Feed cost tracker request serving statistic. | ||
h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost) | ||
// Reduce priority "balance" for the specific peer. | ||
p.balanceTracker.requestCost(realCost) | ||
h.server.clientPool.requestCost(p, realCost) | ||
} | ||
if reply != nil { | ||
p.queueSend(func() { | ||
|
@@ -273,7 +276,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
} | ||
query := req.Query | ||
if accept(req.ReqID, query.Amount, MaxHeaderFetch) { | ||
p.wg.Add(1) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This could be a part of the |
||
go func() { | ||
defer p.wg.Done() | ||
hashMode := query.Origin.Hash != (common.Hash{}) | ||
first := true | ||
maxNonCanonical := uint64(100) | ||
|
@@ -387,7 +392,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
) | ||
reqCnt := len(req.Hashes) | ||
if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
for i, hash := range req.Hashes { | ||
if i != 0 && !task.waitOrStop() { | ||
sendResponse(req.ReqID, 0, nil, task.servingTime) | ||
|
@@ -433,7 +440,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
) | ||
reqCnt := len(req.Reqs) | ||
if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
for i, request := range req.Reqs { | ||
if i != 0 && !task.waitOrStop() { | ||
sendResponse(req.ReqID, 0, nil, task.servingTime) | ||
|
@@ -502,7 +511,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
) | ||
reqCnt := len(req.Hashes) | ||
if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
for i, hash := range req.Hashes { | ||
if i != 0 && !task.waitOrStop() { | ||
sendResponse(req.ReqID, 0, nil, task.servingTime) | ||
|
@@ -557,7 +568,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
) | ||
reqCnt := len(req.Reqs) | ||
if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
nodes := light.NewNodeSet() | ||
|
||
for i, request := range req.Reqs { | ||
|
@@ -658,7 +671,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
) | ||
reqCnt := len(req.Reqs) | ||
if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
var ( | ||
lastIdx uint64 | ||
lastType uint | ||
|
@@ -725,7 +740,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
} | ||
reqCnt := len(req.Txs) | ||
if accept(req.ReqID, uint64(reqCnt), MaxTxSend) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
stats := make([]light.TxStatus, len(req.Txs)) | ||
for i, tx := range req.Txs { | ||
if i != 0 && !task.waitOrStop() { | ||
|
@@ -771,7 +788,9 @@ func (h *serverHandler) handleMsg(p *peer) error { | |
} | ||
reqCnt := len(req.Hashes) | ||
if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) { | ||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
stats := make([]light.TxStatus, len(req.Hashes)) | ||
for i, hash := range req.Hashes { | ||
if i != 0 && !task.waitOrStop() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove the `balanceTracker` link from `peer` and use this function instead? We can do this if there is a good reason; I'm just curious why you needed this extra function and an extra map lookup every time the stats are updated.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think balanceTracker exists just to collect stats for the clientPool, so components other than the clientPool should not need to know about or maintain these things.
Although we introduce another map lookup, its cost is really trivial, and I think this is easier to maintain.