Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix task result with path/tree can't be serialized #1351

Merged
merged 3 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.tinkerpop.gremlin.process.traversal.Path;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONIo;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.structure.io.graphson.TinkerPopJacksonModule;
import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator;
import org.apache.tinkerpop.shaded.jackson.core.JsonParser;
Expand Down Expand Up @@ -70,10 +75,10 @@ public class HugeGraphSONModule extends TinkerPopJacksonModule {

private static final long serialVersionUID = 6480426922914059122L;

public static boolean OPTIMIZE_SERIALIZE = true;

private static final String TYPE_NAMESPACE = "hugegraph";

private static boolean OPTIMIZE_SERIALIZE = true;

@SuppressWarnings("rawtypes")
private static final Map<Class, String> TYPE_DEFINITIONS;

Expand Down Expand Up @@ -198,6 +203,9 @@ public static void registerGraphSerializers(SimpleModule module) {
*/
module.addSerializer(HugeVertex.class, new HugeVertexSerializer());
module.addSerializer(HugeEdge.class, new HugeEdgeSerializer());

module.addSerializer(Path.class, new PathSerializer());
module.addSerializer(Tree.class, new TreeSerializer());
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -485,6 +493,49 @@ public void serializeWithType(HugeEdge value, JsonGenerator generator,
}
}

private static class PathSerializer extends StdSerializer<Path> {

public PathSerializer() {
super(Path.class);
}

@Override
public void serialize(Path path, JsonGenerator jsonGenerator,
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField(GraphSONTokens.LABELS,
path.labels());
jsonGenerator.writeObjectField(GraphSONTokens.OBJECTS,
path.objects());
jsonGenerator.writeEndObject();
}
}

@SuppressWarnings("rawtypes") // Tree<T>
private static class TreeSerializer extends StdSerializer<Tree> {

public TreeSerializer() {
super(Tree.class);
}

@Override
public void serialize(Tree tree, JsonGenerator jsonGenerator,
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartArray();
@SuppressWarnings("unchecked")
Set<Map.Entry<Element, Tree>> set = tree.entrySet();
for (Map.Entry<Element, Tree> entry : set) {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField(GraphSONTokens.KEY,
entry.getKey());
jsonGenerator.writeObjectField(GraphSONTokens.VALUE,
entry.getValue());
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndArray();
}
}

private static class ShardSerializer extends StdSerializer<Shard> {

public ShardSerializer() {
Expand All @@ -493,8 +544,7 @@ public ShardSerializer() {

@Override
public void serialize(Shard shard, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("start", shard.start());
jsonGenerator.writeStringField("end", shard.end());
Expand All @@ -511,8 +561,7 @@ public FileSerializer() {

@Override
public void serialize(File file, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("file", file.getName());
jsonGenerator.writeEndObject();
Expand All @@ -527,8 +576,7 @@ public BlobSerializer() {

@Override
public void serialize(Blob blob, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeBinary(blob.bytes());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ public void testGremlinJobWithScript() throws TimeoutException {
+ "schema.propertyKey('lang').asText().ifNotExist().create();"
+ "schema.propertyKey('date').asDate().ifNotExist().create();"
+ "schema.propertyKey('price').asInt().ifNotExist().create();"
+ "person1=schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
+ "person2=schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
+ "knows=schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
+ "schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
+ "schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
+ "schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
+ "for(int i = 0; i < 1000; i++) {"
+ " p1=graph.addVertex(T.label,'person1','name','p1-'+i,'age',29);"
+ " p2=graph.addVertex(T.label,'person2','name','p2-'+i,'age',27);"
Expand Down Expand Up @@ -302,6 +302,89 @@ public void testGremlinJobWithScript() throws TimeoutException {
Assert.assertEquals("[1000]", task.result());
}

@Test
public void testGremlinJobWithSerializedResults() throws TimeoutException {
HugeGraph graph = graph();
TaskScheduler scheduler = graph.taskScheduler();

String script = "schema=graph.schema();"
+ "schema.propertyKey('name').asText().ifNotExist().create();"
+ "schema.vertexLabel('char').useCustomizeNumberId().properties('name').ifNotExist().create();"
+ "schema.edgeLabel('next').sourceLabel('char').targetLabel('char').properties('name').ifNotExist().create();"
+ "g.addV('char').property(id,1).property('name','A').as('a')"
+ " .addV('char').property(id,2).property('name','B').as('b')"
+ " .addV('char').property(id,3).property('name','C').as('c')"
+ " .addV('char').property(id,4).property('name','D').as('d')"
+ " .addV('char').property(id,5).property('name','E').as('e')"
+ " .addV('char').property(id,6).property('name','F').as('f')"
+ " .addE('next').from('a').to('b').property('name','ab')"
+ " .addE('next').from('b').to('c').property('name','bc')"
+ " .addE('next').from('b').to('d').property('name','bd')"
+ " .addE('next').from('c').to('d').property('name','cd')"
+ " .addE('next').from('c').to('e').property('name','ce')"
+ " .addE('next').from('d').to('e').property('name','de')"
+ " .addE('next').from('e').to('f').property('name','ef')"
+ " .addE('next').from('f').to('d').property('name','fd')"
+ " .iterate();"
+ "g.tx().commit(); g.E().count();";

HugeTask<Object> task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals("test-gremlin-job", task.name());
Assert.assertEquals("gremlin", task.type());
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
Assert.assertEquals("[8]", task.result());

Id edgeLabelId = graph.schema().getEdgeLabel("next").id();

script = "g.V(1).outE().inV().path()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
String expected = String.format("[{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}}"
+ "]}]", edgeLabelId);
Assert.assertEquals(expected, task.result());

script = "g.V(1).out().out().path()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = "[{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}}]},"
+ "{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}}]}]";
Assert.assertEquals(expected, task.result());

script = "g.V(1).outE().inV().tree()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = String.format("[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "\"value\":["
+ "{\"key\":{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+ "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},\"value\":[]}]}]}]]",
edgeLabelId);
Assert.assertEquals(expected, task.result());

script = "g.V(1).out().out().tree()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = "[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "\"value\":["
+ "{\"key\":{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}},\"value\":[]},"
+ "{\"key\":{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}},\"value\":[]}]}]}]]";
Assert.assertEquals(expected, task.result());
}

@Test
public void testGremlinJobWithFailure() throws TimeoutException {
HugeGraph graph = graph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.define.IdStrategy;
import com.baidu.hugegraph.type.define.NodeRole;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -251,7 +252,8 @@ public GraphComputer compute() throws IllegalArgumentException {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <I extends Io> I io(final Io.Builder<I> builder) {
HugeGraphSONModule.OPTIMIZE_SERIALIZE = false;
Whitebox.setInternalState(HugeGraphSONModule.class,
"OPTIMIZE_SERIALIZE", false);
return (I) builder.graph(this).onMapper(mapper ->
mapper.addRegistry(HugeGraphIoRegistry.instance())
).create();
Expand Down