Move substreams processing to block stream #4851
Conversation
Force-pushed from 67d1a13 to c8a2c3b
-for entity_change in block.changes.entity_changes.iter() {
-    match entity_change.operation() {
-        Operation::Unset => {
+for parsed_change in block.parsed_changes.clone().into_iter() {
This `clone()` seems superfluous at first sight. In each arm of the match, the value is either used as a reference or cloned; I didn't see any move in there.
I will check, but I think this is because `block` is a `&Arc`, so you can't move out of it to call `into_iter()`, whereas before it was `iter()`.
Sorry, I should have been clearer. I think the `clone().into_iter()` should be `iter()`, because all values are either used as a reference or cloned (at least from my reading, I could be wrong).
Seems that `state.entity_cache.remove(entity_key);` is the only move. I think cloning just that one would be better, but I'll leave the decision to you.
Scratch all my comments above; I just saw `state.entity_cache.set(key, entity)?;`, which is a full move. I missed it because it was part of non-modified code. Sorry :)
Yeah, that's it. I was just re-checking: the `set` requires the move, so we might as well clone it all ahead of time. No problem at all, thanks for checking!
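To make the borrow-checker issue in this thread concrete, here is a minimal, self-contained sketch; the `Block`, `ParsedChange`, `Entity`, and `Cache` types are stand-ins for illustration, not the PR's actual types. Because `block` is only borrowed behind an `Arc`, its `Vec` cannot be consumed in place, and since `set` takes its arguments by value, cloning up front is the simplest way to get owned items:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in types to illustrate the discussion above; not graph-node's real types.
#[derive(Clone)]
struct Entity(String);

#[derive(Clone)]
enum ParsedChange {
    Upsert { key: String, entity: Entity },
    Delete(String),
}

struct Block {
    parsed_changes: Vec<ParsedChange>,
}

struct Cache(HashMap<String, Entity>);

impl Cache {
    // Takes ownership of `key` and `entity`, like `state.entity_cache.set(key, entity)?`.
    fn set(&mut self, key: String, entity: Entity) {
        self.0.insert(key, entity);
    }

    fn remove(&mut self, key: &str) {
        self.0.remove(key);
    }
}

fn apply(block: &Arc<Block>, cache: &mut Cache) {
    // `block` is only borrowed, so the Vec can't be consumed directly with `into_iter()`;
    // cloning it first yields owned items that `set` can take by value.
    for change in block.parsed_changes.clone().into_iter() {
        match change {
            // `set` moves both `key` and `entity`; iterating by reference would
            // force per-field `clone()` calls here instead.
            ParsedChange::Upsert { key, entity } => cache.set(key, entity),
            ParsedChange::Delete(key) => cache.remove(&key),
        }
    }
}
```

With `iter()` the loop would only see `&ParsedChange`, and the `Upsert` arm would then need per-field clones before calling `set`, which is the trade-off the thread converges on.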
Force-pushed from c8a2c3b to 1a3d68d
use graph::substreams::Clock;
use graph::substreams_rpc::response::Message as SubstreamsMessage;
use prost::Message;

-pub struct Mapper {}
+pub struct Mapper {
It would be good to explain here what this is for, and how setting `schema` to `None` influences things.
will do!
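For context, a hedged sketch of the kind of struct the diff hints at; the field name, type, and described behavior below are assumptions for illustration, since the hunk only shows the opening brace:

```rust
use std::sync::Arc;

// Placeholder for graph-node's schema type; used here only so the sketch compiles.
pub struct InputSchema;

pub struct Mapper {
    // Assumed field: when `Some`, entity changes can be decoded against the
    // subgraph schema while the block stream is mapped; when `None`, that
    // decoding step would be skipped and the raw changes passed through.
    pub schema: Option<Arc<InputSchema>>,
}
```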
    ParsedChanges::Upsert { key, entity }
}
entity_change::Operation::Delete => ParsedChanges::Delete(key),
entity_change::Operation::Unset => ParsedChanges::Unset,
I am not familiar with the substreams vocabulary, but what does `Unset` mean in this context? Do we even need to keep it in the parsed changes? It might be good to add that as a comment on `ParsedChanges`.
Will do. `Unset` is just there to preserve the logic that protobuf applies: `Unset` is the default value used when a new, unsupported value appears. Say you have an enum with 3 values; if the server sends a 4th value, parsing will default to `Unset`, so it works as a sentinel value of sorts.

In this case, at parsing time we don't "care" what the operation is, we're just transforming, so there is a 1:1 mapping of the supported operations. Once we run the trigger processing, there will be an error if the operation is `Unset`, as that's not a valid operation. I didn't want to change the responsibility of the block stream, so this was the "easiest" way to maintain it.
I will also add a comment here to make it clearer
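A minimal sketch of the flow described above, with simplified stand-in types and illustrative operation codes rather than the PR's exact code: parsing maps every protobuf operation 1:1 into `ParsedChanges`, keeping the unknown-value sentinel, and only trigger processing turns `Unset` into an error.

```rust
// Simplified stand-ins; the real `EntityKey` / `Entity` types come from graph-node.
#[derive(Debug)]
pub struct EntityKey(pub String);

#[derive(Debug)]
pub struct Entity;

#[derive(Debug)]
pub enum ParsedChanges {
    Unset,
    Delete(EntityKey),
    Upsert { key: EntityKey, entity: Entity },
}

// Parse time: no validation, just a 1:1 transformation of whatever protobuf decoded.
// Unknown operation codes fall through to the `Unset` sentinel, mirroring the protobuf
// behaviour described above where unsupported enum values decode to the default variant.
fn parse(operation: i32, key: EntityKey, entity: Entity) -> ParsedChanges {
    match operation {
        1 => ParsedChanges::Upsert { key, entity },
        2 => ParsedChanges::Delete(key),
        _ => ParsedChanges::Unset,
    }
}

// Trigger processing: this is the first place `Unset` is rejected.
fn process(change: ParsedChanges) -> Result<(), String> {
    match change {
        ParsedChanges::Upsert { .. } | ParsedChanges::Delete(_) => Ok(()),
        ParsedChanges::Unset => Err("unset/unknown entity change operation".to_string()),
    }
}
```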
pub enum ParsedChanges {
    Unset,
    Delete(EntityKey),
    Upsert { key: EntityKey, entity: Entity },
No more intermediate `HashMap`... yay!
@@ -723,7 +725,7 @@ where
 for trigger in triggers {
     // Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to
     // get causality region isolation.
-    let schema = self.inputs.store.input_schema();
+    let schema = ReadStore::input_schema(&self.inputs.store);
Why was that change necessary?
I added the `input_schema` function to another trait, which made this call ambiguous.
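For anyone reading along, a minimal illustration of that ambiguity with placeholder trait and type names (not graph-node's real definitions): once a type implements two traits that both declare `input_schema`, the plain method call stops compiling and the trait has to be named, which is exactly what `ReadStore::input_schema(&self.inputs.store)` does.

```rust
struct Schema;
struct Store;

trait ReadStore {
    fn input_schema(&self) -> Schema;
}

trait OtherStore {
    fn input_schema(&self) -> Schema;
}

impl ReadStore for Store {
    fn input_schema(&self) -> Schema {
        Schema
    }
}

impl OtherStore for Store {
    fn input_schema(&self) -> Schema {
        Schema
    }
}

fn main() {
    let store = Store;
    // `store.input_schema()` is now rejected as ambiguous (E0034); the call
    // must name the trait, as the diff does with `ReadStore::input_schema(...)`.
    let _schema = ReadStore::input_schema(&store);
}
```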
…hen parallel processing is possible
Force-pushed from 1a3d68d to 0b02cda
Improves performance based on this issue.