neptune python util update + neo4j to neptune export fix + doc update
abhishekpradeepmishra committed Nov 8, 2021
1 parent 564934c commit f661a2c
Showing 3 changed files with 47 additions and 31 deletions.
@@ -54,8 +54,6 @@ public PropertyValue parse(String s){
     PropertyValue parseArrayValue(String s, ArrayNode arrayNode) {
         Set<String> values = new HashSet<>();
         for (JsonNode node : arrayNode) {
-            System.out.println(node.textValue());
-            //values.add(format(node.textValue().replace(";", semicolonReplacement)));
             values.add(format(node.asText().replace(";", semicolonReplacement)));
         }
         if (values.size() < arrayNode.size()){
@@ -13,7 +13,6 @@
 # and limitations under the License.
 
 import sys
-import boto3
 import requests
 from neptune_python_utils.endpoints import Endpoints
 
@@ -23,32 +22,8 @@ def __init__(self, region, role_arn):
         self.region = region
         self.role_arn = role_arn
 
-    def neptune_endpoints(self, connection_name):
-        """Gets Neptune endpoint information from the Glue Data Catalog.
-        You may need to install a Glue VPC Endpoint in your VPC for this method to work.
-        You can store Neptune endpoint information as JDBC connections in the Glue Data Catalog.
-        JDBC connection strings must begin 'jdbc:'. To store a Neptune endpoint, use the following format:
-        'jdbc:<protocol>://<dns_name>:<port>/<endpoint>'
-        For example, if you store:
-        'jdbc:wss://my-neptune-cluster.us-east-1.neptune.amazonaws.com:8182/gremlin'
-        – this method will return:
-        'wss://my-neptune-cluster.us-east-1.neptune.amazonaws.com:8182/gremlin'
-        Example:
-        >>> gremlin_endpoint = GlueNeptuneConnectionInfo(glueContext).neptune_endpoint('neptune')
-        """
-        glue = boto3.client('glue', region_name=self.region)
-
-        connection = glue.get_connection(Name=connection_name)
-        neptune_uri = connection['Connection']['ConnectionProperties']['JDBC_CONNECTION_URL'][5:]
-        parse_result = requests.utils.urlparse(neptune_uri)
+    def neptune_endpoints(self, neptune_endpoint):
+        parse_result = requests.utils.urlparse(neptune_endpoint)
         netloc_parts = parse_result.netloc.split(':')
         host = netloc_parts[0]
         port = netloc_parts[1]
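With this change, `neptune_endpoints` takes a Neptune endpoint URL directly rather than the name of a Glue Data Catalog connection. A minimal sketch of a call under that assumption (the cluster DNS name is a placeholder, `region` and `role_arn` are whatever you already pass to `GlueNeptuneConnectionInfo`, and the class is assumed to be imported as in the readme examples below):

```
# Hypothetical endpoint URL for illustration only
endpoints = GlueNeptuneConnectionInfo(region, role_arn).neptune_endpoints(
    'wss://my-neptune-cluster.us-east-1.neptune.amazonaws.com:8182/gremlin')
```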
47 changes: 45 additions & 2 deletions neptune-python-utils/readme.md
@@ -546,6 +546,49 @@ endpoints = GlueNeptuneConnectionInfo(region, role_arn).neptune_endpoints('neptu

### Using neptune-python-utils to insert or upsert data from an AWS Glue job

When using neptune-python-utils in an AWS Glue job, you need to attach a Glue connection of type `NETWORK` to the job. This allows the Glue job to connect to the Neptune cluster. You can create the Glue connection either manually or by using the sample script below:

```
import boto3

# Set aws_region, the Neptune cluster's VPC ID, and the Neptune security group ID
# to match your environment before running.
session = boto3.Session(region_name=aws_region)
ec2_resource = session.resource("ec2")
ec2_client = session.client("ec2")
glue_client = session.client("glue")

# Collect the IDs of all subnets in the VPC that hosts the Neptune cluster
subnet_ids = []
for vpc in ec2_resource.vpcs.all():
    if vpc.id == '<replace-with-cluster-vpc-id>':
        for subnet in vpc.subnets.all():
            subnet_ids.append(subnet.id)

subnets = ec2_client.describe_subnets(SubnetIds=subnet_ids)

# Create one NETWORK-type Glue connection per subnet
connections = []
for subnet in subnets["Subnets"]:
    connectionName = 'connection' + subnet["SubnetId"]
    response = glue_client.create_connection(
        ConnectionInput={
            'Name': connectionName,
            'Description': connectionName,
            'ConnectionType': 'NETWORK',
            'ConnectionProperties': {},
            'PhysicalConnectionRequirements': {
                'SubnetId': subnet["SubnetId"],
                'SecurityGroupIdList': [
                    '<replace-with-neptune-security-group>',
                ],
                'AvailabilityZone': subnet["AvailabilityZone"]
            }
        }
    )
    connections.append(connectionName)
```
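
The connections created above still have to be attached to the Glue job itself. Purely as a sketch, this is how that might look when defining the job with boto3 (the job name, role, script location, and Glue version here are hypothetical placeholders, not values from this repository):

```
# Hypothetical job definition for illustration; reuses glue_client and connections from above
glue_client.create_job(
    Name='export-from-mysql-to-neptune',
    Role='<replace-with-glue-job-role>',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://<replace-with-bucket>/scripts/export-from-mysql-to-neptune.py',
        'PythonVersion': '3'
    },
    Connections={'Connections': connections},  # attach the NETWORK connections created above
    GlueVersion='2.0'
)
```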

The code below, taken from the sample Glue job [export-from-mysql-to-neptune.py](https://github.com/aws-samples/amazon-neptune-samples/blob/master/gremlin/glue-neptune/glue-jobs/mysql-neptune/export-from-mysql-to-neptune.py), shows how to extract data from several tables in an RDBMS, format the dynamic frame columns according to the Neptune bulk load CSV column headings format, and then bulk load the results directly into Neptune.

Parallel inserts and upserts can sometimes trigger a `ConcurrentModificationException`. _neptune-python-utils_ will attempt 5 retries for each batch should such exceptions occur.
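
The retries happen inside _neptune-python-utils_ itself; purely for illustration, a standalone sketch of the same idea (retrying a batch write a fixed number of times when Neptune reports a `ConcurrentModificationException`) might look like the following. The `write_batch` callable and the way the exception text surfaces are assumptions made for the sketch, not the library's actual API:

```
import time

MAX_RETRIES = 5  # matches the retry count described above

def write_with_retries(write_batch, batch):
    """Call write_batch(batch), retrying when Neptune reports a concurrent modification conflict."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return write_batch(batch)
        except Exception as e:
            if 'ConcurrentModificationException' in str(e) and attempt < MAX_RETRIES:
                time.sleep(0.1 * (2 ** attempt))  # simple exponential backoff between retries
                continue
            raise
```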
@@ -577,7 +620,7 @@ from neptune_python_utils.glue_gremlin_csv_transforms import GlueGremlinCsvTrans
 from neptune_python_utils.endpoints import Endpoints
 from neptune_python_utils.gremlin_utils import GremlinUtils
-args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATABASE_NAME', 'NEPTUNE_CONNECTION_NAME', 'AWS_REGION', 'CONNECT_TO_NEPTUNE_ROLE_ARN'])
+args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATABASE_NAME', 'NEPTUNE_ENDPOINT', 'AWS_REGION', 'CONNECT_TO_NEPTUNE_ROLE_ARN'])
 sc = SparkContext()
 glueContext = GlueContext(sc)
@@ -592,7 +635,7 @@ supplier_table = 'salesdb_supplier'
 # Create Gremlin client
-gremlin_endpoints = GlueNeptuneConnectionInfo(args['AWS_REGION'], args['CONNECT_TO_NEPTUNE_ROLE_ARN']).neptune_endpoints(args['NEPTUNE_CONNECTION_NAME'])
+gremlin_endpoints = GlueNeptuneConnectionInfo(args['AWS_REGION'], args['CONNECT_TO_NEPTUNE_ROLE_ARN']).neptune_endpoints(args['NEPTUNE_ENDPOINT'])
 gremlin_client = GlueGremlinClient(gremlin_endpoints)
 # Create Product vertices
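
Because the job now reads a `NEPTUNE_ENDPOINT` argument instead of a Glue connection name, the endpoint has to be supplied as a job argument. A minimal sketch of starting the job with boto3 under that assumption (the job name, database name, region, role ARN, and endpoint are placeholders):

```
import boto3

glue = boto3.client('glue', region_name='us-east-1')

# Hypothetical argument values; Glue passes these through to getResolvedOptions in the job script
glue.start_job_run(
    JobName='export-from-mysql-to-neptune',
    Arguments={
        '--DATABASE_NAME': '<replace-with-glue-database-name>',
        '--NEPTUNE_ENDPOINT': 'wss://my-neptune-cluster.us-east-1.neptune.amazonaws.com:8182/gremlin',
        '--AWS_REGION': 'us-east-1',
        '--CONNECT_TO_NEPTUNE_ROLE_ARN': '<replace-with-neptune-access-role-arn>'
    }
)
```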
