From c1c41ef5301656ac3426e2b64b2699b04390aaa2 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 23:47:48 +0800 Subject: [PATCH 01/10] metrics for statediff stats --- statediff/metrics.go | 54 ++++++++++++++++++++++++++++++++++++++++++++ statediff/service.go | 8 +++++++ 2 files changed, 62 insertions(+) create mode 100644 statediff/metrics.go diff --git a/statediff/metrics.go b/statediff/metrics.go new file mode 100644 index 000000000000..d75c583a0438 --- /dev/null +++ b/statediff/metrics.go @@ -0,0 +1,54 @@ +package statediff + +import ( + "strings" + + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + namespace = "statediff" +) + +// Build a fully qualified metric name +func metricName(subsystem, name string) string { + if name == "" { + return "" + } + parts := []string{namespace, name} + if subsystem != "" { + parts = []string{namespace, subsystem, name} + } + // Prometheus uses _ but geth metrics uses / and replaces + return strings.Join(parts, "/") +} + +type statediffMetricsHandles struct { + // Height of latest synced by core.BlockChain + // FIXME + lastSyncHeight metrics.Gauge + // Height of the latest block received from chainEvent channel + lastEventHeight metrics.Gauge + // Height of latest state diff + lastStatediffHeight metrics.Gauge + // Current length of chainEvent channels + serviceLoopChannelLen metrics.Gauge + writeLoopChannelLen metrics.Gauge +} + +func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { + ctx := statediffMetricsHandles{ + lastSyncHeight: metrics.NewGauge(), + lastEventHeight: metrics.NewGauge(), + lastStatediffHeight: metrics.NewGauge(), + serviceLoopChannelLen: metrics.NewGauge(), + writeLoopChannelLen: metrics.NewGauge(), + } + subsys := "" // todo + reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) + reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) + reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) + reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen) + reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen) + return ctx +} diff --git a/statediff/service.go b/statediff/service.go index cee137bf5070..c3f270bb453c 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" @@ -55,6 +56,8 @@ var writeLoopParams = Params{ IncludeCode: true, } +var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -193,8 +196,10 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block + statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) @@ -205,6 +210,8 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) continue } + // TODO: how to handle with concurrent workers + statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case err := <-errCh: log.Warn("Error from chain event subscription", "error", err) sds.close() @@ -226,6 +233,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: + statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("Event received from chainEventCh", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { From 72a47729bbd4b73b55a678d52668a7c19a8a0a50 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 18:19:39 +0800 Subject: [PATCH 02/10] metrics namespace/subsystem = statediff/{indexer,service} --- statediff/indexer/metrics.go | 8 ++++---- statediff/metrics.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index 5ee3426a32f2..fc0727edafb6 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -8,7 +8,7 @@ import ( ) const ( - indexerNamespace = "indexer" + namespace = "statediff" ) // Build a fully qualified metric name @@ -16,9 +16,9 @@ func metricName(subsystem, name string) string { if name == "" { return "" } - parts := []string{indexerNamespace, name} + parts := []string{namespace, name} if subsystem != "" { - parts = []string{indexerNamespace, subsystem, name} + parts = []string{namespace, subsystem, name} } // Prometheus uses _ but geth metrics uses / and replaces return strings.Join(parts, "/") @@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { tTxAndRecProcessing: metrics.NewTimer(), tStateStoreCodeProcessing: metrics.NewTimer(), } - subsys := "" // todo + subsys := "indexer" reg.Register(metricName(subsys, "blocks"), ctx.blocks) reg.Register(metricName(subsys, "transactions"), ctx.transactions) reg.Register(metricName(subsys, "receipts"), ctx.receipts) diff --git a/statediff/metrics.go b/statediff/metrics.go index d75c583a0438..7e7d6e328c91 100644 --- a/statediff/metrics.go +++ b/statediff/metrics.go @@ -44,7 +44,7 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { serviceLoopChannelLen: metrics.NewGauge(), writeLoopChannelLen: metrics.NewGauge(), } - subsys := "" // todo + subsys := "service" reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight) reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight) From 02c7e785c5e82de7a8a9ec4695a8fca56df8e617 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 17:37:38 +0800 Subject: [PATCH 03/10] statediff: use a worker pool (for direct writes) --- cmd/geth/config.go | 11 ++++++- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 ++++-- statediff/service.go | 69 +++++++++++++++++++++++++++++++++----------- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index eab7f699ce24..bb4d2f2e2f65 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } + } else { + if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { + utils.Fatalf("Must pass DB parameters if enabling statediff write loop") + } + } + params := statediff.ServiceParams{ + DBParams: dbParams, + EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name), } - utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name)) + utils.RegisterStateDiffService(stack, backend, params) } // Configure GraphQL if requested diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0e657f63630a..a3072089717f 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -162,6 +162,7 @@ var ( utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, configFileFlag, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6863364db570..57481200a7a1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -746,6 +746,10 @@ var ( Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", } + StateDiffWorkersFlag = cli.UintFlag{ + Name: "statediff.workers", + Usage: "Number of concurrent workers to use during statediff processing (0 = 1)", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C } // RegisterStateDiffService configures and registers a service to stream state diff data over RPC -// dbParams are: Postgres connection URI, Node ID, client name -func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) { - if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil { +func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) { + if err := statediff.New(stack, ethServ, params); err != nil { Fatalf("Failed to register the Statediff service: %v", err) } } diff --git a/statediff/service.go b/statediff/service.go index c3f270bb453c..121279f2ef4a 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -92,6 +92,15 @@ type IService interface { WriteLoop(chainEventCh chan core.ChainEvent) } +// Wraps consructor parameters +type ServiceParams struct { + DBParams *DBParams + // Whether to enable writing state diffs directly to track blochain head + EnableWriteLoop bool + // Size of the worker pool + NumWorkers uint +} + // Service is the underlying struct for the state diffing service type Service struct { // Used to sync access to the Subscriptions @@ -107,41 +116,56 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock lastBlockCache + lastBlock blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer ind.Indexer // Whether to enable writing state diffs directly to track blochain head enableWriteLoop bool + // Size of the worker pool + numWorkers uint } // Wrap the cached last block for safe access from different service loops -type lastBlockCache struct { +type blockCache struct { sync.Mutex - block *types.Block + blocks map[common.Hash]*types.Block + maxSize uint +} + +func newBlockCache(max uint) blockCache { + return blockCache{ + blocks: make(map[common.Hash]*types.Block), + maxSize: max, + } } // New creates a new statediff.Service -func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { blockChain := ethServ.BlockChain() var indexer ind.Indexer - if dbParams != nil { + if params.DBParams != nil { info := nodeinfo.Info{ GenesisBlock: blockChain.Genesis().Hash().Hex(), NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10), ChainID: blockChain.Config().ChainID.Uint64(), - ID: dbParams.ID, - ClientName: dbParams.ClientName, + ID: params.DBParams.ID, + ClientName: params.DBParams.ClientName, } // TODO: pass max idle, open, lifetime? - db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info) + db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) if err != nil { return err } indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) } + workers := params.NumWorkers + if workers == 0 { + workers = 1 + } sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -149,8 +173,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), + lastBlock: newBlockCache(workers), indexer: indexer, - enableWriteLoop: enableWriteLoop, + enableWriteLoop: params.EnableWriteLoop, + numWorkers: workers, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -174,16 +200,20 @@ func (sds *Service) APIs() []rpc.API { } } -func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available +func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block - if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) { - parentBlock = lbc.block + if block, ok := lbc.blocks[parentHash]; ok { + parentBlock = block + if len(lbc.blocks) > int(lbc.maxSize) { + delete(lbc.blocks, parentHash) + } } else { parentBlock = bc.GetBlockByHash(parentHash) } - lbc.block = currentBlock + lbc.blocks[currentBlock.Hash()] = currentBlock lbc.Unlock() return parentBlock } @@ -417,13 +447,18 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start() error { log.Info("Starting statediff service") - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.Loop(chainEventCh) + { + // TODO: also use worker pool here? + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.Loop(chainEventCh) + } if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) - go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize)) - go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan) + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + for worker := uint(0); worker < sds.numWorkers; worker++ { + go sds.WriteLoop(chainEventCh) + } } return nil From bf02717f6076fd3c8ae07da925525982c0e8ffe3 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 19 Nov 2020 20:25:45 +0800 Subject: [PATCH 04/10] fix test --- statediff/service.go | 10 +++++----- statediff/service_test.go | 3 +++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 121279f2ef4a..7127cf5da4b1 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -116,7 +116,7 @@ type Service struct { // A mapping of subscription params rlp hash to the corresponding subscription params SubscriptionTypes map[common.Hash]Params // Cache the last block so that we can avoid having to lookup the next block's parent - lastBlock blockCache + BlockCache blockCache // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 // Interface for publishing statediffs as PG-IPLD objects @@ -134,7 +134,7 @@ type blockCache struct { maxSize uint } -func newBlockCache(max uint) blockCache { +func NewBlockCache(max uint) blockCache { return blockCache{ blocks: make(map[common.Hash]*types.Block), maxSize: max, @@ -173,7 +173,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), SubscriptionTypes: make(map[common.Hash]Params), - lastBlock: newBlockCache(workers), + BlockCache: NewBlockCache(workers), indexer: indexer, enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, @@ -230,7 +230,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue @@ -271,7 +271,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { continue } currentBlock := chainEvent.Block - parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) continue diff --git a/statediff/service_test.go b/statediff/service_test.go index ef3c1bb2c927..ca9a483a5967 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) { QuitChan: serviceQuit, Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) @@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) @@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), SubscriptionTypes: make(map[common.Hash]statediff.Params), + BlockCache: statediff.NewBlockCache(1), } stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) if err != nil { From 5c35c86a8b9509e42ae88f56820a4e4031ca5b21 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 00:52:49 +0800 Subject: [PATCH 05/10] fix chain event subscription --- statediff/service.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 7127cf5da4b1..4b4a476c745d 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -218,16 +218,34 @@ func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types. return parentBlock } +type workerParams struct { + chainEventCh <-chan core.ChainEvent + errCh <-chan error + wg *sync.WaitGroup + id uint +} + func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() + var wg sync.WaitGroup + wg.Add(int(sds.numWorkers)) + for worker := uint(0); worker < sds.numWorkers; worker++ { + params := workerParams{chainEventCh: chainEventCh, errCh: errCh, wg: &wg, id: worker} + go sds.writeLoopWorker(params) + } + wg.Wait() +} + +func (sds *Service) writeLoopWorker(params workerParams) { + defer params.wg.Done() for { select { //Notify chain event channel of events - case chainEvent := <-chainEventCh: - statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) - log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) + case chainEvent := <-params.chainEventCh: + statediffMetrics.writeLoopChannelLen.Update(int64(len(params.chainEventCh))) + log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) @@ -237,12 +255,12 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { } err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error()) + log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue } // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) - case err := <-errCh: + case err := <-params.errCh: log.Warn("Error from chain event subscription", "error", err) sds.close() return @@ -456,9 +474,7 @@ func (sds *Service) Start() error { if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams) chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - for worker := uint(0); worker < sds.numWorkers; worker++ { - go sds.WriteLoop(chainEventCh) - } + go sds.WriteLoop(chainEventCh) } return nil From dd6f9ccabe43301c304098b7e081fbb1496a584d Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 20 Nov 2020 16:48:41 +0800 Subject: [PATCH 06/10] log tweaks --- statediff/service.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 4b4a476c745d..b29afc993319 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -253,9 +253,10 @@ func (sds *Service) writeLoopWorker(params workerParams) { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } + log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { - log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) + log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue } // TODO: how to handle with concurrent workers @@ -282,7 +283,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { //Notify chain event channel of events case chainEvent := <-chainEventCh: statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) - log.Debug("Event received from chainEventCh", "event", chainEvent) + log.Debug("Loop(): chain event received", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { log.Debug("Currently no subscribers to the statediffing service; processing is halted") @@ -291,7 +292,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { currentBlock := chainEvent.Block parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) + log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue } sds.streamStateDiff(currentBlock, parentBlock.Root()) @@ -532,7 +533,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod log.Info("sending code and codehash", "block height", blockNumber) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) if err != nil { - log.Error("error creating trie for block", "number", current.Number(), "err", err) + log.Error("error creating trie for block", "block height", current.Number(), "err", err) close(quitChan) return } @@ -580,7 +581,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - log.Info("Writing state diff", "block height", block.Number().Uint64()) + // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts if params.IncludeTD { From ae2f32f9d8b4b6e740d33e19c104fbdcd00c363a Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 21:51:30 +0800 Subject: [PATCH 07/10] func name --- statediff/service.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index b29afc993319..0bd31fdde142 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -200,8 +200,9 @@ func (sds *Service) APIs() []rpc.API { } } -// Return the parent block of currentBlock, using the cached block if available -func (lbc *blockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block { +// Return the parent block of currentBlock, using the cached block if available; +// and cache the passed block +func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block { lbc.Lock() parentHash := currentBlock.ParentHash() var parentBlock *types.Block @@ -248,7 +249,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) - parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue @@ -290,7 +291,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { continue } currentBlock := chainEvent.Block - parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain) + parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) continue From 8c9d8cbc3f971de47b019f1516e4d9f4dc790f06 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 22:09:41 +0800 Subject: [PATCH 08/10] unused import --- statediff/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/statediff/service.go b/statediff/service.go index 0bd31fdde142..6a6e0bd348c2 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -22,7 +22,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" From ab841a9abe512f0faf76c54ead3ae74dfcb60693 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 24 Nov 2020 22:58:27 +0800 Subject: [PATCH 09/10] intermediate chain event channel for metrics --- statediff/service.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 6a6e0bd348c2..e36ef442837a 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -230,9 +230,25 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() var wg sync.WaitGroup + // Process metrics for chain events, then forward to workers + chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case chainEvent := <-chainEventCh: + statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) + statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + chainEventFwd <- chainEvent + case <-sds.QuitChan: + return + } + } + }() wg.Add(int(sds.numWorkers)) for worker := uint(0); worker < sds.numWorkers; worker++ { - params := workerParams{chainEventCh: chainEventCh, errCh: errCh, wg: &wg, id: worker} + params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker} go sds.writeLoopWorker(params) } wg.Wait() @@ -244,10 +260,8 @@ func (sds *Service) writeLoopWorker(params workerParams) { select { //Notify chain event channel of events case chainEvent := <-params.chainEventCh: - statediffMetrics.writeLoopChannelLen.Update(int64(len(params.chainEventCh))) log.Debug("WriteLoop(): chain event received", "event", chainEvent) currentBlock := chainEvent.Block - statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain) if parentBlock == nil { log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) From 83c35833553e8d807b54b06c10321c69427d2aa0 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 25 Nov 2020 18:25:25 +0800 Subject: [PATCH 10/10] cleanup --- cmd/geth/usage.go | 1 + statediff/service.go | 11 ++++------- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e3b52daa1a34..3acd401458f0 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, + utils.StateDiffWorkersFlag, }, }, { diff --git a/statediff/service.go b/statediff/service.go index e36ef442837a..2f16fc4abda5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -276,11 +276,11 @@ func (sds *Service) writeLoopWorker(params workerParams) { // TODO: how to handle with concurrent workers statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case err := <-params.errCh: - log.Warn("Error from chain event subscription", "error", err) + log.Warn("Error from chain event subscription", "error", err, "worker", params.id) sds.close() return case <-sds.QuitChan: - log.Info("Quitting the statediff writing process") + log.Info("Quitting the statediff writing process", "worker", params.id) sds.close() return } @@ -480,11 +480,8 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start() error { log.Info("Starting statediff service") - { - // TODO: also use worker pool here? - chainEventCh := make(chan core.ChainEvent, chainEventChanSize) - go sds.Loop(chainEventCh) - } + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) + go sds.Loop(chainEventCh) if sds.enableWriteLoop { log.Info("Starting statediff DB write loop", "params", writeLoopParams)