Skip to content

Commit

Permalink
Merge pull request #5411 from onflow/leo/localnet-test-en
Browse files Browse the repository at this point in the history
[Execution] Shadow Execution node implementation
  • Loading branch information
zhangchiqing authored Mar 1, 2024
2 parents c0d0bd6 + f265dc9 commit 057d89b
Show file tree
Hide file tree
Showing 18 changed files with 783 additions and 109 deletions.
158 changes: 133 additions & 25 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/host"
"github.com/shirou/gopsutil/v3/mem"
"github.com/vmihailenco/msgpack"
"go.uber.org/atomic"

"github.com/onflow/flow-go/admin/commands"
Expand Down Expand Up @@ -70,24 +71,30 @@ import (
modelbootstrap "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
exedataprovider "github.com/onflow/flow-go/module/executiondatasync/provider"
"github.com/onflow/flow-go/module/executiondatasync/pruner"
"github.com/onflow/flow-go/module/executiondatasync/tracker"
"github.com/onflow/flow-go/module/finalizedreader"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/mempool/queue"
"github.com/onflow/flow-go/module/metrics"
edrequester "github.com/onflow/flow-go/module/state_synchronization/requester"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p/blob"
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
storageerr "github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage/badger/procedure"
storagepebble "github.com/onflow/flow-go/storage/pebble"
Expand Down Expand Up @@ -135,7 +142,7 @@ type ExecutionNode struct {
txResults *storage.TransactionResults
results *storage.ExecutionResults
myReceipts *storage.MyExecutionReceipts
providerEngine *exeprovider.Engine
providerEngine exeprovider.ProviderEngine
checkerEng *checker.Engine
syncCore *chainsync.Core
syncEngine *synchronization.Engine
Expand Down Expand Up @@ -213,6 +220,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
// TODO: will re-visit this once storehouse has implemented new WAL for checkpoint file of
// payloadless trie.
// Component("execution data pruner", exeNode.LoadExecutionDataPruner).
Component("observer collection indexer", exeNode.LoadObserverCollectionIndexer).
Component("blob service", exeNode.LoadBlobService).
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
Expand All @@ -227,7 +235,8 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Component("collection requester engine", exeNode.LoadCollectionRequesterEngine).
Component("receipt provider engine", exeNode.LoadReceiptProviderEngine).
Component("synchronization engine", exeNode.LoadSynchronizationEngine).
Component("grpc server", exeNode.LoadGrpcServer)
Component("grpc server", exeNode.LoadGrpcServer).
Component("observer collection indexer", exeNode.LoadObserverCollectionIndexer)
}

func (exeNode *ExecutionNode) LoadMutableFollowerState(node *NodeConfig) error {
Expand Down Expand Up @@ -353,7 +362,11 @@ func (exeNode *ExecutionNode) LoadBlobService(
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}

bs, err := node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, exeNode.executionDataDatastore, opts...)
edsChannel := channels.ExecutionDataService
if node.ObserverMode {
edsChannel = channels.PublicExecutionDataService
}
bs, err := node.EngineRegistry.RegisterBlobService(edsChannel, exeNode.executionDataDatastore, opts...)
if err != nil {
return nil, fmt.Errorf("failed to register blob service: %w", err)
}
Expand Down Expand Up @@ -524,26 +537,30 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
}
exeNode.computationManager = manager

var chunkDataPackRequestQueueMetrics module.HeroCacheMetrics = metrics.NewNoopCollector()
if node.HeroCacheMetricsEnable {
chunkDataPackRequestQueueMetrics = metrics.ChunkDataPackRequestQueueMetricsFactory(node.MetricsRegisterer)
}
chdpReqQueue := queue.NewHeroStore(exeNode.exeConf.chunkDataPackRequestsCacheSize, node.Logger, chunkDataPackRequestQueueMetrics)
exeNode.providerEngine, err = exeprovider.New(
node.Logger,
node.Tracer,
node.EngineRegistry,
node.State,
exeNode.executionState,
exeNode.collector,
exeNode.checkAuthorizedAtBlock,
chdpReqQueue,
exeNode.exeConf.chunkDataPackRequestWorkers,
exeNode.exeConf.chunkDataPackQueryTimeout,
exeNode.exeConf.chunkDataPackDeliveryTimeout,
)
if err != nil {
return nil, err
if node.ObserverMode {
exeNode.providerEngine = &exeprovider.NoopEngine{}
} else {
var chunkDataPackRequestQueueMetrics module.HeroCacheMetrics = metrics.NewNoopCollector()
if node.HeroCacheMetricsEnable {
chunkDataPackRequestQueueMetrics = metrics.ChunkDataPackRequestQueueMetricsFactory(node.MetricsRegisterer)
}
chdpReqQueue := queue.NewHeroStore(exeNode.exeConf.chunkDataPackRequestsCacheSize, node.Logger, chunkDataPackRequestQueueMetrics)
exeNode.providerEngine, err = exeprovider.New(
node.Logger,
node.Tracer,
node.EngineRegistry,
node.State,
exeNode.executionState,
exeNode.collector,
exeNode.checkAuthorizedAtBlock,
chdpReqQueue,
exeNode.exeConf.chunkDataPackRequestWorkers,
exeNode.exeConf.chunkDataPackQueryTimeout,
exeNode.exeConf.chunkDataPackDeliveryTimeout,
)
if err != nil {
return nil, err
}
}

// Get latest executed block and a view at that block
Expand Down Expand Up @@ -937,6 +954,87 @@ func (exeNode *ExecutionNode) LoadExecutionDataPruner(
return exeNode.executionDataPruner, err
}

// LoadObserverCollectionIndexer builds the component that, when the node runs in
// observer mode, requests block execution data and hands the collections it
// contains to the local collection requester.
//
// If node.ObserverMode is false, a no-op component is returned and nothing else is
// set up. Otherwise the returned component is the execution data requester created
// here. An error is returned if that requester cannot be constructed.
//
// NOTE(review): this reads exeNode.blobService and (lazily, inside the callback)
// exeNode.collectionRequester; both are presumably populated by other Load*
// functions — confirm the component registration order guarantees that.
func (exeNode *ExecutionNode) LoadObserverCollectionIndexer(
	node *NodeConfig,
) (
	module.ReadyDoneAware,
	error,
) {
	if !node.ObserverMode {
		node.Logger.Info().Msg("execution data downloader is disabled")
		return &module.NoopReadyDoneAware{}, nil
	}

	node.Logger.Info().Msg("observer-mode is enabled, creating execution data downloader")

	execDataDistributor := edrequester.NewExecutionDataDistributor()

	executionDataDownloader := execution_data.NewDownloader(exeNode.blobService)

	// No hero-cache metrics are collected for this cache.
	var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
	// NOTE(review): 10 is presumably the cache size limit — confirm against herocache.NewBlockExecutionData.
	execDataCacheBackend := herocache.NewBlockExecutionData(10, node.Logger, heroCacheCollector)

	// Execution data cache that uses a downloader as the backend.
	// If the execution data doesn't exist, it uses the downloader to fetch it.
	executionDataCache := execdatacache.NewExecutionDataCache(
		executionDataDownloader,
		node.Storage.Headers,
		node.Storage.Seals,
		node.Storage.Results,
		execDataCacheBackend,
	)

	processedBlockHeight := bstorage.NewConsumerProgress(node.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight)
	processedNotifications := bstorage.NewConsumerProgress(node.DB, module.ConsumeProgressExecutionDataRequesterNotification)

	executionDataConfig := edrequester.ExecutionDataConfig{
		InitialBlockHeight: node.SealedRootBlock.Header.Height,
		MaxSearchAhead:     edrequester.DefaultMaxSearchAhead,
		FetchTimeout:       edrequester.DefaultFetchTimeout,
		MaxFetchTimeout:    edrequester.DefaultMaxFetchTimeout,
		RetryDelay:         edrequester.DefaultRetryDelay,
		MaxRetryDelay:      edrequester.DefaultMaxRetryDelay,
	}

	r, err := edrequester.New(
		node.Logger,
		metrics.NewExecutionDataRequesterCollector(),
		executionDataDownloader,
		executionDataCache,
		processedBlockHeight,
		processedNotifications,
		node.State,
		node.Storage.Headers,
		executionDataConfig,
		execDataDistributor,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create execution data requester: %w", err)
	}

	// subscribe the block finalization event, and trigger workers to fetch execution data
	exeNode.followerDistributor.AddOnBlockFinalizedConsumer(r.OnBlockFinalized)

	// For every received block execution data, encode each chunk's collection and
	// deliver the batch to the collection requester as a local entity response.
	execDataDistributor.AddOnExecutionDataReceivedConsumer(func(data *execution_data.BlockExecutionDataEntity) {
		res := &messages.EntityResponse{}
		for _, chunk := range data.BlockExecutionData.ChunkExecutionDatas {
			col := chunk.Collection
			blob, err := msgpack.Marshal(col)
			if err != nil {
				// A nil blob would otherwise be forwarded silently; fail the same way
				// a ProcessLocal failure does below.
				node.Logger.Fatal().Err(err).Msgf("failed to encode collection from local execution data for block %v", data.BlockExecutionData.BlockID)
			}
			res.EntityIDs = append(res.EntityIDs, col.ID())
			res.Blobs = append(res.Blobs, blob)
		}

		// notify the collection requester that collections have been received
		if err := exeNode.collectionRequester.ProcessLocal(res); err != nil {
			node.Logger.Fatal().Err(err).Msgf("failed to process collection from local execution data for block %v", data.BlockExecutionData.BlockID)
		}
	})

	return r, nil
}

func (exeNode *ExecutionNode) LoadCheckerEngine(
node *NodeConfig,
) (
Expand Down Expand Up @@ -965,8 +1063,13 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
module.ReadyDoneAware,
error,
) {
engineRegister := node.EngineRegistry
if node.ObserverMode {
engineRegister = &underlay.NoopEngineRegister{}
}

var err error
exeNode.collectionRequester, err = requester.New(node.Logger, node.Metrics.Engine, node.EngineRegistry, node.Me, node.State,
exeNode.collectionRequester, err = requester.New(node.Logger, node.Metrics.Engine, engineRegister, node.Me, node.State,
channels.RequestCollections,
filter.Any,
func() flow.Entity { return &flow.Collection{} },
Expand Down Expand Up @@ -1161,10 +1264,14 @@ func (exeNode *ExecutionNode) LoadReceiptProviderEngine(
}
receiptRequestQueue := queue.NewHeroStore(exeNode.exeConf.receiptRequestsCacheSize, node.Logger, receiptRequestQueueMetric)

engineRegister := node.EngineRegistry
if node.ObserverMode {
engineRegister = &underlay.NoopEngineRegister{}
}
eng, err := provider.New(
node.Logger.With().Str("engine", "receipt_provider").Logger(),
node.Metrics.Engine,
node.EngineRegistry,
engineRegister,
node.Me,
node.State,
receiptRequestQueue,
Expand All @@ -1191,6 +1298,7 @@ func (exeNode *ExecutionNode) LoadSynchronizationEngine(
if err != nil {
return nil, fmt.Errorf("could not initialize spam detection config: %w", err)
}

exeNode.syncEngine, err = synchronization.New(
node.Logger,
node.Metrics.Engine,
Expand Down
2 changes: 2 additions & 0 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type BaseConfig struct {
AdminMaxMsgSize uint
BindAddr string
NodeRole string
ObserverMode bool
DynamicStartupANAddress string
DynamicStartupANPubkey string
DynamicStartupEpochPhase string
Expand Down Expand Up @@ -255,6 +256,7 @@ func DefaultBaseConfig() *BaseConfig {
AdminClientCAs: NotSet,
AdminMaxMsgSize: grpcutils.DefaultMaxMsgSize,
BindAddr: NotSet,
ObserverMode: false,
BootstrapDir: "bootstrap",
datadir: datadir,
secretsdir: NotSet,
Expand Down
33 changes: 1 addition & 32 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (builder *ObserverServiceBuilder) deriveBootstrapPeerIdentities() error {
return nil
}

ids, err := BootstrapIdentities(builder.bootstrapNodeAddresses, builder.bootstrapNodePublicKeys)
ids, err := cmd.BootstrapIdentities(builder.bootstrapNodeAddresses, builder.bootstrapNodePublicKeys)
if err != nil {
return fmt.Errorf("failed to derive bootstrap peer identities: %w", err)
}
Expand Down Expand Up @@ -647,37 +647,6 @@ func publicNetworkMsgValidators(log zerolog.Logger, idProvider module.IdentityPr
}
}

// BootstrapIdentities converts the bootstrap node addresses and keys to a Flow Identity list where
// each Flow Identity is initialized with the passed address, the networking key
// and the Node ID set to ZeroID, role set to Access, 0 stake and no staking key.
//
// addresses[i] is paired with keys[i]. Each key is a hex string that is decoded
// as an ECDSA P-256 public key.
//
// An error is returned if the two slices differ in length, if a key is not valid
// hex, or if the decoded bytes cannot be parsed as a public key.
func BootstrapIdentities(addresses []string, keys []string) (flow.IdentitySkeletonList, error) {
	if len(addresses) != len(keys) {
		return nil, fmt.Errorf("number of addresses and keys provided for the bootstrap nodes don't match")
	}

	ids := make(flow.IdentitySkeletonList, len(addresses))
	for i, address := range addresses {
		// keyBytes rather than bytes, to avoid shadowing the standard package name.
		keyBytes, err := hex.DecodeString(keys[i])
		if err != nil {
			return nil, fmt.Errorf("failed to decode secured GRPC server public key hex: %w", err)
		}

		publicFlowNetworkingKey, err := crypto.DecodePublicKey(crypto.ECDSAP256, keyBytes)
		if err != nil {
			return nil, fmt.Errorf("failed to get public flow networking key, could not decode public key bytes: %w", err)
		}

		// create the identity of the peer by setting only the relevant fields
		ids[i] = &flow.IdentitySkeleton{
			NodeID:        flow.ZeroID, // the NodeID is the hash of the staking key and for the public network it does not apply
			Address:       address,
			Role:          flow.RoleAccess, // the upstream node has to be an access node
			NetworkPubKey: publicFlowNetworkingKey,
		}
	}
	return ids, nil
}

func (builder *ObserverServiceBuilder) initNodeInfo() error {
// use the networking key that was loaded from the configured file
networkingKey, err := loadNetworkingKey(builder.observerNetworkingKeyPath)
Expand Down
Loading

0 comments on commit 057d89b

Please sign in to comment.