Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract uncurried functions from storage writes #6803

Merged
merged 7 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 2 additions & 33 deletions ledger/complete/wal/checkpoint_v6_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
utilsio "github.com/onflow/flow-go/utils/io"
"github.com/onflow/flow-go/utils/merr"
)

const subtrieLevel = 4
Expand Down Expand Up @@ -708,39 +709,7 @@ func decodeSubtrieCount(encoded []byte) (uint16, error) {
return binary.BigEndian.Uint16(encoded), nil
}

// closeAndMergeError close the closable and merge the closeErr with the given err into a multierror
// Note: when using this function in a defer function, don't use as below:
// func XXX() (
//
// err error,
// ) {
// def func() {
// // bad, because the definition of err might get overwritten
// err = closeAndMergeError(closable, err)
// }()
//
// Better to use as below:
// func XXX() (
//
// errToReturn error,
// ) {
// def func() {
// // good, because the error to returned is only updated here, and guaranteed to be returned
// errToReturn = closeAndMergeError(closable, errToReturn)
// }()
func closeAndMergeError(closable io.Closer, err error) error {
var merr *multierror.Error
if err != nil {
merr = multierror.Append(merr, err)
}

closeError := closable.Close()
if closeError != nil {
merr = multierror.Append(merr, closeError)
}

return merr.ErrorOrNil()
}
var closeAndMergeError = merr.CloseAndMergeError

// withFile opens the file at the given path, and calls the given function with the opened file.
// it handles closing the file and evicting the file from Linux page cache.
Expand Down
150 changes: 150 additions & 0 deletions storage/operation/prefix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//nolint:golint,unused
package operation

import (
"encoding/binary"
"fmt"

"github.com/onflow/flow-go/model/flow"
)

const (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where did these constants come from? Is this a unrelated thing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// codes for special database markers
// codeMax = 1 // deprecated
codeDBType = 2 // specifies a database type

// codes for views with special meaning
codeSafetyData = 10 // safety data for hotstuff state
codeLivenessData = 11 // liveness data for hotstuff state

// codes for fields associated with the root state
codeSporkID = 13
codeProtocolVersion = 14
codeEpochCommitSafetyThreshold = 15
codeSporkRootBlockHeight = 16

// code for heights with special meaning
codeFinalizedHeight = 20 // latest finalized block height
codeSealedHeight = 21 // latest sealed block height
codeClusterHeight = 22 // latest finalized height on cluster
codeExecutedBlock = 23 // latest executed block with max height
codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot
codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
codeEpochFirstHeight = 26 // the height of the first block in a given epoch
codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot

// codes for single entity storage
codeHeader = 30
_ = 31 // DEPRECATED: 31 was used for identities before epochs
codeGuarantee = 32
codeSeal = 33
codeTransaction = 34
codeCollection = 35
codeExecutionResult = 36
codeResultApproval = 37
codeChunk = 38
codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this erroneously had the same value as codeExecutionResult (36)

// codes for indexing single identifier by identifier/integer
codeHeightToBlock = 40 // index mapping height to block ID
codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal
codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID
codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs
codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID
codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID
codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID
codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID

// codes for indexing multiple identifiers by identifier
codeBlockChildren = 50 // index mapping block ID to children blocks
_ = 51 // DEPRECATED: 51 was used for identity indexes before epochs
codePayloadGuarantees = 52 // index mapping block ID to payload guarantees
codePayloadSeals = 53 // index mapping block ID to payload seals
codeCollectionBlock = 54 // index mapping collection ID to block ID
codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes
_ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25
codePayloadReceipts = 57 // index mapping block ID to payload receipts
codePayloadResults = 58 // index mapping block ID to payload results
codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts
codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID

// codes related to protocol level information
codeEpochSetup = 61 // EpochSetup service event, keyed by ID
codeEpochCommit = 62 // EpochCommit service event, keyed by ID
codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter
codeDKGStarted = 64 // flag that the DKG for an epoch has been started
codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state)
codeVersionBeacon = 67 // flag for storing version beacons
codeEpochProtocolState = 68
codeProtocolKVStore = 69

// code for ComputationResult upload status storage
// NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to
// be supported, we will need to define new code.
codeComputationResults = 66

// job queue consumers and producers
codeJobConsumerProcessed = 70
codeJobQueue = 71
codeJobQueuePointer = 72

// legacy codes (should be cleaned up)
codeChunkDataPack = 100
codeCommit = 101
codeEvent = 102
codeExecutionStateInteractions = 103
codeTransactionResult = 104
codeFinalizedCluster = 105
codeServiceEvent = 106
codeTransactionResultIndex = 107
codeLightTransactionResult = 108
codeLightTransactionResultIndex = 109
codeTransactionResultErrorMessage = 110
codeTransactionResultErrorMessageIndex = 111
codeIndexCollection = 200
codeIndexExecutionResultByBlock = 202
codeIndexCollectionByTransaction = 203
codeIndexResultApprovalByChunk = 204

// TEMPORARY codes
blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only

// internal failure information that should be preserved across restarts
codeExecutionFork = 254
codeEpochEmergencyFallbackTriggered = 255
)

func MakePrefix(code byte, keys ...interface{}) []byte {
prefix := make([]byte, 1)
prefix[0] = code
for _, key := range keys {
prefix = append(prefix, KeyPartToBytes(key)...)
}
return prefix
}

func KeyPartToBytes(v interface{}) []byte {
switch i := v.(type) {
case uint8:
return []byte{i}
case uint32:
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, i)
return b
case uint64:
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
return b
case string:
return []byte(i)
case flow.Role:
return []byte{byte(i)}
case flow.Identifier:
return i[:]
case flow.ChainID:
return []byte(i)
default:
panic(fmt.Sprintf("unsupported type to convert (%T)", v))
}
}
29 changes: 20 additions & 9 deletions storage/operation/reads.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/merr"
)

// CheckFunc is a function that checks if the value should be read and decoded.
Expand Down Expand Up @@ -51,7 +52,7 @@ func IterateKeysByPrefixRange(r storage.Reader, startPrefix []byte, endPrefix []
// IterateKeys will iterate over all entries in the database, where the key starts with a prefixes in
// the range [startPrefix, endPrefix] (both inclusive).
// No errors expected during normal operations.
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error {
func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) (errToReturn error) {
if len(startPrefix) == 0 {
return fmt.Errorf("startPrefix prefix is empty")
}
Expand All @@ -69,7 +70,9 @@ func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFun
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
}
defer it.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(it, errToReturn)
}()

for it.First(); it.Valid(); it.Next() {
item := it.IterItem()
Expand Down Expand Up @@ -130,7 +133,7 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o
// When this returned function is executed (and only then), it will write into the `keyExists` whether
// the key exists.
// No errors are expected during normal operation.
func KeyExists(r storage.Reader, key []byte) (bool, error) {
func KeyExists(r storage.Reader, key []byte) (exist bool, errToReturn error) {
_, closer, err := r.Get(key)
if err != nil {
// the key does not exist in the database
Expand All @@ -140,7 +143,9 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
// exception while checking for the key
return false, irrecoverable.NewExceptionf("could not load data: %w", err)
}
defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

// the key does exist in the database
return true, nil
Expand All @@ -153,13 +158,15 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) {
// - storage.ErrNotFound if the key does not exist in the database
// - generic error in case of unexpected failure from the database layer, or failure
// to decode an existing database value
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) (errToReturn error) {
val, closer, err := r.Get(key)
if err != nil {
return err
}

defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

err = msgpack.Unmarshal(val, entity)
if err != nil {
Expand All @@ -172,7 +179,7 @@ func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error {
// keys with the format prefix` + `height` (where "+" denotes concatenation of binary strings). The height
// is encoded as Big-Endian (entries with numerically smaller height have lexicographically smaller key).
// The function finds the *highest* key with the given prefix and height equal to or below the given height.
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) error {
func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) (errToReturn error) {
if len(prefix) == 0 {
return fmt.Errorf("prefix must not be empty")
}
Expand All @@ -182,7 +189,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
}
defer it.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(it, errToReturn)
}()

var highestKey []byte

Expand All @@ -203,7 +212,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64
return err
}

defer closer.Close()
defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

err = msgpack.Unmarshal(val, entity)
if err != nil {
Expand Down
64 changes: 29 additions & 35 deletions storage/operation/writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,61 +10,55 @@ import (
"github.com/onflow/flow-go/storage"
)

// Upsert will encode the given entity using msgpack and will insert the resulting
// UpsertByKey will encode the given entity using msgpack and will insert the resulting
// binary data under the provided key.
// If the key already exists, the value will be overwritten.
// Error returns:
// - generic error in case of unexpected failure from the database layer or
// encoding failure.
func Upsert(key []byte, val interface{}) func(storage.Writer) error {
return func(w storage.Writer) error {
value, err := msgpack.Marshal(val)
if err != nil {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}

err = w.Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}
func UpsertByKey(w storage.Writer, key []byte, val interface{}) error {
value, err := msgpack.Marshal(val)
if err != nil {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}

return nil
err = w.Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}

return nil
}

// Remove removes the entity with the given key, if it exists. If it doesn't
// RemoveByKey removes the entity with the given key, if it exists. If it doesn't
// exist, this is a no-op.
// Error returns:
// * generic error in case of unexpected database error
func Remove(key []byte) func(storage.Writer) error {
return func(w storage.Writer) error {
err := w.Delete(key)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
func RemoveByKey(w storage.Writer, key []byte) error {
err := w.Delete(key)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
}

// RemoveByPrefix removes all keys with the given prefix
// RemoveByKeyPrefix removes all keys with the given prefix
// Error returns:
// * generic error in case of unexpected database error
func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error {
return RemoveByRange(reader, key, key)
func RemoveByKeyPrefix(reader storage.Reader, w storage.Writer, key []byte) error {
return RemoveByKeyRange(reader, w, key, key)
}

// RemoveByRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
// RemoveByKeyRange removes all keys with a prefix that falls within the range [start, end], both inclusive.
// It returns error if endPrefix < startPrefix
// no other errors are expected during normal operation
func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error {
return func(w storage.Writer) error {
if bytes.Compare(startPrefix, endPrefix) > 0 {
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
}
err := w.DeleteByRange(reader, startPrefix, endPrefix)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
func RemoveByKeyRange(reader storage.Reader, w storage.Writer, startPrefix []byte, endPrefix []byte) error {
if bytes.Compare(startPrefix, endPrefix) > 0 {
return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key")
}
err := w.DeleteByRange(reader, startPrefix, endPrefix)
if err != nil {
return irrecoverable.NewExceptionf("could not delete item: %w", err)
}
return nil
}
Loading
Loading