Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athena to Amazon Neptune connector code #222

Merged
merged 36 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
6303507
fixed issue with span function for null values
abhishekpradeepmishra Jul 1, 2020
b68dcc3
Merge pull request #1 from abhishekpradeepmishra/dev
abhishekpradeepmishra Jul 1, 2020
67fae55
Added code for Athena to Neptune connector
abhishekpradeepmishra Jul 1, 2020
c351e44
added code to handle =, >=, <=
abhishekpradeepmishra Jul 1, 2020
cfd5c1c
Fixed unit test cases
abhishekpradeepmishra Jul 3, 2020
e19fe66
added test base class, cleaned up NeptuneRecordHandler
abhishekpradeepmishra Jul 3, 2020
4efbc44
Inherited from Testbase
abhishekpradeepmishra Jul 3, 2020
8eaf751
refactored code to include TestBase and removed hardcoding of Neptune…
abhishekpradeepmishra Jul 7, 2020
9520d2d
Corrected Readme and athena-neptune.yaml and deleted UDF classes
Jul 9, 2020
b6c3277
Refactored Athena Neptune Handler
abhishekpradeepmishra Jul 9, 2020
b1c39a3
refactored code, mocked test cases for MetaHandler
abhishekpradeepmishra Jul 28, 2020
7b87f27
implemented code review comments
abhishekpradeepmishra Aug 12, 2020
f8c184a
removed unwanted files
abhishekpradeepmishra Aug 12, 2020
a246be6
mocked record handler code, removed string transformation in Gremlin …
abhishekpradeepmishra Aug 19, 2020
7619577
removed string templates
abhishekpradeepmishra Aug 19, 2020
23fe7bf
Added eclipse prefs to git ignore list
Sep 16, 2020
a810b84
implemented RowWriter pattern, code clean-up and refactoring pending
abhishekpradeepmishra Sep 21, 2020
9ef2e2a
added more contraints to RecordHanderTest, refactored code, removed u…
abhishekpradeepmishra Sep 22, 2020
e30ee4f
refactored code
abhishekpradeepmishra Sep 22, 2020
60712ab
1Modified SAM yaml to separate DB permissions
Sep 23, 2020
f4a9cd0
1. Added vscode settings to ignore
Sep 23, 2020
74b2cb6
Added packaged yaml to ignore list
Sep 24, 2020
093c6ad
Add S3 list all buckets and simplified neptune ARN
Sep 24, 2020
0e408f5
Merge branch 'master' of https://github.com/awslabs/aws-athena-query-…
Sep 24, 2020
9369a6f
Modifications align with latest FederatedIdentity
Sep 24, 2020
e4d654f
code review changes
abhishekpradeepmishra Oct 6, 2020
ca4b7d7
Draft version of Readme file
Oct 12, 2020
d0cfa3e
Merge branch 'athena-neptune' of https://github.com/abhishekpradeepmi…
Oct 12, 2020
33be79b
code review related changes
abhishekpradeepmishra Oct 12, 2020
73998cb
Merge remote-tracking branch 'upstream/master' into athena-neptune
Oct 12, 2020
56f4fb9
Updated data types in readme and updated gitignore
Oct 14, 2020
e98adb3
Merge remote-tracking branch 'upstream/master' into athena-neptune
Oct 14, 2020
7fc923c
added support for more data types
abhishekpradeepmishra Oct 14, 2020
0de38d3
Fixed import order formatting issue.
Oct 14, 2020
8ff2c50
added test cases constraints for Boolean and BigInt, fixed issue wit…
abhishekpradeepmishra Oct 14, 2020
05cf0c7
committing chagnes for test cases
abhishekpradeepmishra Oct 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@

import com.amazonaws.athena.connector.lambda.domain.predicate.Marker.Bound;

import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.PredicateTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;

/**
* This class is a Utility class to general gremlin query equivalents of
* Contraints being passed via AWS Lambda Handler
*/
public class GremlinQueryPreProcessor {
avirtuos marked this conversation as resolved.
Show resolved Hide resolved

public enum Operator {
LESSTHAN, GREATERTHAN, EQUALTO, NOTEQUALTO
}
Expand All @@ -49,17 +50,13 @@ public enum Operator {
*
* @return A Gremlin Query Part equivalent to Contraint.
*/
// public static String generateGremlinQueryPart(String key, String value,
// String type, Bound bound,
// Operator operator) {

public static GraphTraversal<Vertex, Vertex> generateGremlinQueryPart(GraphTraversal<Vertex, Vertex> traversal,
String key, String value, String type, Bound bound, Operator operator) {

switch (type) {
String key, String value, ArrowType type, Bound bound, Operator operator) {

case "Int(32, true)":
Types.MinorType minorType = Types.getMinorTypeForArrowType(type);

switch (minorType) {
case INT:
if (operator.equals(Operator.GREATERTHAN)) {

traversal = bound.equals(Bound.EXACTLY) ? traversal.has(key, P.gte(Integer.parseInt(value)))
Expand All @@ -84,7 +81,59 @@ public static GraphTraversal<Vertex, Vertex> generateGremlinQueryPart(GraphTrave

break;

case "Utf8":
case FLOAT4:

if (operator.equals(Operator.GREATERTHAN)) {

traversal = bound.equals(Bound.EXACTLY) ? traversal.has(key, P.gte(Float.parseFloat(value)))
: traversal.has(key, P.gt(Float.parseFloat(value)));
}

if (operator.equals(Operator.LESSTHAN)) {

traversal = bound.equals(Bound.EXACTLY) ? traversal.has(key, P.lte(Float.parseFloat(value)))
: traversal.has(key, P.lt(Float.parseFloat(value)));
}

if (operator.equals(Operator.EQUALTO)) {

traversal = traversal.has(key, P.eq(Float.parseFloat(value)));
}

if (operator.equals(Operator.NOTEQUALTO)) {

traversal = traversal.has(key, P.neq(Float.parseFloat(value)));
}

break;

case FLOAT8:

if (operator.equals(Operator.GREATERTHAN)) {

traversal = bound.equals(Bound.EXACTLY) ? traversal.has(key, P.gte(Double.parseDouble(value)))
: traversal.has(key, P.gt(Double.parseDouble(value)));
}

if (operator.equals(Operator.LESSTHAN)) {

traversal = bound.equals(Bound.EXACTLY) ? traversal.has(key, P.lte(Double.parseDouble(value)))
: traversal.has(key, P.lt(Double.parseDouble(value)));
}

if (operator.equals(Operator.EQUALTO)) {

traversal = traversal.has(key, P.eq(Double.parseDouble(value)));
}

if (operator.equals(Operator.NOTEQUALTO)) {

traversal = traversal.has(key, P.neq(Integer.parseInt(value)));
}

break;

case VARCHAR:

if (operator.equals(Operator.EQUALTO)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,12 @@ protected void readWithConstraint(final BlockSpiller spiller, final ReadRecordsR

/**
* Used to generate Gremlin Query part for Constraint Map
*
*
* @param traversal Gremlin Traversal, traversal is updated based on constraints map
* @param hasMap Constraint Hash Map
**/
*
* @return A Gremlin Query Part equivalent to Contraint.
*/
public GraphTraversal<Vertex, Vertex> getQueryPartForContraintsMap(GraphTraversal<Vertex, Vertex> traversal,
final Map hashMap) {

Expand All @@ -190,7 +193,7 @@ public GraphTraversal<Vertex, Vertex> getQueryPartForContraintsMap(GraphTraversa
if (range.getLow().getValue().toString().equals(range.getHigh().getValue().toString())) {

traversal = GremlinQueryPreProcessor.generateGremlinQueryPart(traversal, key,
range.getLow().getValue().toString(), range.getType().toString(),
range.getLow().getValue().toString(), range.getType(),
range.getLow().getBound(), GremlinQueryPreProcessor.Operator.EQUALTO);
break;
}
Expand All @@ -202,14 +205,14 @@ public GraphTraversal<Vertex, Vertex> getQueryPartForContraintsMap(GraphTraversa
+ range.getType().toString().equalsIgnoreCase(Types.MinorType.INT.getType().toString()));

traversal = GremlinQueryPreProcessor.generateGremlinQueryPart(traversal, key,
range.getLow().getValue().toString(), range.getType().toString(), range.getLow().getBound(),
range.getLow().getValue().toString(), range.getType(), range.getLow().getBound(),
GremlinQueryPreProcessor.Operator.GREATERTHAN);
}

if (!range.getHigh().isNullValue()) {

traversal = GremlinQueryPreProcessor.generateGremlinQueryPart(traversal, key,
range.getHigh().getValue().toString(), range.getType().toString(),
range.getHigh().getValue().toString(), range.getType(),
range.getLow().getBound(), GremlinQueryPreProcessor.Operator.LESSTHAN);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,18 @@
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.athena.connector.lambda.data.writers.holders.NullableVarCharHolder;
import org.apache.arrow.vector.holders.NullableIntHolder;
import org.apache.arrow.vector.holders.NullableFloat4Holder;
import org.apache.arrow.vector.holders.NullableFloat8Holder;

/**
* This class is a Utility class to create Extractors for each fields as per Schema
*/
public class TypeRowWriter {

public static RowWriterBuilder writeRowTemplate(RowWriterBuilder rowWriterBuilder, Field field) {

Logger logger = LoggerFactory.getLogger(TypeRowWriter.class);

ArrowType arrowType = field.getType();
Types.MinorType minorType = Types.getMinorTypeForArrowType(arrowType);

Expand All @@ -50,11 +49,10 @@ public static RowWriterBuilder writeRowTemplate(RowWriterBuilder rowWriterBuilde
rowWriterBuilder.withExtractor(field.getName(),
(Float8Extractor) (Object context, NullableFloat8Holder value) -> {
value.isSet = 1;

Map<Object, Object> obj = (Map<Object, Object>) context;
ArrayList<Object> objValues = (ArrayList)obj.get(field.getName());

value.value = Float.parseFloat(objValues.get(0).toString());
value.value = Double.parseDouble(objValues.get(0).toString());
avirtuos marked this conversation as resolved.
Show resolved Hide resolved
});

break;
Expand All @@ -76,7 +74,6 @@ public static RowWriterBuilder writeRowTemplate(RowWriterBuilder rowWriterBuilde
rowWriterBuilder.withExtractor(field.getName(),
(Float4Extractor) (Object context, NullableFloat4Holder value) -> {
value.isSet = 1;

Map<Object, Object> obj = (Map<Object, Object>) context;
ArrayList<Object> objValues = (ArrayList)obj.get(field.getName());

Expand All @@ -88,7 +85,6 @@ public static RowWriterBuilder writeRowTemplate(RowWriterBuilder rowWriterBuilde
rowWriterBuilder.withExtractor(field.getName(),
(IntExtractor) (Object context, NullableIntHolder value) -> {
value.isSet = 1;

Map<Object, Object> obj = (Map<Object, Object>) context;
ArrayList<Object> objValues = (ArrayList)obj.get(field.getName());
value.value = Integer.parseInt(objValues.get(0).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,14 @@
*/
package com.amazonaws.athena.connectors.neptune;

import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.data.Block;
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
import com.amazonaws.athena.connector.lambda.data.BlockAllocatorImpl;
import com.amazonaws.athena.connector.lambda.data.BlockUtils;
import com.amazonaws.athena.connector.lambda.data.BlockWriter;
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
import com.amazonaws.athena.connector.lambda.domain.predicate.Range;
import com.amazonaws.athena.connector.lambda.domain.predicate.SortedRangeSet;
import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet;
import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.lambda.metadata.MetadataRequestType;
import com.amazonaws.athena.connector.lambda.metadata.MetadataResponse;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.glue.AWSGlue;
Expand All @@ -56,10 +37,7 @@
import com.amazonaws.services.glue.model.StorageDescriptor;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.collect.ImmutableList;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.collections.CursorableLinkedList.Cursor;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -70,20 +48,15 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.*;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand Down
Loading