Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stop writing to closed channel panic #6763

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,30 +268,17 @@ func convertPayloadStatus(payloadStatus *engineapi.PayloadStatus) *remote.Engine
}

func (s *EthBackendServer) stageLoopIsBusy() bool {
waiter := make(chan struct{})
defer libcommon.SafeClose(waiter)

busy := true
wg := sync.WaitGroup{}
wg.Add(1)

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
wait, ok := s.hd.BeaconRequestList.WaitForWaiting(ctx)
if !ok {
select {
case <-time.After(1 * time.Second):
// timed out so just call done
wg.Done()
case <-waiter:
// state is now waiting so we're not busy
busy = false
wg.Done()
case <-wait:
case <-ctx.Done():
}
}()

s.hd.BeaconRequestList.WaitForWaiting(waiter)

wg.Wait()
}

return busy
return !s.hd.BeaconRequestList.IsWaiting()
}

// EngineNewPayload validates and possibly executes payload
Expand Down
53 changes: 35 additions & 18 deletions turbo/engineapi/request_list.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package engineapi

import (
"context"
"sync"
"sync/atomic"

"github.com/emirpasic/gods/maps/treemap"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"

"github.com/ledgerwatch/erigon/core/types"
"go.uber.org/atomic"
)

// This is the status of a newly execute block.
Expand Down Expand Up @@ -53,10 +54,16 @@ type RequestList struct {
requestId int
requests *treemap.Map // map[int]*RequestWithStatus
interrupt Interrupt
waiting uint32
waiting atomic.Uint32
syncCond *sync.Cond

// waiterMtx is used to place locks around `waiters` so the slice can be read/modified safely
// and also serves as a lock around the `waiting` field so that this can't be modified whilst it
// is being read. waiters is modified as part of updating waiter so we need to ensure these
// two things lock at the same time
waiterMtx sync.Mutex
waiters []chan struct{}

waiters []chan struct{}
}

func NewRequestList() *RequestList {
Expand All @@ -65,6 +72,7 @@ func NewRequestList() *RequestList {
syncCond: sync.NewCond(&sync.Mutex{}),
waiterMtx: sync.Mutex{},
waiters: make([]chan struct{}, 0),
waiting: atomic.Uint32{},
}
return rl
}
Expand Down Expand Up @@ -122,8 +130,14 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte
rl.syncCond.L.Lock()
defer rl.syncCond.L.Unlock()

rl.waiterMtx.Lock()
rl.updateWaiting(1)
defer rl.updateWaiting(0)
rl.waiterMtx.Unlock()
defer func() {
rl.waiterMtx.Lock()
rl.updateWaiting(0)
rl.waiterMtx.Unlock()
}()

for {
interrupt = rl.interrupt
Expand All @@ -143,33 +157,36 @@ func (rl *RequestList) WaitForRequest(onlyNew bool, noWait bool) (interrupt Inte
}

func (rl *RequestList) IsWaiting() bool {
return atomic.LoadUint32(&rl.waiting) != 0
return rl.waiting.Load() != 0
}

// update waiting should always be called from a locked context using rl.waiterMtx as it
// updates rl.waiters in certain scenarios
func (rl *RequestList) updateWaiting(val uint32) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
atomic.StoreUint32(&rl.waiting, val)
rl.waiting.Store(val)

if val == 1 {
// something might be waiting to be notified of the waiting state being ready
for _, c := range rl.waiters {
c <- struct{}{}
for i, c := range rl.waiters {
close(c)
rl.waiters[i] = nil
}
rl.waiters = make([]chan struct{}, 0)
rl.waiters = rl.waiters[:0]
}
}

func (rl *RequestList) WaitForWaiting(c chan struct{}) {
func (rl *RequestList) WaitForWaiting(ctx context.Context) (chan struct{}, bool) {
rl.waiterMtx.Lock()
defer rl.waiterMtx.Unlock()
val := atomic.LoadUint32(&rl.waiting)
if val == 1 {
// we are already waiting so just send to the channel and quit
c <- struct{}{}

isWaiting := rl.waiting.Load()

if isWaiting == 1 {
// we are already waiting so just return
return nil, true
} else {
// we need to register a waiter now to be notified when we are ready
c := make(chan struct{}, 1)
rl.waiters = append(rl.waiters, c)
return c, false
}
}

Expand Down