Conversation
This all looks great!! I couldn't find much to fix but I've left some questions.
Expect(string(logContent)).To(ContainSubstring(utils.ErrContractNotFound{Contract: address.Hex()}.Error()))
}, w, mockTailer, []*tail.Line{line})
})
Eventually(func() (string, error) {
🙌 These asynchronous tests are really cool!! I'm going to want to reuse this pattern. To make sure I understand this correctly: Eventually keeps checking for those values until they assert positively or negatively or until a timeout, and the done Done argument allows the tests themselves to be run asynchronously by waiting for done to be closed, up to a certain timeout?
Good question!
As I understand it, Eventually means the assertion only fails if it remains false through the timeout (not just because it's false when it's first evaluated), and Done makes the tests run asynchronously so that channel reads aren't blocking other tests.
I think it may be a little redundant to be using both Eventually and Done, since both enforce timeouts. We could probably get away with removing Done, but it seems like maybe there's added value insofar as the tests don't block while awaiting something like a (nanosecond 😂) ticker.
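For anyone else reusing the pattern, here's a self-contained sketch (not code from this PR; the channel and messages are made up) showing how the two timeouts interact:

```go
package watcher_test

import (
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

var _ = Describe("async assertion pattern", func() {
	It("waits for a value to show up", func(done Done) {
		results := make(chan string, 1)
		go func() {
			time.Sleep(time.Millisecond)
			results <- "storage diff processed"
		}()

		// Eventually re-polls this function until the matcher passes or
		// Eventually's own timeout (1s by default) expires.
		Eventually(func() string {
			select {
			case msg := <-results:
				return msg
			default:
				return ""
			}
		}).Should(ContainSubstring("processed"))

		// Closing done marks the async spec as finished; the trailing 1 is
		// the spec-level timeout in seconds that Done enforces.
		close(done)
	}, 1)
})
```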
if parseErr != nil {
errs <- parseErr
}
out <- row
This still sends after a parseErr. Is it okay that empty StorageDiffRow{}s enter the out channel in that case?
Nice catch! I had fixed that on a spike I did beforehand but then forgot it after test-driving this work. Probably don't want to send empty rows 👍
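A hedged sketch of that fix with stand-in types (parseRow and the trimmed StorageDiffRow are hypothetical, not the repo's code): skip the send when parsing fails.

```go
package main

import (
	"errors"
	"fmt"
)

// StorageDiffRow is a trimmed stand-in for utils.StorageDiffRow in the diff.
type StorageDiffRow struct {
	Contract string
}

// parseRow is a hypothetical stand-in for the CSV parsing step under discussion.
func parseRow(fields []string) (StorageDiffRow, error) {
	if len(fields) == 0 {
		return StorageDiffRow{}, errors.New("malformed csv row")
	}
	return StorageDiffRow{Contract: fields[0]}, nil
}

func main() {
	out := make(chan StorageDiffRow, 2)
	errs := make(chan error, 2)

	for _, fields := range [][]string{{"0x123456"}, {}} {
		row, parseErr := parseRow(fields)
		if parseErr != nil {
			errs <- parseErr
			continue // skip sending the zero-value row when parsing failed
		}
		out <- row
	}
	close(out)

	for row := range out {
		fmt.Println("sent:", row.Contract) // only the successfully parsed row arrives
	}
}
```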
return parseErr
executeErr := storageTransformer.Execute(row)
if executeErr != nil {
if isKeyNotFound(executeErr) {
I'm not very experienced with the storage transformers - what is it that can change to break this error from continuing in processQueue?
Not sure I understand the question - could you say more? processQueue differs from processRow in that it only takes further action if executeErr is nil - otherwise it just leaves the row in the queue to be retried later.
Sorry, that was worded poorly. I guess I am wondering what it is that changes in the storage transformer's execution cycle that causes the executeErr it returns to be nil in processQueue when it had been isKeyNotFound here.
Oh yeah great question! That's basically the crux of what makes this feature worthwhile :)
Storage keys for values in more complex data structures like mappings are derived by hashing the index of that variable on the contract concatenated with the key to the mapping. E.g. if the contract has a variable mapping (address => uint256) balances, you need to know the address keys in that mapping to be able to recognize a given diff as representing a changed balance.
We're using the event transformers to catch all of the possible keys being used in the mappings we care about. But the event transformers could be lagging behind the storage transformers, in which case we wouldn't yet be able to recognize a given diff's storage key. That's where we queue the diff, and then this process will keep retrying - on the assumption that the event transformers will eventually give us the value we need to recognize the key.
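For context, a minimal sketch of how a mapping's storage key is derived under the standard Solidity storage layout (the address and slot index below are illustrative values, not taken from this PR):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// storageKeyForMapping applies the layout rule described above: for
// `mapping(address => uint256) balances` stored at slot `index`, the value
// for `key` lives at keccak256(leftPad32(key) ++ leftPad32(index)).
func storageKeyForMapping(key common.Address, index common.Hash) common.Hash {
	return crypto.Keccak256Hash(
		common.LeftPadBytes(key.Bytes(), 32),
		index.Bytes(), // a common.Hash is already 32 bytes
	)
}

func main() {
	holder := common.HexToAddress("0x123456") // hypothetical mapping key
	balancesSlot := common.HexToHash("0x2")   // assumed slot index of `balances`

	fmt.Println(storageKeyForMapping(holder, balancesSlot).Hex())
}
```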
Thank you, that clears everything up!
@@ -0,0 +1,33 @@
package fetcher
Could add license header
Looks good! Some curious questions 🕵️♀️
cmd/root.go
pollingInterval = 7 * time.Second
validationWindow = 15
pollingInterval = 7 * time.Second
queueRecheckInterval = 5 * time.Minute
Wouldn't it be nice to make this a CLI argument with a default value? Backwards compatible, but still configurable.
(kinda applies to all our hard coded constants like batch size, validation window, etc, but that's not in scope here)
Yeah I dig that idea 👍
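A hedged sketch of what that could look like, assuming the CLI is built with cobra (as cmd/root.go suggests). The flag name and usage string are made up, and rootCmd stands in for the real root command:

```go
package cmd

import (
	"time"

	"github.com/spf13/cobra"
)

var (
	queueRecheckInterval time.Duration

	// rootCmd stands in for the real root command defined in cmd/root.go.
	rootCmd = &cobra.Command{Use: "vulcanizedb"}
)

func init() {
	rootCmd.PersistentFlags().DurationVar(
		&queueRecheckInterval,
		"queue-recheck-interval", // flag name is an assumption, not from the diff
		5*time.Minute,            // current hard-coded value becomes the default
		"how often to retry queued storage diffs with unrecognized keys",
	)
}
```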
)

BeforeEach(func() {
row = utils.StorageDiffRow{
Contract: common.HexToAddress("0x123456"),
BlockHash: common.HexToHash("0x678901"),
BlockHeight: 987,
StorageKey: common.HexToHash("0x654321"),
StorageValue: common.HexToHash("0x198765"),
For future-proofing, does it make sense to see if it works without the leading 0x?
Or is it formatted that way for both geth and parity for example?
@m0ar I think the 0x is inconsequential when calling common.HexToHash since its implementation converts the string with a function that chops off the 0x.
Yeah, but I mean the storage diff code - does it always assume there is no leading 0x?
Nah the code that converts the raw diffs to their internal representation uses the same functions, so it'll handle hex values with and without the prefix.
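For reference, a tiny example showing that go-ethereum's common.HexToHash treats both forms identically:

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	withPrefix := common.HexToHash("0x654321")
	withoutPrefix := common.HexToHash("654321")

	// HexToHash strips an optional 0x prefix before decoding, so both inputs
	// produce the same left-padded 32-byte hash.
	fmt.Println(withPrefix == withoutPrefix) // true
}
```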
Expect(len(rows)).To(Equal(2))
Expect(rows[0]).NotTo(Equal(rows[1]))
Expect(rows[0].Id).NotTo(BeZero())
Expect(rows[0].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract)))
Does the ordering not matter?
I don't think the ordering should matter here, since we iterate through all rows. Let me know if it seems like I'm missing something, though.
storageWatcher.processRow(row)
case <-ticker.C:
storageWatcher.processQueue()
}
Would it make sense to add a default sleep case here if all falls through?
🤔 good question! Curious to hear more from you and others.
On the one hand, I'm inclined to keep spinning so that we stay up to date with new rows as quickly as possible. But, on the other hand, I could see the benefit of throttling that if we're checking many times more often than necessary.
I think I'd be tempted to stick with the current implementation and then do some investigation around optimal sleep times if we decide to go that route, but am very open to reconsidering.
I'm thinking we run this on the same machine as the other commands - won't this hog a core basically permanently?
Like when the interval is 5 minutes, this will still loop at blazing speed until some channel is populated, no?
That sounds right, but it's unclear to me if we need to make that optimization right now. Our light sync code, for example, has a similar pattern, and seems to run fine alongside other processes right now (granted, that loop does have a sleep in one of the case statements - but I believe it would loop by default while neither the ticker nor the missing blocks are receiving messages).
I'm definitely not opposed to exploring what kind of hit we're taking here and putting a story on the board to add defaults that sleep. But I think it may be worth a separate story because it's a bit tricky to test.
That linked code is a bit different, since it'll sleep after every run of backfilling if it yielded no missing blocks. Only when there's actual meat to work with will it re-run directly afterwards. Anyway, putting it in a later story is fine :)
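For reference, a self-contained toy version of such a loop with the suggested default sleep case (channel names and durations are illustrative, not from the PR). Worth noting that a select with no default blocks until a case is ready, so adding the default turns the loop into a periodic poll rather than removing a busy spin:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	rows := make(chan string)
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	go func() { rows <- "diff" }()

	deadline := time.After(200 * time.Millisecond) // keeps the toy example finite
	for {
		select {
		case row := <-rows:
			fmt.Println("process row:", row)
		case <-ticker.C:
			fmt.Println("process queue")
		case <-deadline:
			return
		default:
			// Neither channel is ready: pause briefly instead of re-entering
			// select immediately. (Without a default, select would simply
			// block here until a case fires.)
			time.Sleep(10 * time.Millisecond)
		}
	}
}
```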
logrus.Warn(fmt.Sprintf("error queueing storage diff with unrecognized key: %s", queueErr)) | ||
} | ||
} else { | ||
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr)) |
Are these rows re-run at some point, or how do we follow up on failing transformer execution?
No, I think these rows would be lost. Thinking I may change things to queue diffs on any error in execution...
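A minimal sketch of that idea with stand-in types (the real StorageWatcher, transformer, and Postgres-backed queue are simplified here; names like Add are assumptions):

```go
package main

import (
	"errors"
	"log"
)

// StorageDiffRow and the in-memory queue below are minimal stand-ins for the
// repo's types; the real queue persists to the database.
type StorageDiffRow struct{ Contract string }

type queue struct{ rows []StorageDiffRow }

func (q *queue) Add(row StorageDiffRow) error {
	q.rows = append(q.rows, row)
	return nil
}

// processRow sketches the change floated above: queue the diff on any execute
// error, not only key-not-found, so transient failures don't drop rows.
func processRow(execute func(StorageDiffRow) error, q *queue, row StorageDiffRow) {
	if executeErr := execute(row); executeErr != nil {
		log.Printf("error executing storage transformer: %s", executeErr)
		if queueErr := q.Add(row); queueErr != nil {
			log.Printf("error queueing storage diff: %s", queueErr)
		}
	}
}

func main() {
	q := &queue{}
	flakyTransformer := func(StorageDiffRow) error { return errors.New("ephemeral db error") }

	processRow(flakyTransformer, q, StorageDiffRow{Contract: "0x123456"})
	log.Printf("rows retained for retry: %d", len(q.rows)) // 1
}
```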
}

func (storageWatcher StorageWatcher) processQueue() {
rows, fetchErr := storageWatcher.Queue.GetAll()
Would it be safer from a disposability POV to peek and pop one element at a time instead? I'm thinking about the case where we empty and cache up the whole queue, but something happens and we crash/are killed/etc - do we not lose the queue?
if !ok {
logrus.Warn(utils.ErrContractNotFound{Contract: row.Contract.Hex()}.Error())
// delete row from queue if address no longer watched
storageWatcher.deleteRow(row.Id)
...or does GetAll not actually remove from the queue?
Yep, right now the GetAll function just fetches without deleting, pretty much exactly for the consideration you described above. Thinking that for now it makes sense to risk duplicate rows (if a queued diff that's processed isn't deleted) rather than risk deleting rows that haven't been processed, but also interested in exploring better ways of avoiding both risks.
I did consider doing everything inside of a transaction, but that felt a little heavy handed given that it would require (I think) another transformer implementation that passes around a transaction injected by the watcher. I think the better solution might lie in eventually using a different tool that's better suited to managing a queue, but would welcome thoughts.
Maybe some kind of unique constraint and inspecting the returned error? Since this runs quite seldom anyway, can't the one-at-a-time approach work?
Sure, that makes sense. Another case where I'd propose adding a separate story since it'd be a cross repo change set (adding constraints + a custom error and handling it specifically)
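For reference, a minimal sketch of the error inspection being suggested, assuming the queue lives in Postgres accessed via lib/pq (the constraint itself and the helper name are not part of this diff):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/lib/pq"
)

// isDuplicateRow reports whether err is Postgres' unique_violation error,
// which a UNIQUE constraint on the queue table would raise for repeat inserts.
func isDuplicateRow(err error) bool {
	var pqErr *pq.Error
	if errors.As(err, &pqErr) {
		return pqErr.Code.Name() == "unique_violation"
	}
	return false
}

func main() {
	fmt.Println(isDuplicateRow(errors.New("some other failure"))) // false
}
```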
func (storageWatcher StorageWatcher) deleteRow(id int) {
deleteErr := storageWatcher.Queue.Delete(id)
if deleteErr != nil {
logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr))
On a subsequent run, what would happen if we encounter an already persisted row?
Right now the row's data would be duplicated by default, though a given storage transformer could prevent that by adding a uniqueness constraint on parsed storage rows' block number + value - though in that case you'd probably also want to return a nil error on execute so that the delete step could happen for the queued row.
defer os.Remove(tempFile.Name())
logrus.SetOutput(tempFile)

go storageWatcher.Execute(rows, errs, time.Nanosecond)
This is a fast recheck interval... could it mayhaps slow down the test since we'll go into that case very often? Maybe it isn't an issue, idk 🤷♀️
🤔 I may try tuning this parameter, but it's a little bit tricky to know if it's playing a role due to the fact that these tests run pretty fast and there's a decent bit of random variance in how long they take to execute across runs without changing it.
- Replaces directly reading from a CSV
- Simplifies testing
- Should hopefully make it easier to plug in other sources for storage diffs (e.g. differently formatted CSVs, JSON RPC, etc)
- Iterate through queued storage at defined interval, popping rows from the queue if successfully persisted
- For any error, not just if key isn't recognized
- Means we don't lose track of diffs on random ephemeral errors
Force-pushed from eec51b1 to 6716c3b
Currently set up to run at 5-minute intervals, but that's easy to change if we gather some insight about how often we actually want it to run.