statediff: Use a worker pool #43
Conversation
Running this on a fresh full chain, I'm not able to reproduce the trie node errors I was seeing before, so they may not be directly related; removing the WIP tag.
Looks great! A few comments but only one or two changes. Running locally, everything seems to be working (ran with 4 and 8 workers, around block 80,000). I do see some errors on shutdown that I think are new but also probably inconsequential:
WARN [11-24|02:40:55.972] Error from chain event subscription error=nil
WARN [11-24|02:40:56.031] Error from chain event subscription error=nil
WARN [11-24|02:40:56.032] Error from chain event subscription error=nil
Are you still seeing a Trie data race issue on your end?
I haven't benchmarked; it would be good to get a rough estimate of the performance impact.
statediff/helpers.go
Outdated
@@ -96,3 +99,39 @@ func CheckKeyType(elements []interface{}) (sdtypes.NodeType, error) {
		return sdtypes.Unknown, fmt.Errorf("unknown hex prefix")
	}
}

// Deep-copy a receipt
func CopyReceipt(dst, src *types.Receipt) {
I know you can't do it with RLP encoding/decoding since it only encodes the consensus fields, but I wonder if you could copy by marshaling the receipt to JSON bytes and then unmarshaling into a new types.Receipt.
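A rough sketch of that round trip (the helper name is made up; assumes encoding/json and go-ethereum's core/types are imported):

// copyReceiptJSON makes an independent copy of a receipt by marshaling it to
// JSON and unmarshaling into a fresh struct (hypothetical helper, not in the PR).
func copyReceiptJSON(src *types.Receipt) (*types.Receipt, error) {
	buf, err := json.Marshal(src)
	if err != nil {
		return nil, err
	}
	dst := new(types.Receipt)
	if err := json.Unmarshal(buf, dst); err != nil {
		return nil, err
	}
	return dst, nil
}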
@@ -89,6 +89,15 @@ type IService interface {
	WriteLoop(chainEventCh chan core.ChainEvent)
}

// Wraps constructor parameters
type ServiceParams struct {
I like the params getting passed together like this!
@@ -172,41 +198,63 @@ func (sds *Service) APIs() []rpc.API {
	}
}

func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block {
I'm wondering if the cache is still beneficial with the concurrent approach given the additional complexity.
I checked, and the hit/access ratio is very close to 1, so it does still seem to be helping. I can verify further with benchmarks.
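(For reference, a pair of counters along these lines is enough to measure that ratio; the get method and the hits/accesses fields are illustrative, not necessarily what's on the branch:)

// Illustrative hit/access counters for the block cache, incremented under the
// cache's existing lock; the ratio is hits / accesses.
func (lbc *lastBlockCache) get(hash common.Hash) (*types.Block, bool) {
	lbc.Lock()
	defer lbc.Unlock()
	lbc.accesses++
	block, ok := lbc.blocks[hash]
	if ok {
		lbc.hits++
	}
	return block, ok
}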
Oh that's awesome, thanks for checking that!
		parentBlock = block
		if len(lbc.blocks) > int(lbc.maxSize) {
			delete(lbc.blocks, parentHash)
		}
	} else {
		parentBlock = bc.GetBlockByHash(parentHash)
I may be missing it, but I think we still need to cache the parentBlock returned by bc.GetBlockByHash?
My reasoning was that two blocks shouldn't be processed with the same parent block, so we just cache the current one so it's available when its child block gets processed. I did some logging to verify this - blocks are accessed no more than once. Also, the number of misses is so low that hardly any parents would get added to the cache (since we would only add them on a miss).
Ah yeah, that makes sense, and is generally true. The only time this wouldn't be true is when there are reorgs while syncing at the head of the chain (we'll never see it happen during chain import, e.g. during any of our local tests). I think in that case it is likely still better to reach past the cache to retrieve the block rather than waste cache space on the off chance.
	lbc.Unlock()
	return parentBlock
}

type workerParams struct {
	chainEventCh <-chan core.ChainEvent
When integrating with the metrics, we might want to have an intermediate select loop that logs the metric off the primary core.ChainEvent channel before passing the event along to a secondary worker queue channel here, if we want to avoid logging metrics from within worker goroutines.
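Something along these lines, where the function, channel, and metrics-hook names are illustrative rather than taken from the branch:

// Illustrative intermediate loop: record metrics off the primary chain event
// channel, then hand the event to the shared worker queue.
func forwardChainEvents(chainEventCh <-chan core.ChainEvent, workerQueue chan<- core.ChainEvent, quitCh <-chan struct{}) {
	for {
		select {
		case event := <-chainEventCh:
			recordEventMetrics(event) // hypothetical metrics hook
			workerQueue <- event
		case <-quitCh:
			return
		}
	}
}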
Done, after rebasing on this branch
statediff/service.go
Outdated
@@ -410,12 +458,16 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
func (sds *Service) Start() error {
	log.Info("Starting statediff service")

	chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
	go sds.Loop(chainEventCh)
	{
Brackets and comment can go I think
oh yeah, meant to clean it up
statediff/service.go
Outdated
// To avoid a data race caused by Geth's internal caching, deep-copy the accessed receipts
for _, rct := range sds.BlockChain.GetReceiptsByHash(block.Hash()) {
	var newrct types.Receipt
	CopyReceipt(&newrct, rct)
This is simpler and appears to be working when I run locally:
newRct := new(types.Receipt)
*newRct = *rct
receipts = append(receipts, newRct)
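(One caveat worth noting with that approach: it's a shallow copy, so reference fields inside the struct, e.g. the Logs slice, still point at the same entries as the cached receipt.)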
It's odd - I was getting a data race detected at various points in this loop, but now I can't reproduce it even on that commit where it was happening. That was the reason for the deep copy before, but now it seems even the shallow copy is not necessary, so I can revert those changes.
(Branch updated: 09c7f65 to 141311c, then 141311c to ab841a9.)
LGTM! Feel free to merge when you are ready.
The chain event subscription error seems to be from the workers receiving the close of the error channel before the quit channel. Like you said, doesn't seem to indicate a real problem, though.
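For illustration, a worker shutdown path along these lines would produce that warning; the channel names here are assumptions, not the exact branch code:

select {
case err := <-chainEventSub.Err():
	// On shutdown the subscription's Err() channel is closed, so this receive
	// returns the zero value (nil) and the warning is logged with error=nil.
	log.Warn("Error from chain event subscription", "error", err)
	return
case <-quitCh:
	return
}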
Changes the statediff service to spawn a pool of workers, each running an independent WriteLoop. The number of workers is set by a statediff.workers CLI flag.
Receipts are deep-copied to avoid a data race in receipts.DeriveFields() when used during indexing, though I didn't debug this deeply enough to find out why the same receipts data would ever be touched by goroutines which should only be working on separate blocks.
The trie node errors seen earlier were probably due to the state.Database hitting its cache limit (a cache miss shouldn't cause this).
Resolves #41
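A rough sketch of the overall shape, not the branch's exact wiring (numWorkers, workerQueue, quitCh, and writeStateDiffFor are illustrative names):

// Spawn numWorkers goroutines, each draining a shared queue of chain events
// and writing the state diff for whole blocks independently.
workerQueue := make(chan core.ChainEvent, chainEventChanSize)
for i := uint(0); i < numWorkers; i++ {
	go func() {
		for {
			select {
			case event := <-workerQueue:
				// Build and write the state diff for this block (illustrative call).
				writeStateDiffFor(event.Block)
			case <-quitCh:
				return
			}
		}
	}()
}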