Skip to content

Commit

Permalink
Merge pull request #5488 from onflow/fxamacker/fix-payload-grouping-d…
Browse files Browse the repository at this point in the history
…ata-race

Fix data race in account grouping for account based migration
  • Loading branch information
fxamacker authored Mar 1, 2024
2 parents 057d89b + 4621fd8 commit 5dcab61
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
14 changes: 7 additions & 7 deletions cmd/util/ledger/util/payload_grouping.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func GroupPayloadsByAccount(
indexes := make([]int, 0, estimatedNumOfAccount)
for i := 0; i < len(p); {
indexes = append(indexes, i)
i = p.FindNextKeyIndex(i)
i = p.FindNextKeyIndexUntil(i, len(p))
}
end = time.Now()

Expand Down Expand Up @@ -177,17 +177,17 @@ func (s sortablePayloads) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s sortablePayloads) FindNextKeyIndex(i int) int {
func (s sortablePayloads) FindNextKeyIndexUntil(i int, upperBound int) int {
low := i
step := 1
for low+step < len(s) && s.Compare(low+step, i) == 0 {
for low+step < upperBound && s.Compare(low+step, i) == 0 {
low += step
step *= 2
}

high := low + step
if high > len(s) {
high = len(s)
if high > upperBound {
high = upperBound
}

for low < high {
Expand Down Expand Up @@ -248,13 +248,13 @@ func mergeInto(source, buffer sortablePayloads, i int, mid int, j int) {
// More elements in the both partitions to process.
if source.Compare(left, right) <= 0 {
// Move left partition elements with the same address to buffer.
nextLeft := source.FindNextKeyIndex(left)
nextLeft := source.FindNextKeyIndexUntil(left, mid)
n := copy(buffer[k:], source[left:nextLeft])
left = nextLeft
k += n
} else {
// Move right partition elements with the same address to buffer.
nextRight := source.FindNextKeyIndex(right)
nextRight := source.FindNextKeyIndexUntil(right, j)
n := copy(buffer[k:], source[right:nextRight])
right = nextRight
k += n
Expand Down
44 changes: 44 additions & 0 deletions cmd/util/ledger/util/payload_grouping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ func TestGroupPayloadsByAccount(t *testing.T) {
require.Greater(t, groups.Len(), 1)
}

func TestGroupPayloadsByAccountForDataRace(t *testing.T) {
log := zerolog.New(zerolog.NewTestWriter(t))

const accountSize = 4
var payloads []*ledger.Payload
for i := 0; i < accountSize; i++ {
payloads = append(payloads, generateRandomPayloadsWithAddress(generateRandomAddress(), 100_000)...)
}

const nWorkers = 8
groups := util.GroupPayloadsByAccount(log, payloads, nWorkers)
require.Equal(t, accountSize, groups.Len())
}

func TestGroupPayloadsByAccountCompareResults(t *testing.T) {
log := zerolog.Nop()
payloads := generateRandomPayloads(1000000)
Expand Down Expand Up @@ -129,6 +143,36 @@ func generateRandomPayloads(n int) []*ledger.Payload {
return payloads
}

func generateRandomPayloadsWithAddress(address string, n int) []*ledger.Payload {
const meanPayloadsPerAccount = 100
const minPayloadsPerAccount = 1

payloads := make([]*ledger.Payload, 0, n)

for i := 0; i < n; {

registersForAccount := minPayloadsPerAccount + int(rand2.ExpFloat64()*(meanPayloadsPerAccount-minPayloadsPerAccount))
if registersForAccount > n-i {
registersForAccount = n - i
}
i += registersForAccount

accountKey := convert.RegisterIDToLedgerKey(flow.RegisterID{
Owner: address,
Key: generateRandomString(10),
})
for j := 0; j < registersForAccount; j++ {
payloads = append(payloads,
ledger.NewPayload(
accountKey,
[]byte(generateRandomString(10)),
))
}
}

return payloads
}

func generateRandomAccountKey() ledger.Key {
return convert.RegisterIDToLedgerKey(flow.RegisterID{
Owner: generateRandomAddress(),
Expand Down

0 comments on commit 5dcab61

Please sign in to comment.