Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Dec 12, 2023
1 parent 4a149e9 commit cacba0d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 28 deletions.
15 changes: 12 additions & 3 deletions cmd/util/ledger/migrations/account_based_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,23 @@ func MigrateGroupConcurrently(
case jobs <- job:
}
}
close(jobs)
}()

// read job results
logAccount := moduleUtil.LogProgress("processing account group", accountGroups.Len(), log)
logAccount := moduleUtil.LogProgress(
"processing account group",
accountGroups.Len(),
log,
)

migrated := make([]*ledger.Payload, 0)
migrated := make([]*ledger.Payload, accountGroups.AllPayloadsCount())
durations := newMigrationDurations(logTopNDurations)
contextDone := false
for i := 0; i < accountGroups.Len(); i++ {
select {
case <-ctx.Done():
contextDone = true
break
case result := <-resultCh:
durations.Add(result)
Expand All @@ -257,8 +264,10 @@ func MigrateGroupConcurrently(
migrated = append(migrated, accountMigrated...)
logAccount(1)
}
if contextDone {
break
}
}
close(jobs)

// make sure to exit all workers before returning from this function
// so that the migrator can be closed properly
Expand Down
8 changes: 4 additions & 4 deletions cmd/util/ledger/migrations/storage_used_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (m *AccountUsageMigrator) MigrateAccount(
return nil, fmt.Errorf("cannot create new payload with value: %w", err)
}

payloads[statusIndex] = &newPayload
payloads[statusIndex] = newPayload

return payloads, nil
}
Expand Down Expand Up @@ -120,13 +120,13 @@ func (m *AccountUsageMigrator) compareUsage(

// newPayloadWithValue returns a new payload with the key from the given payload, and
// the value from the argument
func newPayloadWithValue(payload *ledger.Payload, value ledger.Value) (ledger.Payload, error) {
func newPayloadWithValue(payload *ledger.Payload, value ledger.Value) (*ledger.Payload, error) {
key, err := payload.Key()
if err != nil {
return ledger.Payload{}, err
return &ledger.Payload{}, err
}
newPayload := ledger.NewPayload(key, value)
return *newPayload, nil
return newPayload, nil
}

func registerSize(id flow.RegisterID, p *ledger.Payload) int {
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/ledger/util/migration_runtime_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (m MigrationRuntimeInterface) GetBlockAtHeight(_ uint64) (block runtime.Blo
}

func (m MigrationRuntimeInterface) ReadRandom([]byte) error {
panic("unexpected UnsafeRandom call")
panic("unexpected ReadRandom call")
}

func (m MigrationRuntimeInterface) VerifySignature(_ []byte, _ string, _ []byte, _ []byte, _ runtime.SignatureAlgorithm, _ runtime.HashAlgorithm) (bool, error) {
Expand Down
63 changes: 45 additions & 18 deletions cmd/util/ledger/util/payload_grouping.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ import (
// encodedKeyAddressPrefixLength is the length of the address prefix in the encoded key
// 2 for uint16 of number of key parts
// 4 for uint32 of the length of the first key part
// 2 for uint32 of the key part type
// 2 for uint16 of the key part type
// 8 for the address which is the actual length of the first key part
const encodedKeyAddressPrefixLength = 2 + 4 + 2 + flow.AddressLength

// minSizeForSplitSortingIntoGoroutines below this size, no need to split
// the sorting into goroutines
const minSizeForSplitSortingIntoGoroutines = 100_000

const estimatedNumOfAccount = 30_000_000

// PayloadAccountGroup is a grouping of payloads by account
type PayloadAccountGroup struct {
Address common.Address
Expand All @@ -38,6 +40,8 @@ type PayloadAccountGrouping struct {
payloads sortablePayloads
indexes []int

payloadsCount int

current int
}

Expand Down Expand Up @@ -72,6 +76,11 @@ func (g *PayloadAccountGrouping) Len() int {
return len(g.indexes)
}

// AllPayloadsCount the number of accounts
func (g *PayloadAccountGrouping) AllPayloadsCount() int {
return g.payloadsCount
}

// GroupPayloadsByAccount takes a list of payloads and groups them by account.
// it uses nWorkers to sort the payloads by address and find the start and end indexes of
// each account.
Expand Down Expand Up @@ -102,10 +111,10 @@ func GroupPayloadsByAccount(

start = time.Now()
// find the indexes of the payloads that start a new account
indexes := make([]int, 0, len(p))
indexes := make([]int, 0, estimatedNumOfAccount)
for i := 0; i < len(p); {
indexes = append(indexes, i)
i = p.FindLastOfTheSameKey(i) + 1
i = p.FindNextKeyIndex(i)
}
end = time.Now()

Expand All @@ -117,13 +126,15 @@ func GroupPayloadsByAccount(
return &PayloadAccountGrouping{
payloads: p,
indexes: indexes,

payloadsCount: len(payloads),
}
}

// payloadToAddress takes a payload and return:
// - (address, true, nil) if the payload is for an account, the account address is returned
// - ("", false, nil) if the payload is not for an account
// - ("", false, err) if running into any exception
// - (address, nil) if the payload is for an account, the account address is returned
// - (common.ZeroAddress, nil) if the payload is not for an account
// - (common.ZeroAddress, err) if running into any exception
// The zero address is used for global Payloads and is not an actual account
func payloadToAddress(p *ledger.Payload) (common.Address, error) {
k, err := p.Key()
Expand Down Expand Up @@ -170,7 +181,7 @@ func (s sortablePayloads) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s sortablePayloads) FindLastOfTheSameKey(i int) int {
func (s sortablePayloads) FindNextKeyIndex(i int) int {
low := i
step := 1
for low+step < len(s) && s.Compare(low+step, i) == 0 {
Expand All @@ -192,7 +203,7 @@ func (s sortablePayloads) FindLastOfTheSameKey(i int) int {
}
}

return low - 1
return low
}

// sortPayloads sorts the payloads in the range [i, j) using goroutines and merges
Expand All @@ -206,7 +217,7 @@ func sortPayloads(i, j int, source, buffer sortablePayloads, goroutineAllowance
return
}

// if we are out of goroutine allowance, sort with built-in sortß
// if we are out of goroutine allowance, sort with built-in sort
// if the length is less than minSizeForSplit, sort with built-in sort
if goroutineAllowance < 2 || j-i < minSizeForSplitSortingIntoGoroutines {
sort.Sort(source[i:j])
Expand Down Expand Up @@ -236,17 +247,33 @@ func sortPayloads(i, j int, source, buffer sortablePayloads, goroutineAllowance
func mergeInto(source, buffer sortablePayloads, i int, mid int, j int) {
left := i
right := mid
for k := i; k < j; k++ {
if left < mid && (right >= j || source.Compare(left, right) <= 0) {
buffer[k] = source[left]
left++
k := i
for left < mid && right < j {
// More elements in the both partitions to process.
if source.Compare(left, right) <= 0 {
// Move left partition elements with the same address to buffer.
nextLeft := source.FindNextKeyIndex(left)
n := copy(buffer[k:], source[left:nextLeft])
left = nextLeft
k += n
} else {
buffer[k] = source[right]
right++
// Move right partition elements with the same address to buffer.
nextRight := source.FindNextKeyIndex(right)
n := copy(buffer[k:], source[right:nextRight])
right = nextRight
k += n
}
}

for k := i; k < j; k++ {
source[k] = buffer[k]
// At this point:
// - one partition is exhausted.
// - remaining elements in the other partition (already sorted) can be copied over.
if left < mid {
// Copy remaining elements in the left partition.
copy(buffer[k:], source[left:mid])
} else {
// Copy remaining elements in the right partition.
copy(buffer[k:], source[right:j])
}
// Copy merged buffer back to source.
copy(source[i:j], buffer[i:j])
}
7 changes: 5 additions & 2 deletions cmd/util/ledger/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (a *AccountsAtreeLedger) ValueExists(owner, key []byte) (exists bool, err e
func (a *AccountsAtreeLedger) AllocateStorageIndex(owner []byte) (atree.StorageIndex, error) {
v, err := a.Accounts.AllocateStorageIndex(flow.BytesToAddress(owner))
if err != nil {
return atree.StorageIndex{}, fmt.Errorf("storage address allocation failed: %w", err)
return atree.StorageIndex{}, fmt.Errorf("storage index allocation failed: %w", err)
}
return v, nil
}
Expand Down Expand Up @@ -89,7 +89,10 @@ func NewPayloadSnapshot(payloads []*ledger.Payload) (*PayloadSnapshot, error) {
}

func (p PayloadSnapshot) Get(id flow.RegisterID) (flow.RegisterValue, error) {
value := p.Payloads[id]
value, exists := p.Payloads[id]
if !exists {
return nil, nil
}
return value.Value(), nil
}

Expand Down

0 comments on commit cacba0d

Please sign in to comment.