Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NondeterministicFastCommit to speed up migrations when ordering isn't required #403

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 4 additions & 36 deletions .github/workflows/safer-golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,6 @@
# Safer GitHub Actions Workflow for golangci-lint.
# https://github.com/x448/safer-golangci-lint
#
# safer-golangci-lint.yml
#
# This workflow downloads, verifies, and runs golangci-lint in a
# deterministic, reviewable, and safe manner.
#
# To use:
# Step 1. Copy this file into [your_github_repo]/.github/workflows/
# Step 2. There's no step 2 if you like the default settings.
#
# See golangci-lint docs for more info at
# https://github.com/golangci/golangci-lint
#
# 100% of the script for downloading, installing, and running golangci-lint
# is embedded in this file. The embedded SHA-256 digest is used to verify the
# downloaded golangci-lint tarball (golangci-lint-1.xx.x-linux-amd64.tar.gz).
#
# The embedded SHA-256 digest matches golangci-lint-1.xx.x-checksums.txt at
# https://github.com/golangci/golangci-lint/releases
#
# To use a newer version of golangci-lint, change these values:
# 1. GOLINTERS_VERSION
# 2. GOLINTERS_TGZ_DGST
#
# Release v1.52.2 (May 14, 2023)
# - Bump Go to 1.20
# - Bump actions/setup-go to v4
# - Bump golangci-lint to 1.52.2
# - Hash of golangci-lint-1.52.2-linux-amd64.tar.gz
# - SHA-256: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501
# This SHA-256 digest matches golangci-lint-1.52.2-checksums.txt at
# https://github.com/golangci/golangci-lint/releases
#
name: linters

# Remove default permissions and grant only what is required in each job.
Expand All @@ -49,9 +17,9 @@ on:

env:
GO_VERSION: '1.20'
GOLINTERS_VERSION: 1.52.2
GOLINTERS_VERSION: 1.53.3
GOLINTERS_ARCH: linux-amd64
GOLINTERS_TGZ_DGST: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501
GOLINTERS_TGZ_DGST: 4f62007ca96372ccba54760e2ed39c2446b40ec24d9a90c21aad9f2fdf6cf0da
GOLINTERS_TIMEOUT: 15m
OPENSSL_DGST_CMD: openssl dgst -sha256 -r
CURL_CMD: curl --proto =https --tlsv1.2 --location --silent --show-error --fail
Expand All @@ -64,12 +32,12 @@ jobs:
contents: read
steps:
- name: Checkout source
uses: actions/checkout@v3
uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these dependency updates necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These updates to pin dependencies improve the OpenSSF score. However, I reverted the commit that updated golangci-lint to 1.53.3 (out of scope of this PR).

Example when all dependencies are pinned (score 0-10 with 10 being perfect):

image

For more, replace "example/repo" in this URL to see complete report (not all repos are scored yet).

with:
fetch-depth: 1

- name: Setup Go
uses: actions/setup-go@v4
uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1
with:
go-version: ${{ env.GO_VERSION }}
check-latest: true
Expand Down
205 changes: 203 additions & 2 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
}

func (s *PersistentSlabStorage) Commit() error {
var err error

// this part ensures the keys are sorted so commit operation is deterministic
keysWithOwners := s.sortedOwnedDeltaKeys()

for _, id := range keysWithOwners {
return s.commit(keysWithOwners)
}

func (s *PersistentSlabStorage) commit(keys []SlabID) error {
var err error

for _, id := range keys {
slab := s.deltas[id]

// deleted slabs
Expand Down Expand Up @@ -964,6 +969,202 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
return nil
}

// NondeterministicFastCommit commits changes in nondeterministic order.
// This is used by migration program when ordering isn't required.
func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error {
// No changes
if len(s.deltas) == 0 {
return nil
}

type slabToBeEncoded struct {
slabID SlabID
slab Slab
}

type encodedSlab struct {
slabID SlabID
data []byte
err error
}

// Define encoder (worker) to encode slabs in parallel
encoder := func(
wg *sync.WaitGroup,
done <-chan struct{},
jobs <-chan slabToBeEncoded,
results chan<- encodedSlab,
) {
defer wg.Done()

for job := range jobs {
// Check if goroutine is signaled to stop before proceeding.
select {
case <-done:
return
default:
}

id := job.slabID
slab := job.slab

if slab == nil {
results <- encodedSlab{
slabID: id,
data: nil,
err: nil,
}
continue
}

// Serialize
data, err := EncodeSlab(slab, s.cborEncMode)
results <- encodedSlab{
slabID: id,
data: data,
err: err,
}
}
}

// slabIDsWithOwner contains slab IDs with owner:
// - modified slab IDs are stored from front to back
// - deleted slab IDs are stored from back to front
// This is to avoid extra allocations.
slabIDsWithOwner := make([]SlabID, len(s.deltas))

// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
modifiedSlabCount := 0
// Deleted slabs need to be removed from underlying storage.
deletedSlabCount := 0
for k, v := range s.deltas {
// Ignore slabs not owned by accounts
if k.address == AddressUndefined {
continue
}
if v == nil {
turbolent marked this conversation as resolved.
Show resolved Hide resolved
index := len(slabIDsWithOwner) - 1 - deletedSlabCount
fxamacker marked this conversation as resolved.
Show resolved Hide resolved
slabIDsWithOwner[index] = k
deletedSlabCount++
} else {
slabIDsWithOwner[modifiedSlabCount] = k
modifiedSlabCount++
}
}

modifiedSlabIDs := slabIDsWithOwner[:modifiedSlabCount]

deletedSlabIDs := slabIDsWithOwner[len(slabIDsWithOwner)-deletedSlabCount:]

if modifiedSlabCount == 0 && deletedSlabCount == 0 {
return nil
}

if modifiedSlabCount < 2 {
// Avoid goroutine overhead.
// Return after committing modified and deleted slabs.
ids := modifiedSlabIDs
ids = append(ids, deletedSlabIDs...)
return s.commit(ids)
}

if numWorkers > modifiedSlabCount {
numWorkers = modifiedSlabCount
}

var wg sync.WaitGroup

// Create done signal channel
done := make(chan struct{})

// Create job queue
jobs := make(chan slabToBeEncoded, modifiedSlabCount)

// Create result queue
results := make(chan encodedSlab, modifiedSlabCount)

defer func() {
// This ensures that all goroutines are stopped before output channel is closed.

// Wait for all goroutines to finish
wg.Wait()

// Close output channel
close(results)
}()

// Launch workers to encode slabs
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go encoder(&wg, done, jobs, results)
}

// Send jobs
for _, id := range modifiedSlabIDs {
jobs <- slabToBeEncoded{id, s.deltas[id]}
}
close(jobs)

// Remove deleted slabs from underlying storage.
for _, id := range deletedSlabIDs {

err := s.baseStorage.Remove(id)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
}

// Deleted slabs are removed from deltas and added to read cache so that:
// 1. next read is from in-memory read cache
// 2. deleted slabs are not re-committed in next commit
s.cache[id] = nil
delete(s.deltas, id)
}

// Process encoded slabs
for i := 0; i < modifiedSlabCount; i++ {
result := <-results

if result.err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// result.err is already categorized by Encode().
return result.err
}

id := result.slabID
data := result.data

if data == nil {
// Closing done channel signals goroutines to stop.
close(done)
// This is unexpected because deleted slabs are processed separately.
return NewEncodingErrorf("unexpectd encoded empty data")
}

// Store
err := s.baseStorage.Store(id, data)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
}

s.cache[id] = s.deltas[id]
// It's safe to remove slab from deltas because
// iteration is on non-temp slabs and temp slabs
// are still in deltas.
delete(s.deltas, id)
}

// Do NOT reset deltas because slabs with empty address are not saved.

return nil
}

// DropDeltas discards every pending delta by replacing the deltas map with
// a new, empty one. The previous map is left untouched.
func (s *PersistentSlabStorage) DropDeltas() {
	s.deltas = map[SlabID]Slab{}
}
Expand Down
Loading
Loading