-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingestion/nifi): Improve nifi lineage extraction performance #11490
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@hsheth2 please take a look again - I changed the PR completely, now it is improving the performance of the lineage extraction. It is ready to be merged if I get approvals. |
@@ -694,8 +781,11 @@ def fetch_provenance_events( | |||
|
|||
if provenance_response.ok: | |||
provenance = provenance_response.json().get("provenance", {}) | |||
total = provenance.get("results", {}).get("total") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would these counts be available in provenance results before verifying that fetching provenance results has finished - which is done below in while loop ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great catch - while it doesn't cause any exception it actually always returns 0
in such case for total
and totalCount
. Moved the retrieval of these values to back where they were.
connections: List[Tuple[str, str]] = field(default_factory=list) | ||
connections: BidirectionalComponentGraph = field( | ||
default_factory=BidirectionalComponentGraph | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming, perf improvement is due to removing duplicate connection edges,
I wonder if changing connections: List[Tuple[str, str]]
to connections: Set[Tuple[str, str]]
and keeping everything else just as before give similar boost in perf ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would address duplicates problem but then we would need to still iterate over whole set to find matches for incoming/outgoing connections for an entity. The question is why would we do it this way then if we already have better implementation in place? Do you see any shortcoming of the current implementation, beside taking roughly twice the amount of memory (which I think is not much in first place, amount of connections, even for the biggest clusters, tends to be below 100k).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation of BidirectionalComponentGraph
is fair and is adapted as per operations we need. It looks good to me.
self.outgoing: Dict[str, Set[str]] = defaultdict(set) | ||
self.incoming: Dict[str, Set[str]] = defaultdict(set) | ||
# this will not count duplicates/removal of non-existing connections correctly - it is only for quick check | ||
self.connections_cnt = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename instance variables incoming, outgoing and connections_cnt to start with an underscore to clarify that they are not to be modified/accessed outside of class ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed the variables, as requested. Do we follow this pattern in other relevant classes as well? I haven't noticed many fields starting with a _
.
del self.outgoing[component] | ||
del self.incoming[component] | ||
|
||
self.connections_cnt -= deleted_connections_cnt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.connections_cnt -= deleted_connections_cnt | |
self.connections_cnt += added_connections_cnt - deleted_connections_cnt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be wrong - the add_connection
function already increments the counter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, can you add a comment mentioning so , or use remove_connection
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The count thing isn't so clear to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an explanatory comment, remove_connection
shouldn't be used while removing a component
(due to performance reasons)
connections: List[Tuple[str, str]] = field(default_factory=list) | ||
connections: BidirectionalComponentGraph = field( | ||
default_factory=BidirectionalComponentGraph | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation of BidirectionalComponentGraph
is fair and is adapted as per operations we need. It looks good to me.
Checklist