Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Neptune Data builder Integration #438

Merged

Conversation

AndrewCiambrone
Copy link
Contributor

Summary of Changes

Implements the Neptune as a datastore in the Databuilder library.
RFC: https://github.com/amundsen-io/rfcs/blob/master/rfcs/013-neptune-support.md

Tests

I created tests for each model to test the serialization compatibility for Neptune. Also added a test the Neptune data loader.

Documentation

I created a sample script that shows how to use the FSNeptuneCSVLoader and NeptuneCSVPublisher.

CheckList

Make sure you have checked all steps below to ensure a timely review.

  • PR title addresses the issue accurately and concisely. Example: "Updates the version of Flask to v1.0.2"
  • PR includes a summary of changes.
  • PR adds unit tests, updates existing unit tests, OR documents why no test additions or modifications are needed.
  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
  • PR passes make test

Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
@@ -0,0 +1,141 @@
# Copyright Contributors to the Amundsen project.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if you all want this to be placed in: https://github.com/amundsen-io/amundsengremlin

@@ -10,9 +10,9 @@
from databuilder.extractor.generic_extractor import GenericExtractor


class Neo4jEsLastUpdatedExtractor(GenericExtractor):
class EsLastUpdatedExtractor(GenericExtractor):
Copy link
Contributor Author

@AndrewCiambrone AndrewCiambrone Feb 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please Note: I changed this to be more generic so all data stores can use this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, cc @allisonsuarez from Lyft side

@@ -37,7 +37,7 @@ def __init__(self,
email: str,
first_name: str = '',
last_name: str = '',
name: str = '',
full_name: str = '',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please Note: One of the sample files refers this as full_name and I thought that name made more sense. Up to you all if it should be name.

@AndrewCiambrone
Copy link
Contributor Author

AndrewCiambrone commented Feb 8, 2021

Please note that this PR brings support for multiple models that the gremlin metadata proxy currently does not support. So some models will not show up on the frontend even though they are ingested.

@feng-tao feng-tao added the keep fresh Disables stalebot from closing an issue label Feb 9, 2021
@feng-tao
Copy link
Member

will take a look

Copy link
Contributor

@dorianj dorianj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM. Left some code-level comments and questions. This is super neat!

I'm not familiar enough with the testing or and some other aspects to confidently approve, I will let @feng-tao give word on that.

neptune_uri = "wss://{host}/gremlin".format(
host=self._neptune_host
)
self.source_factory = neptune_bulk_loader_api.get_neptune_graph_traversal_source_factory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why save this factory?

super(NeptuneCSVPublisher, self).__init__()

def init(self, conf: ConfigTree) -> None:
self._boto_session = Session(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can/should this share code with NeptuneSessionClient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think they should. The api used by the NeptuneCSVPublisher are used in a different context than the api's used by the NeptuneSessionClient. I feel like it could be combined if you feel strongly. But I feel like it might break the abstraction of the client if the two should be blended together.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong feelings, totally fine to leave separate if we feel they may diverge more in the future -- just noticed that a lot of the code reading from config and forming connection string was similar and wondered.

errors=True
)
load_status_payload = load_status_response.get('payload', {})
if 'status' not in load_status_payload.get('overallStatus', {}):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, and ok to ignore, but I think more pythonic and DRY would be to just attempt the load_status_payload['overallStatus']['status'] and catch the KeyError?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this applies in a few other places, but won't comment repeatedly)

file_paths = self._get_file_paths()
for file_location in file_paths:
with open(file_location, 'rb') as file_csv:
file_csv_bytes = BytesIO(file_csv.read())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why wrap the file_csv in a BytesIO? can we not pass the file_csv handle straight to neptune_api_client.upload, allowing it to stream from disk rather than buffer? If there's a reason why that's not ok, a comment would be helpful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that is not very efficient. Not sure what I was thinking when I did that. Good catch.

def validate(self) -> None:
"""
Validation method. Focused on limit the risk on deleting nodes and relations.
- Check if deleted nodes will be within 10% of total nodes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is 5% by default?

self.target_nodes = set(conf.get_list(NeptuneStalenessRemovalTask.TARGET_NODES))
self.target_relations = set(conf.get_list(NeptuneStalenessRemovalTask.TARGET_RELATIONS))
self.batch_size = conf.get_int(NeptuneStalenessRemovalTask.BATCH_SIZE)
self.dry_run = conf.get_bool(NeptuneStalenessRemovalTask.DRY_RUN)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see where this is used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great call out I must had removed it at some point during debugging. Added back in.

.with_fallback(NeptuneStalenessRemovalTask.DEFAULT_CONFIG)
self.target_nodes = set(conf.get_list(NeptuneStalenessRemovalTask.TARGET_NODES))
self.target_relations = set(conf.get_list(NeptuneStalenessRemovalTask.TARGET_RELATIONS))
self.batch_size = conf.get_int(NeptuneStalenessRemovalTask.BATCH_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are deletions being batched? i don't see this used atm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great callout I removed the flag. I could not find a great way to batch delete without it being ridiculously slow. So I found what I think is a okay way to delete the items from the db. It has not caused us any problems so far.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the batching stuff is unique to neo4j performance characteristics, so nice that we don't need to do batching here, much simpler

self,
total_records: Iterable[Dict[str, Any]],
stale_records: Iterable[Dict[str, Any]],
types: Iterable[str]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be similar to existing code, but i believe this typing won't guarantee that the iterator is replayable -- so doing if type_str not in types multiple times may fail?

Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
Copy link
Member

@feng-tao feng-tao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks great, some minor naming nits , thanks for the great contribution!

edge_traversal.next()

@staticmethod
def _update_entity_properties_on_traversal(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason some static method starts with underscore while some don't?

@@ -10,9 +10,9 @@
from databuilder.extractor.generic_extractor import GenericExtractor


class Neo4jEsLastUpdatedExtractor(GenericExtractor):
class EsLastUpdatedExtractor(GenericExtractor):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, cc @allisonsuarez from Lyft side

yield result

def get_scope(self) -> str:
return 'extractor.search_data'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extractor.neptune_search_data ? the scope should be different between extractors

self._closer.close()

def get_scope(self) -> str:
return "loader.filesystem_csv_neptune"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loader.neptune_filesystem_csv ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to be consistent with https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py#L187

but I think loader.neptune_filesystem_csv sounds better.

Copy link
Member

@feng-tao feng-tao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a short tutorial on how to use Neptune e2e in https://github.com/amundsen-io/amundsen/tree/master/docs/tutorials ? That would be super helpful for others who want to use neptune , thanks!

Signed-off-by: Andrew Ciambrone <andrjc4@vt.edu>
@feng-tao feng-tao merged commit 303e8aa into amundsen-io:master Feb 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
keep fresh Disables stalebot from closing an issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants