-
Notifications
You must be signed in to change notification settings - Fork 34
Conversation
} | ||
} | ||
|
||
func (fetcher *GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was thinking that calling this data structure StorageDiffRow doesn't make as much sense with the rpc pub sub, since it's not technically a row. Would calling it StorageDiff make sense? I'm worried that it's too generic, and there's a lot of structures/packages/files/variables called stateDiff or storageDiff and it may lose some of the meaning. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @elizabethengelman this is looking great!
In regards to the hashes vs addresses issue, during the state-diffing process the node iterator can easily obtain the hash (it.LeafKey()
), but I see no means of ascertaining the address. I'm curious how Parity is doing this. I also might be missing something obvious over in Geth, another reason it would be good if that were reviewed by more people.
In regards to dep ensure, we can get rid of the IPFS and eth-block-extractor dependencies in the Gopkg.toml since we aren't using those here and that seems to prevent any issues.
errs <- err | ||
} | ||
|
||
for diff := range ethStatediffPayloadChan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a select statement within a repeating for loop (possibly within a goroutine? similar to https://github.com/vulcanize/vulcanizedb/blob/ipfs_concurrency/pkg/ipfs/service.go#L147) As is, once it iterates through everything that is in the channel it will fall through and exit this method, it won't wait for and process new things that are added to the channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! I've added a for loop here instead of a range. I don't think it needs a goroutine, since FetchStorageDiffs
is being called in a goroutine, but I could be wrong. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah if that's the case there should be no need for an internal goroutine! I may be missing something, but it might be worth enforcing buffering on the statediffPayloadChan
. I think that channel is only ever used internal to the FetchStorageDiffs
method so instead of passing in the chan as an argument to NewGethRpcStorageFetcher
it could be initialized with a size as part of the NewGethRpcStorageFetcher
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I keep going back an forth on that. I think a big part of the reason I am passing it into NewStorageWatcher
is so that it's easier to test. But totally be happy to reconsider that - testing channels is hard, I'm having trouble seeing a different way to do that at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an excellent point, I wasn't thinking about tests (kinda my crux).
thanks for the review @i-norden! i mentioned it in the And good catch, removed the ipfs and eth-block-extractor dependencies. |
7f652c3
to
885dcbe
Compare
b7de602
to
47e886f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really awesome! A lot of the comments in here are geared more toward information gathering than suggesting revisions. Excited to start parsing diffs off the subscription! 🎉 🚀
ethStatediffPayloadChan := fetcher.statediffPayloadChan | ||
_, err := fetcher.streamer.Stream(ethStatediffPayloadChan) | ||
if err != nil { | ||
errs <- err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we might want to return
after this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice call. I wonder if we should even have the process exit here if the subscription fails. Because even if we return here, the loop in the watcher continues.
Long term, I think it would be cool to allow for a retry so that if a subscriptions fails, the process is smart enough to try again. 🤔
stateDiff := new(statediff.StateDiff) | ||
err = rlp.DecodeBytes(diff.StateDiffRlp, stateDiff) | ||
if err != nil { | ||
errs <- err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and then maybe continue
here? not sure if there are potentially weird consequences from cycling through the rest of the execution when some of these things fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if there's one (or several) decoding failures it makes sense to continue. I'm not entirely sure when continue
is necessary and when it's not to be honest. It looks like this continues, even without continue
🤔🤯
@@ -43,12 +45,17 @@ func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) Storage | |||
queue := storage.NewStorageQueue(db) | |||
return StorageWatcher{ | |||
db: db, | |||
diffSource: "csv", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be the default? also, is it worth putting these strings in an enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that with the refactor in 8e8468f this is no longer as big of a deal since the only place the strings are used is in the commands to decide which StorageWatcher
to create. 🤷♀
@@ -71,10 +78,28 @@ func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, err | |||
} | |||
} | |||
|
|||
func (storageWatcher StorageWatcher) getTransformer(contractAddress common.Address) (transformer.StorageTransformer, bool) { | |||
if storageWatcher.diffSource == "csv" { | |||
storageTransformer, ok := storageWatcher.Transformers[contractAddress] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think we could probably just return this call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, i thought so too, but my IDE says Not enough arguments to return
- I wonder if it's because getting a var from a map can return just the one value, and not the ok
value
} else if storageWatcher.diffSource == "geth" { | ||
logrus.Debug("number of transformers", len(storageWatcher.Transformers)) | ||
for address, t := range storageWatcher.Transformers { | ||
keccakOfTransformerAddress := common.BytesToAddress(crypto.Keccak256(address[:])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering if it might be worth trying to memoize this? maybe keeping an additional variable that maps the keccak => transformer? Also wondering whether maybe the difference between csv
and geth
is great enough that it may be worth having separate implementations behind an interface 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
func NewGethRpcStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRpcStorageFetcher { | ||
return GethRpcStorageFetcher{ | ||
statediffPayloadChan: statediffPayloadChan, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to distinguish between statediffs and storage diffs here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call! I think that the term "state diff" is correct at this point in the process since we're getting this from the geth rpc pub sub, an at this point the payload includes the state diffs.
But, further down when we're iterating through the accounts, that is when it is specifically a storage diff.
//this is not the contract address, but the keccak 256 of the address | ||
Contract: common.BytesToAddress(test_data.ContractLeafKey[:]), | ||
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), | ||
BlockHeight: intHeight, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we make BlockHeight
an int64
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. 🤔It's surprisingly a more widespread change than I was expecting - the StorageRepository.Create
method expects the block height to be an int
as well. I can't think of any reasons why that couldn't/shouldn't change to a int64
as well, but would love some other thoughts on this
Proof: [][]byte{}, | ||
}} | ||
emptyStorage = make([]statediff.StorageDiff, 0) | ||
contractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the values for the test data important?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I snagged this data from our geth patch: https://github.com/vulcanize/go-ethereum/blob/6d440a95791e993dc2d329e1554cb5686bcc9a81/statediff/testhelpers/test_data.go.
@i-norden do you recall if this data is based on real chain data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you were the one to write the first tests and most the test data for our geth patch 🙃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is based on real chain data though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha oh! 😊
|
||
func (streamer *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { | ||
logrus.Info("streaming diffs from geth") | ||
return streamer.client.Subscribe("statediff", payloadChan, "stream") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
Id: 1337, | ||
Contract: address, | ||
BlockHash: common.HexToHash("0xfedcba9876543210"), | ||
BlockHeight: 0, | ||
StorageKey: common.HexToHash("0xabcdef1234567890"), | ||
StorageValue: common.HexToHash("0x9876543210abcdef"), | ||
} | ||
gethRow = utils.StorageDiffRow{ | ||
Id: 1338, | ||
Contract: common.BytesToAddress(crypto.Keccak256(address[:])), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this approach is correct, but I sorta paused wondering whether the value here should actually be a keccak of the padded address (expanded to 32 bytes, as with storage diffs where the key in a mapping is an address).
e1ad96d
to
8e8468f
Compare
cmd/execute.go
Outdated
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) | ||
payloadChan := make(chan statediff.Payload) | ||
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) | ||
sw := watcher.NewGethStorageWatcher(&storageFetcher, &db) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you pass in the storageFetcher
address to the NewGethStorageWatcher
function but not to the NewCsvStorageWatcher
function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch! This was required because the GethRpcStorageFetcher
's implementation of FetchStorageDiffs
had a pointer receiver. But, I actually don't think that's necessary! Updated in 74f6f80.
f6358d2
to
d408771
Compare
@elizabethengelman do we need to add all the vendor stuff? Was hoping we'd be able to leave that out if we specified everything in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a few more comments. It's looking good to me, I'm going to go ahead and approve as I trust you will make any needed adjustments before merging :)
StorageFetcher fetcher.IStorageFetcher | ||
Queue storage.IStorageQueue | ||
Transformers map[common.Address]transformer.StorageTransformer | ||
KeccakAddressTransformers map[common.Address]transformer.StorageTransformer // keccak hash of an address => transformer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be map[common.Hash]transformer.StorageTransformer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting. It is probably more accurate for it to be a common.Hash
vs an common.Address
. I was trying to reuse this structure defined here for a diff, but it looks like that may not work out. I think when we convert from a hash to an address, it ends up cutting off the first several characters to make sure that it is the right length for an address, which isn't ideal. I'll have to think on that a bit to see how best to handle it.
|
||
func (fetcher GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiff, errs chan<- error) { | ||
ethStatediffPayloadChan := fetcher.statediffPayloadChan | ||
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to handle the clientSubscription
errors? We could use a select
to receive payloads from ethStatediffPayloadChan
and errors from clientSubscription.Err()
kinda like in the seed node service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is an interesting point. Originally I think I just had the client errors being send to the errs
chan, but that ends up just warning the user that there was an error in the watcher. It seems like we may potentially want to stop the process (at least for now) if the subscription fails, so that we don't miss the failure, and able to manually intervene.
But my thought is that long term, this would retry the subscription if it fails, or is dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bc794e4 LGTM! Doesn't look like it included edits to this part though? I'm still not sure what the best way to handle the subscription errors is at this time, the original intention of that err channel is to signal resubscription like you intend https://github.com/vulcanize/go-ethereum/blob/statediffing/rpc/subscription.go#L229 but for now maybe we should just stop the process?
Queue: queue, | ||
Transformers: transformers, | ||
} | ||
type StorageWatcher struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stupid question, but why are there 3 storage watchers (CSV, geth, and this one)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageWatcher
is like the base or parent struct that GethStorageWatcher
and CsvStorageWatcher
inherit from since there is a lot of shared behavior. I'd be happy to rethink, or update this approach if this is confusing, or not ideal.
height := test_data.BlockNumber | ||
intHeight := int(height.Int64()) | ||
expectedStorageDiff := utils.StorageDiff{ | ||
//this is not the contract address, but the keccak 256 of the address |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the Contract
field be renamed to something like ContractAddressHash
, and should it be type common.Hash
instead of common.Address
?
@rmulhol nice catch about the vendor dir! I think that was a rebase issue, and that it should work without. Removing it now! ✂️ |
952b2ac
to
3e86ec4
Compare
} | ||
} | ||
|
||
func HexToKeccak256Hash(addr string) common.Hash { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super small thing, but this method should work for an address or a hash string, right? I wonder if it would make sense to change the arg name from addr
to hexString
or something. I'd be happy to make that change, but wanted to verify that I'm reading it correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good call!
geth:run the geth patch with these flags:
vdb:
|
I tested this with the parity mainnet diffs and after running |
- Tool to facilitate parsing diffs from Geth patch that emits hashed versions of storage keys
…okup" This reverts commit cda646b.
- Enables syncing Geth and Parity diffs with same transformer lookup - Maybe worth always hashing the storage key so we don't need a hashed and not-hashed version in the key lookups?
- Also prefer crypto.Keccak256Hash(x) to common.BytesToHash(crypto.Keccak256(x))
8d5e92b
to
5c0e559
Compare
specifically golang.org/x/crypto/sha3
ebc0ce0
to
267de00
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚢 Looks awesome!
@elizabethengelman Sorry I completely missed these comments, my bad. Seems like the merge did cause some issues, I can't build the
|
@m0ar 🤔 are you trying to build against mcd_transformers |
No, against |
I think that mcd_transformers branch |
Geth
make geth
VulcanizeDB
When running locally I've needed to make sure to replace the transformer repo with a specific version, i.e. run:
go mod edit -replace=github.com/vulcanize/mcd_transformers=github.com/vulcanize/mcd_transformers@vdb-363-geth-diffs
build:
make build
run
headerSync
andcomposeAndExecute
concurrently:headerSync
:composeAndExecute
:if you're seeing this error
plugin was built with a different version of package golang.org/x/crypto/sha3
trygo get golang.org/x/crypto/sha3
. This dependency issue still needs to be figured out.This is still a bit of a rough draft.One of the biggest hangups was that the geth patch only emits the keccak256 hash of the contract address, not the contract address itself. Whereas the parity patch emits the actual unhashed contract address. This gets particularly tricky in theStorageWatcher
when determining which transformer to use.For now, I've created a method to convert the transformers' contract addresses to keccak hashes when we pass in the--state-diff-source geth
flag, and use the plain address by default or when--state-diff-source csv
is passed to the command. I don't love this solution because it leaks the details about how the state diffs are received into the watcher. I have some rough ideas about how to fix this, but wanted to get the rough draft up for feedback!If I remember correctly, how to get the actual address while processing statediffs isn't immediately obvious with a non-archive node, so this is why the geth patch is emitting the keccak256 hash of the address.Also, it's worth mentioning that when I tried tono longer relevant since we're using go modules now.dep ensure
, I ran into issues, which could be specific to my system. But, when I just dogo build
and rely on the versions in the vendor dir, it seems to work ok.