core/state: lock-free subfetcher #30782

Closed
wants to merge 6 commits into from
190 changes: 89 additions & 101 deletions core/state/trie_prefetcher.go
@@ -230,10 +230,8 @@ type subfetcher struct {
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes

tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
tasks chan []*subfetcherTask // Queue items for retrieval

wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption

@@ -267,7 +265,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
tasks: make(chan []*subfetcherTask),
stop: make(chan struct{}),
term: make(chan struct{}),
seenReadAddr: make(map[common.Address]struct{}),
@@ -281,30 +279,22 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(addrs []common.Address, slots []common.Hash, read bool) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
return errTerminated
default:
}
// Append the tasks to the current queue
sf.lock.Lock()
tasks := make([]*subfetcherTask, 0, len(addrs)+len(slots))
for _, addr := range addrs {
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, addr: &addr})
tasks = append(tasks, &subfetcherTask{read: read, addr: &addr})
}
for _, slot := range slots {
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, slot: &slot})
tasks = append(tasks, &subfetcherTask{read: read, slot: &slot})
}
sf.lock.Unlock()

// Notify the background thread to execute scheduled tasks
select {
case sf.wake <- struct{}{}:
// Wake signal sent
default:
// Wake signal not sent as a previous one is already queued
case sf.tasks <- tasks:
Contributor
This is now blocking. Previously, any number of threads could append to sf.tasks and all exit immediately afterwards, since the select was not blocking.

Now, instead, they each allocate their own slice of tasks, and each thread needs to wait until its slice has been delivered over the sf.tasks channel.

This seems to be quite a big change in semantics, yet not something you mentioned in the PR description. I'm guessing it was unintentional. Unless I have misunderstood something, I think this highlights why we should reject this PR: it touches some very intricate parts of geth, which currently work fine. It's very easy to slip when making changes here.
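For illustration, a minimal sketch of the two signalling styles under discussion; the package, type, and field names (sketch, fetcher, task, queue) are hypothetical stand-ins rather than geth's actual definitions:

```go
package sketch

import (
	"errors"
	"sync"
)

// Illustrative stand-ins only; not geth's actual types.
type task struct{}

var errTerminated = errors.New("fetcher is already terminated")

type fetcher struct {
	lock  sync.Mutex
	queue []*task       // old design: mutex-protected slice
	wake  chan struct{} // old design: wake channel with buffer size 1
	tasks chan []*task  // new design: unbuffered channel of task batches
	stop  chan struct{}
}

// oldSchedule appends under the lock and then sends a best-effort wake
// signal; it never blocks, no matter what the loop goroutine is doing.
func (f *fetcher) oldSchedule(ts []*task) {
	f.lock.Lock()
	f.queue = append(f.queue, ts...)
	f.lock.Unlock()

	select {
	case f.wake <- struct{}{}: // wake signal sent
	default: // a wake signal is already queued
	}
}

// newSchedule parks the caller until the loop goroutine receives this exact
// batch, or until the fetcher is stopped.
func (f *fetcher) newSchedule(ts []*task) error {
	select {
	case f.tasks <- ts:
		return nil
	case <-f.stop:
		return errTerminated
	}
}
```

In the old style, any number of callers append and return immediately; in the new style, every caller is serialised against the receive in loop() (or against stop), which is the semantic change described above.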

Contributor Author
I've responded to this in the main conversation thread.

return nil
case <-sf.stop:
return errTerminated // termination is imminent, so no need to wait for sf.term
case <-sf.term:
return errTerminated
}
return nil
}

// wait blocks until the subfetcher terminates. This method is used to block on
@@ -379,92 +369,90 @@ func (sf *subfetcher) loop() {
if err := sf.openTrie(); err != nil {
return
}
for {
select {
case <-sf.wake:
// Execute all remaining tasks in a single run
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.lock.Unlock()

for _, task := range tasks {
if task.addr != nil {
key := *task.addr
if task.read {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsWrite++
continue
}
}
} else {
key := *task.slot
if task.read {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsWrite++
continue
}

work := make(chan *subfetcherTask)
go func() {
var wg sync.WaitGroup
for {
select {
case tasks := <-sf.tasks:
wg.Add(1)
go func() {
defer wg.Done()
for _, t := range tasks {
work <- t
}
}()

case <-sf.stop:
wg.Wait() // guarantee of no more sends on `work`
close(work)
return
}
}
}()

for task := range work {
if task.addr != nil {
key := *task.addr
if task.read {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsRead++
continue
}
if task.addr != nil {
sf.trie.GetAccount(*task.addr)
} else {
sf.trie.GetStorage(sf.addr, (*task.slot)[:])
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsCross++
continue
}
if task.read {
if task.addr != nil {
sf.seenReadAddr[*task.addr] = struct{}{}
} else {
sf.seenReadSlot[*task.slot] = struct{}{}
}
} else {
if task.addr != nil {
sf.seenWriteAddr[*task.addr] = struct{}{}
} else {
sf.seenWriteSlot[*task.slot] = struct{}{}
}
} else {
if _, ok := sf.seenReadAddr[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteAddr[key]; ok {
sf.dupsWrite++
continue
}
}

case <-sf.stop:
// Termination is requested, abort if no more tasks are pending. If
// there are some, exhaust them first.
sf.lock.Lock()
done := sf.tasks == nil
sf.lock.Unlock()

if done {
return
} else {
key := *task.slot
if task.read {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenReadSlot[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteSlot[key]; ok {
sf.dupsWrite++
continue
}
}
}
if task.addr != nil {
sf.trie.GetAccount(*task.addr)
} else {
sf.trie.GetStorage(sf.addr, (*task.slot)[:])
}
if task.read {
if task.addr != nil {
sf.seenReadAddr[*task.addr] = struct{}{}
} else {
sf.seenReadSlot[*task.slot] = struct{}{}
}
} else {
if task.addr != nil {
sf.seenWriteAddr[*task.addr] = struct{}{}
} else {
sf.seenWriteSlot[*task.slot] = struct{}{}
}
// Some tasks are pending, loop and pick them up (that wake branch
// will be selected eventually, and whilst stop remains closed this
// branch will also run afterwards).
}
}
}
53 changes: 53 additions & 0 deletions core/state/trie_prefetcher_test.go
@@ -18,6 +18,8 @@ package state

import (
"math/big"
"sync"
"sync/atomic"
"testing"

"github.com/ethereum/go-ethereum/common"
@@ -66,6 +68,57 @@ func TestUseAfterTerminate(t *testing.T) {
}
}

func TestSchedulerTerminationRaceCondition(t *testing.T) {
// The lock-based implementation of [subfetcher] had a race condition
// whereby schedule() could obtain the lock after the <-sf.stop branch of
// loop() had already checked for an empty queue. Although probabilistic,
// this test reliably triggered it at rates of (~4 in 10k) and (~100 in 50k)
// on an Apple M3 Max chip.
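// One possible interleaving exhibiting the race in the lock-based design
// (schematic, not a captured trace):
//
//	schedule()                          loop()
//	----------                          ------
//	<-sf.term not ready; keep going
//	                                    <-sf.stop fires
//	                                    lock; sees sf.tasks == nil; unlock
//	                                    return (closing sf.term)
//	lock; append tasks; unlock
//	non-blocking send on sf.wake
//	return nil                          tasks are never fetched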

t.Parallel()
db := filledStateDB()
skey := common.HexToHash("aaa")

// Maximise concurrency by synchronising all scheduling and termination.
start := make(chan struct{})
var (
wg sync.WaitGroup
raceInduced atomic.Uint64
)

const numTrials = 50_000
for i := 0; i < numTrials; i++ {
wg.Add(2)
fetcher := newSubfetcher(db.db, db.originalRoot, common.Hash{}, db.originalRoot, common.Address{})

var gotScheduleErr error
doneScheduling := make(chan struct{})
go func() {
defer wg.Done()
<-start
gotScheduleErr = fetcher.schedule(nil, []common.Hash{skey}, false)
close(doneScheduling)
}()

go func() {
defer wg.Done()
<-start
fetcher.terminate(false /*async*/)

<-doneScheduling
if gotScheduleErr == nil && len(fetcher.tasks) > 0 {
raceInduced.Add(1)
}
}()
}

close(start)
wg.Wait()
if got := raceInduced.Load(); got > 0 {
t.Errorf("In %d/%d concurrent trials %T.schedule() returned nil error but >0 tasks remain in queue after %[3]T.terminate([blocking]) returned", got, numTrials, &subfetcher{})
}
}

func TestVerklePrefetcher(t *testing.T) {
disk := rawdb.NewMemoryDatabase()
db := triedb.NewDatabase(disk, triedb.VerkleDefaults)