Skip to content

Commit

Permalink
core/rawdb: use atomic int added in go1.19 (#26935)
Browse files Browse the repository at this point in the history
  • Loading branch information
s7v7nislands committed Mar 21, 2023
1 parent 8a9a73c commit 905a723
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 100 deletions.
24 changes: 11 additions & 13 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ const (
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
// WARNING: The `threshold` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

*Freezer
quit chan struct{}
Expand All @@ -60,12 +57,13 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
if err != nil {
return nil, err
}
return &chainFreezer{
Freezer: freezer,
threshold: params.FullImmutabilityThreshold,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}, nil
cf := chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}
cf.threshold.Store(params.FullImmutabilityThreshold)
return &cf, nil
}

// Close closes the chain freezer instance and terminates the background thread.
Expand Down Expand Up @@ -124,8 +122,8 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := atomic.LoadUint64(&f.threshold)
frozen := atomic.LoadUint64(&f.frozen)
threshold := f.threshold.Load()
frozen := f.frozen.Load()
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
Expand Down Expand Up @@ -186,7 +184,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {

// Wipe out side chains also and track dangling side chains
var dangling []common.Hash
frozen = atomic.LoadUint64(&f.frozen) // Needs reload after during freezeRange
frozen = f.frozen.Load() // Needs reload after during freezeRange
for number := first; number < frozen; number++ {
// Always keep the genesis block in active database
if number != 0 {
Expand Down
5 changes: 3 additions & 2 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,12 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
}
}
// process runs in parallel
nThreadsAlive := int32(threads)
var nThreadsAlive atomic.Int32
nThreadsAlive.Store(int32(threads))
process := func() {
defer func() {
// Last processor closes the result channel
if atomic.AddInt32(&nThreadsAlive, -1) == 0 {
if nThreadsAlive.Add(-1) == 0 {
close(hashesCh)
}
}()
Expand Down
7 changes: 3 additions & 4 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"path"
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -72,9 +71,9 @@ func (frdb *freezerdb) Freeze(threshold uint64) error {
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, old)
}(atomic.LoadUint64(&frdb.AncientStore.(*chainFreezer).threshold))
atomic.StoreUint64(&frdb.AncientStore.(*chainFreezer).threshold, threshold)
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)

// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
Expand Down
53 changes: 25 additions & 28 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type Freezer struct {
// WARNING: The `frozen` and `tail` fields are accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
// so take advantage of that (https://golang.org/pkg/sync/atomic/#pkg-note-BUG).
frozen uint64 // Number of blocks already frozen
tail uint64 // Number of the first stored item in the freezer
frozen atomic.Uint64 // Number of blocks already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer

// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
Expand Down Expand Up @@ -212,12 +209,12 @@ func (f *Freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]

// Ancients returns the length of the frozen items.
func (f *Freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
return f.frozen.Load(), nil
}

// Tail returns the number of first stored item in the freezer.
func (f *Freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
return f.tail.Load(), nil
}

// AncientSize returns the ancient size of the specified category.
Expand Down Expand Up @@ -251,7 +248,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
defer f.writeLock.Unlock()

// Roll back all tables to the starting position in case of error.
prevItem := atomic.LoadUint64(&f.frozen)
prevItem := f.frozen.Load()
defer func() {
if err != nil {
// The write operation has failed. Go back to the previous item position.
Expand All @@ -272,7 +269,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize
if err != nil {
return 0, err
}
atomic.StoreUint64(&f.frozen, item)
f.frozen.Store(item)
return writeSize, nil
}

Expand All @@ -284,15 +281,15 @@ func (f *Freezer) TruncateHead(items uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.frozen) <= items {
if f.frozen.Load() <= items {
return nil
}
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return err
}
}
atomic.StoreUint64(&f.frozen, items)
f.frozen.Store(items)
return nil
}

Expand All @@ -304,15 +301,15 @@ func (f *Freezer) TruncateTail(tail uint64) error {
f.writeLock.Lock()
defer f.writeLock.Unlock()

if atomic.LoadUint64(&f.tail) >= tail {
if f.tail.Load() >= tail {
return nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return err
}
}
atomic.StoreUint64(&f.tail, tail)
f.tail.Store(tail)
return nil
}

Expand Down Expand Up @@ -343,22 +340,22 @@ func (f *Freezer) validate() error {
)
// Hack to get boundary of any table
for kind, table := range f.tables {
head = atomic.LoadUint64(&table.items)
tail = atomic.LoadUint64(&table.itemHidden)
head = table.items.Load()
tail = table.itemHidden.Load()
name = kind
break
}
// Now check every table against those boundaries.
for kind, table := range f.tables {
if head != atomic.LoadUint64(&table.items) {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, atomic.LoadUint64(&table.items), head)
if head != table.items.Load() {
return fmt.Errorf("freezer tables %s and %s have differing head: %d != %d", kind, name, table.items.Load(), head)
}
if tail != atomic.LoadUint64(&table.itemHidden) {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, atomic.LoadUint64(&table.itemHidden), tail)
if tail != table.itemHidden.Load() {
return fmt.Errorf("freezer tables %s and %s have differing tail: %d != %d", kind, name, table.itemHidden.Load(), tail)
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}

Expand All @@ -369,11 +366,11 @@ func (f *Freezer) repair() error {
tail = uint64(0)
)
for _, table := range f.tables {
items := atomic.LoadUint64(&table.items)
items := table.items.Load()
if head > items {
head = items
}
hidden := atomic.LoadUint64(&table.itemHidden)
hidden := table.itemHidden.Load()
if hidden > tail {
tail = hidden
}
Expand All @@ -386,8 +383,8 @@ func (f *Freezer) repair() error {
return err
}
}
atomic.StoreUint64(&f.frozen, head)
atomic.StoreUint64(&f.tail, tail)
f.frozen.Store(head)
f.tail.Store(tail)
return nil
}

Expand All @@ -413,7 +410,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
// and that error will be returned.
forEach := func(t *freezerTable, offset uint64, fn func(uint64, []byte) error) error {
var (
items = atomic.LoadUint64(&t.items)
items = t.items.Load()
batchSize = uint64(1024)
maxBytes = uint64(1024 * 1024)
)
Expand All @@ -436,7 +433,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
}
// TODO(s1na): This is a sanity-check since as of now no process does tail-deletion. But the migration
// process assumes no deletion at tail and needs to be modified to account for that.
if table.itemOffset > 0 || table.itemHidden > 0 {
if table.itemOffset.Load() > 0 || table.itemHidden.Load() > 0 {
return fmt.Errorf("migration not supported for tail-deleted freezers")
}
ancientsPath := filepath.Dir(table.index.Name())
Expand All @@ -452,7 +449,7 @@ func (f *Freezer) MigrateTable(kind string, convert convertLegacyFn) error {
out []byte
start = time.Now()
logged = time.Now()
offset = newTable.items
offset = newTable.items.Load()
)
if offset > 0 {
log.Info("found previous migration attempt", "migrated", offset)
Expand Down
5 changes: 2 additions & 3 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package rawdb

import (
"fmt"
"sync/atomic"

"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -107,7 +106,7 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
batch.curItem = batch.t.items.Load()
batch.totalBytes = 0
}

Expand Down Expand Up @@ -201,7 +200,7 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
batch.t.items.Store(batch.curItem)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
Expand Down
Loading

0 comments on commit 905a723

Please sign in to comment.