diff --git a/hugegraph-client/pom.xml b/hugegraph-client/pom.xml
index 08b8d3ebe..ee26f2372 100644
--- a/hugegraph-client/pom.xml
+++ b/hugegraph-client/pom.xml
@@ -56,6 +56,11 @@
${lombok.version}
true
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java
new file mode 100644
index 000000000..bc2a4cbb9
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/AbstractGraphElementSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.direct.util.GraphSchema;
+
+public abstract class AbstractGraphElementSerializer implements GraphElementSerializer {
+ protected HugeClient client;
+ protected GraphSchema graphSchema;
+
+ public AbstractGraphElementSerializer(HugeClient client) {
+ this.client = client;
+ this.graphSchema = new GraphSchema(client);
+ }
+
+ public void close() {
+ this.client.close();
+ }
+
+}
+
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java
new file mode 100644
index 000000000..c55197386
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/GraphElementSerializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.serializer.direct.struct.Directions;
+import org.apache.hugegraph.structure.GraphElement;
+import scala.Tuple2;
+
+public interface GraphElementSerializer {
+
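+ // Serialize the element key; the tuple carries the key bytes plus the logical partition id the key maps to (0 when partition routing is unused)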
+ Tuple2<byte[], Integer> getKeyBytes(GraphElement e, Directions direction);
+ byte[] getValueBytes(GraphElement e);
+
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
new file mode 100644
index 000000000..6a3f6f45d
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/SerializerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer;
+
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.config.SerializerConfig;
+import org.apache.hugegraph.serializer.direct.HBaseSerializer;
+import org.apache.hugegraph.serializer.direct.HStoreSerializer;
+
+public class SerializerFactory {
+
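+ // Choose the direct serializer implementation based on the configured backend store type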
+ public static GraphElementSerializer getSerializer(HugeClient client, SerializerConfig config) {
+ switch (config.getBackendStoreType()) {
+ case "hstore":
+ return new HStoreSerializer(client, config.getVertexPartitions(), config.getGraphName(), config.getPdAddress(), config.getPdRestPort());
+ case "hbase":
+ return new HBaseSerializer(client, config.getVertexPartitions(), config.getEdgePartitions());
+ default:
+ throw new IllegalArgumentException("Unsupported serializer backend type: " + config.getBackendStoreType());
+ }
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java
new file mode 100644
index 000000000..21c70ab41
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/config/SerializerConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer.config;
+
+
+import lombok.Data;
+
+@Data
+public class SerializerConfig {
+ private int vertexPartitions;
+ private int edgePartitions;
+ private String pdAddress;
+ private String pdRestPort;
+ private String graphName;
+
+
+ private String backendStoreType;
+
+ public SerializerConfig(int vertexPartitions, int edgePartitions, String pdAddress, String pdRestPort, String graphName) {
+ this.vertexPartitions = vertexPartitions;
+ this.edgePartitions = edgePartitions;
+ this.pdAddress = pdAddress;
+ this.pdRestPort = pdRestPort;
+ this.graphName = graphName;
+ }
+
+ public SerializerConfig() {
+ }
+
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
index 18cb87af0..cf3e618d8 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HBaseSerializer.java
@@ -21,51 +21,52 @@
import java.util.Map;
import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.serializer.AbstractGraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
import org.apache.hugegraph.serializer.direct.struct.HugeType;
import org.apache.hugegraph.serializer.direct.util.BytesBuffer;
-import org.apache.hugegraph.serializer.direct.util.GraphSchema;
import org.apache.hugegraph.serializer.direct.util.Id;
import org.apache.hugegraph.serializer.direct.util.IdGenerator;
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.schema.PropertyKey;
+import scala.Tuple2;
/**
* TODO: review later
*/
-public class HBaseSerializer {
+public class HBaseSerializer extends AbstractGraphElementSerializer {
private int edgeLogicPartitions;
private int vertexLogicPartitions;
- private HugeClient client;
- private GraphSchema graphSchema;
+
public HBaseSerializer(HugeClient client, int vertexPartitions, int edgePartitions) {
- this.client = client;
- this.graphSchema = new GraphSchema(client);
+ super(client);
this.edgeLogicPartitions = edgePartitions;
this.vertexLogicPartitions = vertexPartitions;
}
- public byte[] getKeyBytes(GraphElement e) {
+ @Override
+ public Tuple2<byte[], Integer> getKeyBytes(GraphElement e, Directions direction) {
byte[] array = null;
if (e.type() == "vertex" && e.id() != null) {
BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
- buffer.writeShort(getPartition(HugeType.VERTEX, IdGenerator.of(e.id())));
+ buffer.writeShort(getPartition(HugeType.VERTEX, IdGenerator.of(e.id())));
buffer.writeId(IdGenerator.of(e.id()));
array = buffer.bytes();
} else if (e.type() == "edge") {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
- Edge edge = (Edge)e;
+ Edge edge = (Edge) e;
buffer.writeShort(getPartition(HugeType.EDGE, IdGenerator.of(edge.sourceId())));
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_OUT.code());
- buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id())); //出现错误
+ buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.targetId()));
array = buffer.bytes();
}
- return array;
+ return new Tuple2<>(array, 0);
}
public byte[] getValueBytes(GraphElement e) {
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
new file mode 100644
index 000000000..df4ddf18d
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer.direct;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeMap;
+import org.apache.hugegraph.client.RestClient;
+import org.apache.hugegraph.driver.HugeClient;
+import org.apache.hugegraph.rest.RestClientConfig;
+import org.apache.hugegraph.rest.RestResult;
+import org.apache.hugegraph.serializer.AbstractGraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
+import org.apache.hugegraph.serializer.direct.struct.HugeType;
+
+import org.apache.hugegraph.serializer.direct.util.*;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.schema.PropertyKey;
+import org.apache.hugegraph.util.PartitionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class HStoreSerializer extends AbstractGraphElementSerializer {
+
+
+ private static final Logger log = LoggerFactory.getLogger(HStoreSerializer.class);
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+
+ private Map<Long, Long> partGraphIdMap;
+ private TreeRangeMap<Long, Integer> rangeMap;
+
+ public HStoreSerializer(HugeClient client, int numPartitions, String graphName, String pdAddress, String pdRestPort) {
+ super(client);
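+ // Divide the hash space [0, MAX_VALUE) into numPartitions contiguous ranges so a hash code can be mapped to a partition id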
+ rangeMap = TreeRangeMap.create();
+ int partitionSize = PartitionUtils.MAX_VALUE / numPartitions;
+ if (PartitionUtils.MAX_VALUE % numPartitions != 0) {
+ partitionSize++;
+ }
+
+ for (int i = 0; i < numPartitions; i++) {
+ long startKey = (long) partitionSize * i;
+ long endKey = (long) partitionSize * (i + 1);
+ rangeMap.put(Range.closedOpen(startKey, endKey), i);
+ }
+ log.info("rangeMap:{}", rangeMap);
+ partGraphIdMap = getGraphId(graphName, processAddresses(pdAddress, pdRestPort));
+ log.info("partGraphIdMap:{}", partGraphIdMap);
+
+ }
+ public static String[] processAddresses(String addresses, String newPort) {
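+ // Rewrite each "host:port" PD address into "http://host:" + restPort so it can be used for REST calls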
+ String[] addressArray = addresses.split(",");
+ String[] processedArray = new String[addressArray.length];
+ for (int i = 0; i < addressArray.length; i++) {
+ String address = addressArray[i];
+ int colonIndex = address.indexOf(":");
+ if (colonIndex != -1) {
+ String newAddress = "http://"+address.substring(0, colonIndex + 1) + newPort;
+ processedArray[i] = newAddress;
+ } else {
+ processedArray[i] = address;
+ }
+ }
+
+ return processedArray;
+ }
+
+
+
+ private Map<Long, Long> getGraphId(String graphName, String[] urls) {
+ RestClientConfig config = RestClientConfig.builder()
+ .connectTimeout(5 * 1000)
+ .maxConns(10)
+ .build();
+
+
+ for (String url : urls) {
+ RestClient client = null;
+ try {
+ client = new RestClient(url, config);
+ RestResult restResult = client.get("v1/partitionsAndGraphId", Collections.singletonMap("graphName", graphName));
+ String content = restResult.content();
+ Map<Long, Long> resMap = MAPPER.readValue(content, new TypeReference<Map<Long, Long>>() {
+ });
+ log.info("Response :{} ", resMap);
+ return resMap;
+ } catch (Exception e) {
+ log.error("Failed to get graphId", e);
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ log.error("Failed to close RestClient", e);
+ }
+ }
+ }
+ }
+ return Collections.emptyMap();
+ }
+
+ public Tuple2<byte[], Integer> getKeyBytes(GraphElement e, Directions direction) {
+ byte[] array = null;
+ if (e.type() == "vertex" && e.id() != null) {
+
+ BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
+ Id id = IdGenerator.of(e.id());
+ buffer.writeId(id);
+ array = buffer.bytes();
+ int code = PartitionUtils.calcHashcode(id.asBytes());
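+ // Key layout: [2-byte graph/partition id][encoded vertex id][2-byte hash code]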
+ byte[] buf = new byte[Short.BYTES + array.length + Short.BYTES];
+ Integer partId = rangeMap.get((long) code);
+ Long partGraphId = partGraphIdMap.get((long) partId);
+ Bits.putShort(buf, 0, Math.toIntExact(partGraphId));
+ Bits.put(buf, Short.BYTES, array);
+ Bits.putShort(buf, array.length + Short.BYTES, code);
+ return new Tuple2<>(buf, partId);
+ } else if (e.type() == "edge" && direction == null) {
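+ // A null direction produces the outgoing-edge (EDGE_OUT) key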
+ BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
+ Edge edge = (Edge) e;
+ buffer.writeId(IdGenerator.of(edge.sourceId()));
+ buffer.write(HugeType.EDGE_OUT.code());
+ buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
+ buffer.writeStringWithEnding("");
+ buffer.writeId(IdGenerator.of(edge.targetId()));
+ array = buffer.bytes();
+ int code = PartitionUtils.calcHashcode(IdGenerator.of(edge.sourceId()).asBytes());
+ Integer partId = rangeMap.get((long) code);
+ Long partGraphId = partGraphIdMap.get((long) partId);
+ byte[] buf = new byte[Short.BYTES + array.length + Short.BYTES];
+ Bits.putShort(buf, 0, Math.toIntExact(partGraphId));
+ Bits.put(buf, Short.BYTES, array);
+ Bits.putShort(buf, array.length + Short.BYTES, code);
+ return new Tuple2<>(buf, partId);
+ } else if (e.type() == "edge" && direction == Directions.IN) {
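+ // Directions.IN produces the incoming-edge (EDGE_IN) key with the same layout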
+ BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
+ Edge edge = (Edge) e;
+ buffer.writeId(IdGenerator.of(edge.sourceId()));
+ buffer.write(HugeType.EDGE_IN.code());
+ buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
+ buffer.writeStringWithEnding("");
+ buffer.writeId(IdGenerator.of(edge.targetId()));
+ array = buffer.bytes();
+ int code = PartitionUtils.calcHashcode(IdGenerator.of(edge.sourceId()).asBytes());
+ Integer partId = rangeMap.get((long) code);
+ Long partGraphId = partGraphIdMap.get((long) partId);
+ byte[] buf = new byte[Short.BYTES + array.length + Short.BYTES];
+ Bits.putShort(buf, 0, Math.toIntExact(partGraphId));
+ Bits.put(buf, Short.BYTES, array);
+ Bits.putShort(buf, array.length + Short.BYTES, code);
+ return new Tuple2<>(buf, partId);
+ }
+ return new Tuple2<>(array, 0);
+ }
+
+
+ public byte[] getValueBytes(GraphElement e) {
+ byte[] array = null;
+ if (e.type() == "vertex") {
+ int propsCount = e.properties().size(); //vertex.sizeOfProperties();
+ BytesBuffer buffer = BytesBuffer.allocate(8 + 16 * propsCount);
+ buffer.writeId(IdGenerator.of(graphSchema.getVertexLabel(e.label()).id()));
+ buffer.writeVInt(propsCount);
+ for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
+ PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
+ buffer.writeVInt(propertyKey.id().intValue());
+ buffer.writeProperty(propertyKey.dataType(), entry.getValue());
+ }
+ array = buffer.bytes();
+ } else if (e.type() == "edge") {
+ int propsCount = e.properties().size();
+ BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
+ buffer.writeVInt(propsCount);
+ for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
+ PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
+ buffer.writeVInt(propertyKey.id().intValue());
+ buffer.writeProperty(propertyKey.dataType(), entry.getValue());
+ }
+ array = buffer.bytes();
+ }
+
+ return array;
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
index ea7bbbd9a..8022b80e6 100644
--- a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java
@@ -26,6 +26,7 @@
import org.apache.hugegraph.driver.SchemaManager;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
import org.apache.hugegraph.serializer.direct.RocksDBSerializer;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
@@ -135,7 +136,7 @@ private void writeGraphElements() {
// Old way: encode to json then send to server
if (bypassServer) {
- writeDirectly(vertices, edges);
+ writeDirectly(vertices, edges, null);
} else {
writeByServer(graph, vertices, edges);
}
@@ -144,15 +145,15 @@ private void writeGraphElements() {
/* we transfer the vertex & edge into bytes array
* TODO: use a batch and send them together
* */
- void writeDirectly(List<Vertex> vertices, List<Edge> edges) {
+ void writeDirectly(List<Vertex> vertices, List<Edge> edges, Directions direction) {
for (Vertex vertex : vertices) {
- byte[] rowkey = hBaseSer.getKeyBytes(vertex);
+ byte[] rowkey = hBaseSer.getKeyBytes(vertex, direction)._1;
byte[] values = hBaseSer.getValueBytes(vertex);
sendRpcToHBase("vertex", rowkey, values);
}
for (Edge edge : edges) {
- byte[] rowkey = hBaseSer.getKeyBytes(edge);
+ byte[] rowkey = hBaseSer.getKeyBytes(edge, direction)._1;
byte[] values = hBaseSer.getValueBytes(edge);
sendRpcToHBase("edge", rowkey, values);
}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/struct/Directions.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/struct/Directions.java
new file mode 100644
index 000000000..f64e13cfd
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/struct/Directions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.serializer.direct.struct;
+
+public enum Directions {
+
+ // TODO: add NONE enum for non-directional edges
+
+ BOTH(0, "both"),
+
+ OUT(1, "out"),
+
+ IN(2, "in");
+ private byte code = 0;
+ private String name = null;
+
+ Directions(int code, String name) {
+ assert code < 256;
+ this.code = (byte) code;
+ this.name = name;
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java
new file mode 100644
index 000000000..30d7d24c1
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/util/Bits.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.serializer.direct.util;
+
+import java.nio.ByteBuffer;
+
+public class Bits {
+ /**
+ * Write a short in big-endian byte order
+ */
+ public static void putShort(byte[] buf, int offSet, int x) {
+ buf[offSet] = (byte) (x >> 8);
+ buf[offSet + 1] = (byte) (x);
+ }
+
+ public static void putInt(byte[] buf, int offSet, int x) {
+ buf[offSet] = (byte) (x >> 24);
+ buf[offSet + 1] = (byte) (x >> 16);
+ buf[offSet + 2] = (byte) (x >> 8);
+ buf[offSet + 3] = (byte) (x);
+ }
+
+ /**
+ * Read a short in big-endian byte order
+ */
+ public static int getShort(byte[] buf, int offSet) {
+ int x = buf[offSet] & 0xff;
+ x = (x << 8) + (buf[offSet + 1] & 0xff);
+ return x;
+ }
+
+ public static int getInt(byte[] buf, int offSet) {
+ int x = (buf[offSet] << 24)
+ + ((buf[offSet + 1] & 0xff) << 16)
+ + ((buf[offSet + 2] & 0xff) << 8)
+ + (buf[offSet + 3] & 0xff);
+ return x;
+ }
+
+ public static void put(byte[] buf, int offSet, byte[] srcBuf) {
+ System.arraycopy(srcBuf, 0, buf, offSet, srcBuf.length);
+ }
+
+ public static int toInt(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.put(bytes);
+ buffer.flip();//need flip
+ return buffer.getInt();
+ }
+}
diff --git a/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java b/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java
new file mode 100644
index 000000000..5bb35be59
--- /dev/null
+++ b/hugegraph-client/src/main/java/org/apache/hugegraph/util/PartitionUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.util;
+
+public class PartitionUtils {
+
+ public static final int MAX_VALUE = 0xffff;
+
+ /**
+ * Compute the hash code used to map a key to a logical partition.
+ *
+ * @param key the serialized key bytes
+ * @return hashcode in the range [0, MAX_VALUE)
+ */
+ public static int calcHashcode(byte[] key) {
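+ // FNV-1a style hash (offset basis 2166136261, prime 16777619) with extra bit mixing, clamped to [0, MAX_VALUE)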
+ final int p = 16777619;
+ int hash = (int) 2166136261L;
+ for (byte element : key) {
+ hash = (hash ^ element) * p;
+ }
+ hash += hash << 13;
+ hash ^= hash >> 7;
+ hash += hash << 3;
+ hash ^= hash >> 17;
+ hash += hash << 5;
+ hash = hash & PartitionUtils.MAX_VALUE;
+ if (hash == PartitionUtils.MAX_VALUE) {
+ hash = PartitionUtils.MAX_VALUE - 1;
+ }
+ return hash;
+ }
+}
diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index e610d0729..4e6af837e 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -326,3 +326,4 @@ woodstox-core-5.3.0.jar
xz-1.8.jar
zookeeper-3.6.2.jar
zookeeper-jute-3.6.2.jar
+rocksdbjni-7.2.2.jar
diff --git a/hugegraph-loader/assembly/static/bin/get-params.sh b/hugegraph-loader/assembly/static/bin/get-params.sh
index dde3f0c7c..3b160459d 100644
--- a/hugegraph-loader/assembly/static/bin/get-params.sh
+++ b/hugegraph-loader/assembly/static/bin/get-params.sh
@@ -27,7 +27,9 @@ function get_params() {
--incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
--max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
--timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
- --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --help )
+ --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --backend | \
+ --pd-address | --pd-rest-port | --hdfs-uri | --max-download-rate \
+ | --help )
HUGEGRAPH_PARAMS="$HUGEGRAPH_PARAMS $1 $2"
shift 2
;;
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index 339312e30..518438bf3 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -15,8 +15,8 @@
License for the specific language governing permissions and limitations
under the License.
-->
-
4.0.0
@@ -542,6 +542,11 @@
${kafka.testcontainer.version}
test
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>7.2.2</version>
+ </dependency>
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
index 72382366a..4224f9eaf 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/builder/EdgeBuilder.java
@@ -28,6 +28,7 @@
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.mapping.EdgeMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.structure.schema.EdgeLabel;
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java
new file mode 100644
index 000000000..61dc3aefc
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/AbstractDirectLoader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.loader;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hugegraph.loader.builder.EdgeBuilder;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.builder.VertexBuilder;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.ElementMapping;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
+import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class AbstractDirectLoader implements DirectLoader, Serializable {
+
+ public static final Logger LOG = Log.logger(AbstractDirectLoader.class);
+ protected LoadOptions loadOptions;
+ protected InputStruct struct;
+ protected DistributedLoadMetrics loadDistributeMetrics;
+
+ public AbstractDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ }
+
+ public AbstractDirectLoader(LoadOptions loadOptions, InputStruct struct) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ }
+
+ public void flushPermission(Configuration conf, String path) {
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("shell start execute");
+ shell.run(new String[]{"-chmod", "-R", "777", path});
+ shell.close();
+ } catch (Exception e) {
+ LOG.error("Couldnt change the file permissions " + e
+ + " Please run command:"
+ + "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles "
+ + path + " '"
+ + "test" + "'\n"
+ + " to load generated HFiles into HBase table");
+ }
+ }
+
+ @Override
+ public void bulkload(Dataset ds) {
+ JavaPairRDD javaPairRDD = buildVertexAndEdge(ds, null);
+ String path = generateFiles(javaPairRDD);
+ loadFiles(path, null);
+ }
+
+ protected List<ElementBuilder> getElementBuilders(LoadContext context) {
+ context.schemaCache().updateAll();
+ List<ElementBuilder> buildersForGraphElement = new LinkedList<>();
+ for (VertexMapping vertexMapping : struct.vertices()) {
+ buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
+ }
+ for (EdgeMapping edgeMapping : struct.edges()) {
+ buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
+ }
+ return buildersForGraphElement;
+ }
+
+ protected List> buildAndSer(GraphElementSerializer serializer,
+ Row row,
+ List builders, Directions directions) {
+ List elementsElement;
+ List> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+ elementsElement = builder.build(row);
+ break;
+ default:
+ throw new AssertionError(String.format("Unsupported input source '%s'", struct.input().type()));
+ }
+
+ boolean isVertex = builder.mapping().type().isVertex();
+ if (isVertex) {
+ for (Vertex vertex : (List<Vertex>) (Object) elementsElement) {
+ Tuple2 tuple2 = vertexSerialize(serializer, vertex);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.vertexMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+ result.add(tuple2);
+ }
+ } else {
+ for (Edge edge : (List<Edge>) (Object) elementsElement) {
+ Tuple2 tuple2 = edgeSerialize(serializer, edge, directions);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisParseSuccess(1L);
+ loadDistributeMetrics.edgeMetrics().get(builder.mapping().label()).plusDisInsertSuccess(1L);
+ result.add(tuple2);
+ }
+ }
+ }
+ return result;
+ }
+
+ protected abstract Tuple2 vertexSerialize(GraphElementSerializer serializer, Vertex vertex);
+
+ protected abstract Tuple2 edgeSerialize(GraphElementSerializer serializer, Edge edge, Directions direction);
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
index f99e4a497..432602916 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/DirectLoader.java
@@ -17,56 +17,17 @@
package org.apache.hugegraph.loader.direct.loader;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hugegraph.loader.builder.EdgeBuilder;
-import org.apache.hugegraph.loader.builder.ElementBuilder;
-import org.apache.hugegraph.loader.builder.VertexBuilder;
-import org.apache.hugegraph.loader.executor.LoadContext;
-import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.EdgeMapping;
-import org.apache.hugegraph.loader.mapping.InputStruct;
-import org.apache.hugegraph.loader.mapping.VertexMapping;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-public abstract class DirectLoader implements Serializable {
-
- LoadOptions loadOptions;
- InputStruct struct;
-
- public DirectLoader(LoadOptions loadOptions,
- InputStruct struct) {
- this.loadOptions = loadOptions;
- this.struct = struct;
- }
-
- public final void bulkload(Dataset ds) {
- JavaPairRDD javaPairRDD = buildVertexAndEdge(ds);
- String path = generateFiles(javaPairRDD);
- loadFiles(path);
- }
-
- protected List getElementBuilders() {
- LoadContext context = new LoadContext(loadOptions);
- context.schemaCache().updateAll();
- List buildersForGraphElement = new LinkedList<>();
- for (VertexMapping vertexMapping : struct.vertices()) {
- buildersForGraphElement.add(new VertexBuilder(context, struct, vertexMapping));
- }
- for (EdgeMapping edgeMapping : struct.edges()) {
- buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
- }
- context.close();
- return buildersForGraphElement;
- }
+public interface DirectLoader {
+ JavaPairRDD buildVertexAndEdge(Dataset ds, Directions directions);
- abstract JavaPairRDD buildVertexAndEdge(Dataset ds);
+ String generateFiles(JavaPairRDD buildAndSerRdd);
- abstract String generateFiles(JavaPairRDD buildAndSerRdd);
+ void loadFiles(String path, Directions directions);
- abstract void loadFiles(String path);
+ void bulkload(Dataset ds);
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
index dfc9fd998..510beb56a 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -17,17 +17,9 @@
package org.apache.hugegraph.loader.direct.loader;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.SecureRandom;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -38,13 +30,14 @@
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.direct.util.SinkToHBase;
+import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.mapping.ElementMapping;
import org.apache.hugegraph.loader.mapping.InputStruct;
-import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
import org.apache.hugegraph.loader.util.HugeClientHolder;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
-import org.apache.hugegraph.structure.GraphElement;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
@@ -54,14 +47,18 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
-
import scala.Tuple2;
-public class HBaseDirectLoader extends DirectLoader {
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+public class HBaseDirectLoader extends AbstractDirectLoader<ImmutableBytesWritable, KeyValue> {
private SinkToHBase sinkToHBase;
- private LoadDistributeMetrics loadDistributeMetrics;
-
private static final int RANDOM_VALUE1;
private static final short RANDOM_VALUE2;
private static final AtomicInteger NEXT_COUNTER;
@@ -125,78 +122,43 @@ public static String fileID() {
return Bytes.toHex(byteBuffer.array());
}
- public HBaseDirectLoader(LoadOptions loadOptions,
- InputStruct struct,
- LoadDistributeMetrics loadDistributeMetrics) {
- super(loadOptions, struct);
- this.loadDistributeMetrics = loadDistributeMetrics;
- this.sinkToHBase = new SinkToHBase(loadOptions);
-
- }
-
- public String getTableName() {
-
- String tableName = null;
- if (struct.edges().size() > 0) {
- tableName = this.loadOptions.edgeTableName;
-
- } else if (struct.vertices().size() > 0) {
- tableName = this.loadOptions.vertexTableName;
-
- }
- return tableName;
- }
-
- public Integer getTablePartitions() {
- return struct.edges().size() > 0 ?
- loadOptions.edgePartitions : loadOptions.vertexPartitions;
- }
-
@Override
- public JavaPairRDD buildVertexAndEdge(Dataset ds) {
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds, Directions directions) {
LOG.info("Start build vertexes and edges");
- JavaPairRDD tuple2KeyValueJavaPairRDD;
- tuple2KeyValueJavaPairRDD = ds.toJavaRDD().mapPartitionsToPair(
+ return ds.toJavaRDD().mapPartitionsToPair(
(PairFlatMapFunction, ImmutableBytesWritable, KeyValue>) rowIter -> {
- HBaseSerializer ser;
- ser = new HBaseSerializer(HugeClientHolder.create(loadOptions),
- loadOptions.vertexPartitions,
- loadOptions.edgePartitions);
- List buildersForGraphElement = getElementBuilders();
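+ // Each Spark partition creates its own serializer and load context, so client and schema lookups stay local to the executor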
+ HBaseSerializer ser = new HBaseSerializer(HugeClientHolder.create(loadOptions), loadOptions.vertexPartitions, loadOptions.edgePartitions);
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct); // prepare the element builders and refresh the schema data
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders(loaderContext);
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result = new LinkedList<>();
while (rowIter.hasNext()) {
Row row = rowIter.next();
List<Tuple2<ImmutableBytesWritable, KeyValue>> serList;
- serList = buildAndSer(ser, row, buildersForGraphElement);
+ serList = buildAndSer(ser, row, buildersForGraphElement, directions);
result.addAll(serList);
}
ser.close();
return result.iterator();
}
);
- return tuple2KeyValueJavaPairRDD;
}
@Override
- String generateFiles(JavaPairRDD buildAndSerRdd) {
+ public String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
LOG.info("Start to generate hfile");
try {
- Tuple2 tuple =
- sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Tuple2 tuple = sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
Partitioner partitioner = tuple._1;
TableDescriptor tableDescriptor = tuple._2;
- JavaPairRDD repartitionedRdd =
- buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd = buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
Configuration conf = sinkToHBase.getHBaseConfiguration().get();
Job job = Job.getInstance(conf);
HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
- conf.set("hbase.mapreduce.hfileoutputformat.table.name",
- tableDescriptor.getTableName().getNameAsString());
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableDescriptor.getTableName().getNameAsString());
String path = getHFilePath(job.getConfiguration());
- repartitionedRdd.saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class,
- KeyValue.class, HFileOutputFormat2.class,
- conf);
+ repartitionedRdd.saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
LOG.info("Saved HFiles to: '{}'", path);
flushPermission(conf, path);
return path;
@@ -206,7 +168,24 @@ String generateFiles(JavaPairRDD buildAndSerRd
return Constants.EMPTY_STR;
}
- public String getHFilePath(Configuration conf) throws IOException {
+ @Override
+ public void loadFiles(String path, Directions directions) {
+ try {
+ sinkToHBase.loadHfiles(path, getTableName());
+ } catch (Exception e) {
+ LOG.error("Failed to load hfiles", e);
+ }
+ }
+
+ private String getTableName() {
+ return struct.edges().size() > 0 ? loadOptions.edgeTableName : loadOptions.vertexTableName;
+ }
+
+ private Integer getTablePartitions() {
+ return struct.edges().size() > 0 ? loadOptions.edgePartitions : loadOptions.vertexPartitions;
+ }
+
+ private String getHFilePath(Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
String fileID = fileID();
String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
@@ -218,96 +197,35 @@ public String getHFilePath(Configuration conf) throws IOException {
return pathStr;
}
- @Override
- public void loadFiles(String path) {
- try {
- // BulkLoad HFile to HBase
- sinkToHBase.loadHfiles(path, getTableName());
- } catch (Exception e) {
- LOG.error(" Failed to load hfiles", e);
- }
- }
-
- private void flushPermission(Configuration conf, String path) {
- FsShell shell = new FsShell(conf);
- try {
- LOG.info("Chmod hfile directory permission");
- shell.run(new String[]{"-chmod", "-R", "777", path});
- shell.close();
- } catch (Exception e) {
- LOG.error("Couldn't change the file permissions " + e + " Please run command:" +
- "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
- " '" + "test" + "'\n" + " to load generated HFiles into HBase table");
- }
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
}
- List> buildAndSer(HBaseSerializer serializer, Row row,
- List builders) {
- List elementsElement;
- List> result = new LinkedList<>();
-
- for (ElementBuilder builder : builders) {
- ElementMapping elementMapping = builder.mapping();
- if (elementMapping.skip()) {
- continue;
- }
- if ("".equals(row.mkString())) {
- break;
- }
- switch (struct.input().type()) {
- case FILE:
- case HDFS:
- elementsElement = builder.build(row);
- break;
- default:
- throw new AssertionError(String.format("Unsupported input source '%s'",
- struct.input().type()));
- }
-
- boolean isVertex = builder.mapping().type().isVertex();
- if (isVertex) {
- for (Vertex vertex : (List) (Object) elementsElement) {
- LOG.debug("vertex already build done {} ", vertex.toString());
- Tuple2 tuple2 =
- vertexSerialize(serializer, vertex);
- loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
- result.add(tuple2);
- }
- } else {
- for (Edge edge : (List) (Object) elementsElement) {
- LOG.debug("edge already build done {}", edge.toString());
- Tuple2 tuple2 =
- edgeSerialize(serializer, edge);
- loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
- result.add(tuple2);
-
- }
- }
- }
- return result;
+ public HBaseDirectLoader(LoadOptions loadOptions, InputStruct struct) {
+ super(loadOptions, struct);
+ this.sinkToHBase = new SinkToHBase(loadOptions);
}
- private Tuple2 edgeSerialize(HBaseSerializer serializer,
- Edge edge) {
- LOG.debug("edge start serialize {}", edge.toString());
- byte[] rowkey = serializer.getKeyBytes(edge);
- byte[] values = serializer.getValueBytes(edge);
+ @Override
+ protected Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ byte[] rowkey = serializer.getKeyBytes(vertex, null)._1;
+ byte[] values = serializer.getValueBytes(vertex);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
- KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY),
- Bytes.toBytes(Constants.EMPTY_STR), values);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
return new Tuple2<>(rowKey, keyValue);
}
- private Tuple2 vertexSerialize(HBaseSerializer serializer,
- Vertex vertex) {
- LOG.debug("vertex start serialize {}", vertex.toString());
- byte[] rowkey = serializer.getKeyBytes(vertex);
- byte[] values = serializer.getValueBytes(vertex);
+ @Override
+ protected Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize(GraphElementSerializer serializer, Edge edge, Directions direction) {
+ LOG.debug("edge start serialize {}", edge.toString());
+ byte[] rowkey = serializer.getKeyBytes(edge, direction)._1;
+ byte[] values = serializer.getValueBytes(edge);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
rowKey.set(rowkey);
- KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY),
- Bytes.toBytes(Constants.EMPTY_STR), values);
+ KeyValue keyValue = new KeyValue(rowkey, Bytes.toBytes(Constants.HBASE_COL_FAMILY), Bytes.toBytes(Constants.EMPTY_STR), values);
return new Tuple2<>(rowKey, keyValue);
}
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
new file mode 100644
index 000000000..87c0612b4
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HStoreDirectLoader.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.loader;
+
+
+import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hugegraph.client.RestClient;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.direct.outputformat.SSTFileOutputFormat;
+import org.apache.hugegraph.loader.direct.partitioner.HstorePartitioner;
+import org.apache.hugegraph.loader.executor.LoadContext;
+import org.apache.hugegraph.loader.executor.LoadOptions;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.rest.RestClientConfig;
+import org.apache.hugegraph.rest.RestResult;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
+import org.apache.hugegraph.structure.graph.Edge;
+import org.apache.hugegraph.structure.graph.Vertex;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hugegraph.serializer.direct.HStoreSerializer.processAddresses;
+
+public class HStoreDirectLoader extends AbstractDirectLoader<Tuple2<byte[], Integer>, byte[]> {
+
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
+ super(loadOptions, struct, loadDistributeMetrics);
+ }
+
+ public HStoreDirectLoader(LoadOptions loadOptions, InputStruct struct) {
+ super(loadOptions, struct);
+ }
+
+ @Override
+ public JavaPairRDD<Tuple2<byte[], Integer>, byte[]> buildVertexAndEdge(Dataset<Row> ds, Directions directions) {
+ LOG.info("Start build vertexes and edges");
+ return ds.toJavaRDD().mapPartitionsToPair(
+ (PairFlatMapFunction<Iterator<Row>, Tuple2<byte[], Integer>, byte[]>) rowIter -> {
+ LoadContext loaderContext = new LoadContext(super.loadOptions);
+ loaderContext.init(struct);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders(loaderContext);
+ List<Tuple2<Tuple2<byte[], Integer>, byte[]>> result = new LinkedList<>();
+ while (rowIter.hasNext()) {
+ Row row = rowIter.next();
+ List<Tuple2<Tuple2<byte[], Integer>, byte[]>> serList;
+ serList = buildAndSer(loaderContext.getSerializer(), row, buildersForGraphElement, directions);
+ result.addAll(serList);
+ }
+ return result.iterator();
+ }
+ );
+ }
+
+ @Override
+ public String generateFiles(JavaPairRDD<Tuple2<byte[], Integer>, byte[]> buildAndSerRdd) {
+ LOG.info("Start to generate SST files");
+ try {
+ JavaPairRDD<Tuple2<byte[], Integer>, byte[]> tuple2JavaPairRDD = buildAndSerRdd.partitionBy(new HstorePartitioner(loadOptions.vertexPartitions));
+ // drop the partition id; only the raw key bytes are kept from here on
+ JavaPairRDD<byte[], byte[]> javaPairRDD = tuple2JavaPairRDD.mapToPair(tuple2 -> new Tuple2<>(tuple2._1._1, tuple2._2));
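+ // RocksDB SST files require keys in ascending order, so sort each partition's keys before writing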
+ JavaPairRDD<byte[], byte[]> sortedRdd = javaPairRDD.mapPartitionsToPair(iterator -> {
+ List<Tuple2<byte[], byte[]>> partitionData = new ArrayList<>();
+ iterator.forEachRemaining(partitionData::add);
+ Collections.sort(partitionData, new HStoreDirectLoader.TupleComparator());
+ return partitionData.iterator();
+ });
+ Configuration hadoopConf = new Configuration();
+ String sstFilePath = getSSTFilePath(hadoopConf);
+ LOG.info("SSTFile生成的hdfs路径:{}", sstFilePath);
+ sortedRdd.saveAsNewAPIHadoopFile(
+ sstFilePath,
+ byte[].class,
+ byte[].class,
+ SSTFileOutputFormat.class,
+ hadoopConf
+ );
+ flushPermission(hadoopConf, sstFilePath);
+ return sstFilePath;
+ } catch (Exception e) {
+ LOG.error("Failed to generate SST files", e);
+ }
+
+
+ return null;
+ }
+
+ @Override
+ public void loadFiles(String sstFilePath, Directions directions) {
+ RestClientConfig config = RestClientConfig.builder()
+ .connectTimeout(5 * 1000) // connect timeout: 5s
+ .readTimeout(60 * 60 * 1000) // read timeout: 1h
+ .maxConns(10) // max connections
+ .build();
+ BulkloadInfo bulkloadInfo = new BulkloadInfo(loadOptions.graph,
+ replaceClusterName(sstFilePath, loadOptions.hdfsUri),
+ getBulkloadType(),
+ directions,
+ loadOptions.maxDownloadRate
+ );
+ String[] urls = processAddresses(loadOptions.pdAddress, loadOptions.pdRestPort);
+
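+ // Submit the bulkload task to the PD REST endpoints; the loop stops after the first attempt, whether it succeeds or fails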
+ for (String url : urls) {
+ LOG.info("submit bulkload task to {}, bulkloadInfo:{}", url, bulkloadInfo);
+ RestClient client = null;
+ try {
+ client = new RestClient(url, config);
+ RestResult restResult = client.post("v1/task/bulkload", bulkloadInfo);
+ Map resMap = restResult.readObject(Map.class);
+ LOG.info("Response :{} ", resMap);
+ break;
+ } catch (Exception e) {
+ LOG.error("Failed to submit bulkload task", e);
+ break;
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close RestClient", e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Tuple2<Tuple2<byte[], Integer>, byte[]> vertexSerialize(GraphElementSerializer serializer, Vertex vertex) {
+ LOG.debug("vertex start serialize {}", vertex.toString());
+ Tuple2<byte[], Integer> keyBytes = serializer.getKeyBytes(vertex, null);
+ byte[] values = serializer.getValueBytes(vertex);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ @Override
+ protected Tuple2<Tuple2<byte[], Integer>, byte[]> edgeSerialize(GraphElementSerializer serializer, Edge edge, Directions direction) {
+ Tuple2<byte[], Integer> keyBytes = serializer.getKeyBytes(edge, direction);
+ byte[] values = serializer.getValueBytes(edge);
+ return new Tuple2<>(keyBytes, values);
+ }
+
+ private BulkloadInfo.LoadType getBulkloadType() {
+ return struct.edges().size() > 0 ? BulkloadInfo.LoadType.EDGE : BulkloadInfo.LoadType.VERTEX;
+ }
+
+ private String getSSTFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timeStr = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hg-1_5/gen-sstfile" + "/" + timeStr + "/"; // SST file output path
+ org.apache.hadoop.fs.Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("\n delete sstFile path \n");
+ fs.delete(hfileGenPath, true);
+ }
+ return pathStr;
+ }
+
+ static class TupleComparator implements Comparator<Tuple2<byte[], byte[]>>, Serializable {
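+ // Compares keys byte by byte (signed), falling back to array length, to order keys before SST generation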
+ @Override
+ public int compare(Tuple2<byte[], byte[]> a, Tuple2<byte[], byte[]> b) {
+ return compareByteArrays(a._1, b._1);
+ }
+
+ private int compareByteArrays(byte[] a, byte[] b) {
+ for (int i = 0, j = 0; i < a.length && j < b.length; i++, j++) {
+ int cmp = Byte.compare(a[i], b[j]);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return Integer.compare(a.length, b.length);
+ }
+ }
+
+ @Data
+ static class BulkloadInfo implements Serializable {
+ String graphName;
+ String tableName;
+ String hdfsPath;
+ Integer maxDownloadRate;
+
+ public BulkloadInfo(String graphName, String path, LoadType loadType, Directions directions, int maxDownloadRate) {
+ this.graphName = processGraphName(graphName);
+ this.tableName = processTableName(loadType, directions);
+ this.hdfsPath = path;
+ this.maxDownloadRate = maxDownloadRate;
+ }
+
+ private String processGraphName(String graphName) {
+ return graphName + "/g";
+ }
+
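+ // Map load type and direction to the HStore table: "g+v" for vertices, "g+oe" for outgoing edges, "g+ie" for incoming edges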
+ private String processTableName(LoadType loadType, Directions directions) {
+ if (loadType == LoadType.VERTEX) {
+ return "g+v";
+ } else if (directions == null && loadType == LoadType.EDGE) {
+ return "g+oe";
+ } else if (directions == Directions.IN && loadType == LoadType.EDGE) {
+ return "g+ie";
+ } else {
+ throw new IllegalArgumentException("Invalid loadType: " + loadType);
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "BulkloadInfo{" +
+ "graphName='" + graphName + '\'' +
+ ", tableName='" + tableName + '\'' +
+ ", hdfsPath='" + hdfsPath + '\'' +
+ '}';
+ }
+
+ enum LoadType {
+ VERTEX,
+ EDGE
+ }
+
+ }
+
+
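+ // Replace the authority (cluster name) part of an hdfs:// URI while keeping the scheme and path.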
+ public static String replaceClusterName(String originalPath, String replacement) {
+ String regex = "(hdfs://)([^/]+)(/.*)";
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(originalPath);
+
+ if (matcher.matches()) {
+ return matcher.group(1) + replacement + matcher.group(3);
+ } else {
+ throw new IllegalArgumentException("The path does not match the expected format.");
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java
new file mode 100644
index 000000000..c202b609e
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/outputformat/SSTFileOutputFormat.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.outputformat;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
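+// OutputFormat that buffers a task's key/value pairs into a local RocksDB SST file and copies it to the task's HDFS output file on close.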
+public class SSTFileOutputFormat extends FileOutputFormat<byte[], byte[]> {
+
+ @Override
+ public RecordWriter<byte[], byte[]> getRecordWriter(TaskAttemptContext job) throws IOException {
+ Path outputPath = getDefaultWorkFile(job, ".sst");
+ FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
+ FSDataOutputStream fileOut = fs.create(outputPath, false);
+ return new RocksDBSSTFileRecordWriter(fileOut, outputPath, fs);
+ }
+
+ public static class RocksDBSSTFileRecordWriter extends RecordWriter<byte[], byte[]> {
+ private final FSDataOutputStream out;
+ private final SstFileWriter sstFileWriter;
+ private final Path outputPath;
+ private final FileSystem fs;
+ private final File localSSTFile;
+ private boolean hasData = false;
+
+ public RocksDBSSTFileRecordWriter(FSDataOutputStream out, Path outputPath, FileSystem fs) throws IOException {
+ this.out = out;
+ this.outputPath = outputPath;
+ this.fs = fs;
+ Options options = new Options();
+ options.setCreateIfMissing(true);
+ this.localSSTFile = File.createTempFile("sstfile", ".sst");
+ this.sstFileWriter = new SstFileWriter(new org.rocksdb.EnvOptions(), options);
+ try {
+ this.sstFileWriter.open(localSSTFile.getAbsolutePath());
+ } catch (RocksDBException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void write(byte[] key, byte[] value) throws IOException {
+ try {
+ sstFileWriter.put(key, value);
+ if (!hasData) {
+ hasData = true;
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
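+ // Finish the local SST file and stream it to HDFS; if nothing was written, drop both the temp file and the empty HDFS output file.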
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ try {
+ if (hasData) {
+ sstFileWriter.finish();
+ try (InputStream in = new FileInputStream(localSSTFile)) {
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+ while ((bytesRead = in.read(buffer)) != -1) {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+ out.close();
+ localSSTFile.delete();
+ } else {
+ localSSTFile.delete();
+ out.close();
+ fs.delete(outputPath, false);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java
new file mode 100644
index 000000000..9625414df
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/partitioner/HstorePartitioner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.loader.direct.partitioner;
+
+
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class HstorePartitioner extends Partitioner {
+ private static final Logger LOG = LoggerFactory.getLogger(HstorePartitioner.class);
+
+ private final int numPartitions;
+
+ public HstorePartitioner(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
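+ // Keys are (bytes, partition id) tuples, so the partition id is taken directly from the tuple; unexpected keys fall back to partition 0.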
+ @Override
+ public int getPartition(Object key) {
+
+ try {
+ return ((Tuple2<byte[], Integer>) key)._2;
+ } catch (Exception e) {
+ LOG.error("When trying to get partitionID, encountered exception: {} \t key = {}", e, key);
+ return 0;
+ }
+
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
index 0be364bb8..c0a405b4c 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadContext.java
@@ -19,13 +19,26 @@
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hugegraph.loader.builder.EdgeBuilder;
+import org.apache.hugegraph.loader.builder.ElementBuilder;
+import org.apache.hugegraph.loader.builder.VertexBuilder;
import org.apache.hugegraph.loader.exception.LoadException;
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
import org.apache.hugegraph.loader.progress.LoadProgress;
import org.apache.hugegraph.loader.util.DateUtil;
import org.apache.hugegraph.loader.util.HugeClientHolder;
+import org.apache.hugegraph.serializer.AbstractGraphElementSerializer;
+import org.apache.hugegraph.serializer.GraphElementSerializer;
+import org.apache.hugegraph.serializer.SerializerFactory;
+import org.apache.hugegraph.serializer.config.SerializerConfig;
+import org.apache.hugegraph.structure.GraphElement;
import org.slf4j.Logger;
import org.apache.hugegraph.driver.HugeClient;
@@ -57,6 +70,9 @@ public final class LoadContext implements Serializable {
private final HugeClient client;
private final SchemaCache schemaCache;
+ private final Map<ElementBuilder, List<GraphElement>> builders;
+ private GraphElementSerializer serializer;
+
public LoadContext(LoadOptions options) {
this.timestamp = DateUtil.now("yyyyMMdd-HHmmss");
@@ -70,6 +86,8 @@ public LoadContext(LoadOptions options) {
this.loggers = new ConcurrentHashMap<>();
this.client = HugeClientHolder.create(options);
this.schemaCache = new SchemaCache(this.client);
+ this.builders = new HashMap<>();
+ this.serializer = initSerializer();
}
public LoadContext(ComputerLoadOptions options) {
@@ -83,9 +101,21 @@ public LoadContext(ComputerLoadOptions options) {
this.newProgress = new LoadProgress();
this.loggers = new ConcurrentHashMap<>();
this.client = null;
+ this.builders = new HashMap<>();
this.schemaCache = options.schemaCache();
}
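+
+ // Build the direct-load serializer from the load options (backend type, graph name, PD address/port, partition counts).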
+ public GraphElementSerializer initSerializer() {
+ SerializerConfig config = new SerializerConfig();
+ config.setBackendStoreType(options.backendStoreType);
+ config.setGraphName(options.graph);
+ config.setEdgePartitions(options.edgePartitions);
+ config.setPdAddress(options.pdAddress);
+ config.setPdRestPort(options.pdRestPort);
+ config.setVertexPartitions(options.vertexPartitions);
+ return SerializerFactory.getSerializer(client, config);
+ }
+
public String timestamp() {
return this.timestamp;
}
@@ -132,6 +162,9 @@ public FailLogger failureLogger(InputStruct struct) {
return new FailLogger(this, struct);
});
}
+ public GraphElementSerializer getSerializer() {
+ return serializer;
+ }
public HugeClient client() {
return this.client;
@@ -194,4 +227,12 @@ public void close() {
LOG.info("Close HugeClient successfully");
this.closed = true;
}
+
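+ // Create one ElementBuilder per vertex/edge mapping of the struct and refresh the schema cache.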
+ public void init(InputStruct struct) {
+ for (VertexMapping vertexMapping : struct.vertices()) {
+ this.builders.put(new VertexBuilder(this, struct, vertexMapping), new ArrayList<>());
+ }
+ for (EdgeMapping edgeMapping : struct.edges()) {
+ this.builders.put(new EdgeBuilder(this, struct, edgeMapping), new ArrayList<>());
+ }
+ this.updateSchemaCache();
+ }
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
index d2a8a4546..6c083d5c3 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/executor/LoadOptions.java
@@ -239,6 +239,27 @@ public class LoadOptions implements Serializable {
description = "HBase zookeeper parent")
public String hbaseZKParent;
+ @Parameter(names = {"--backend"}, arity = 1,
+ description = "backend store type")
+ public String backendStoreType = "hstore";
+
+ @Parameter(names = {"--pd-address"}, arity = 1,
+ description = "pd-address")
+ public String pdAddress;
+
+
+ @Parameter(names = {"--pd-rest-port"}, arity = 1,
+ description = "pd-rest-port")
+ public String pdRestPort="8620";
+
+ @Parameter(names = {"--hdfs-uri"}, arity = 1,
+ description = "--hdfs-uri")
+ public String hdfsUri;
+
+ @Parameter(names = {"--max-download-rate"}, arity = 1,
+ description = "store download rate limit per second")
+ public int maxDownloadRate = 1024;
+
public String workModeString() {
if (this.incrementalMode) {
return "INCREMENTAL MODE";
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java
new file mode 100644
index 000000000..82d502345
--- /dev/null
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/DistributedLoadMetrics.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.hugegraph.loader.metrics;
+
+import org.apache.hugegraph.loader.mapping.EdgeMapping;
+import org.apache.hugegraph.loader.mapping.ElementMapping;
+import org.apache.hugegraph.loader.mapping.InputStruct;
+import org.apache.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
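+// Spark-aware counterpart of LoadMetrics: per-label parse/insert counters are Spark LongAccumulators, so executors can update them and the driver aggregates the totals.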
+public final class DistributedLoadMetrics implements Serializable {
+ private final InputStruct struct;
+
+ private long readSuccess;
+
+ private long readFailure;
+
+ private boolean inFlight;
+
+ private final LongAdder flightingNums;
+
+ private final Map<String, DistributedMetrics> vertexMetrics;
+
+ private final Map<String, DistributedMetrics> edgeMetrics;
+
+ public DistributedLoadMetrics(InputStruct struct, SparkContext sc) {
+ this.struct = struct;
+ this.readSuccess = 0L;
+ this.readFailure = 0L;
+ this.inFlight = false;
+ this.flightingNums = new LongAdder();
+ this.vertexMetrics = new HashMap<>();
+ this.edgeMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ DistributedMetrics distributedMetrics = new DistributedMetrics();
+ distributedMetrics.init(sc, mapping.toString());
+ this.vertexMetrics.put(mapping.label(), distributedMetrics);
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ DistributedMetrics distributedMetrics = new DistributedMetrics();
+ distributedMetrics.init(sc, mapping.toString());
+ this.edgeMetrics.put(mapping.label(), distributedMetrics);
+ }
+ }
+
+ public long readSuccess() {
+ return this.readSuccess;
+ }
+
+ public void readSuccess(long count) {
+ this.readSuccess = count;
+ }
+
+ public void increaseReadSuccess() {
+ this.readSuccess++;
+ }
+
+ public void plusReadSuccess(long count) {
+ this.readSuccess += count;
+ }
+
+ public long readFailure() {
+ return this.readFailure;
+ }
+
+ public void readFailure(long count) {
+ this.readFailure = count;
+ }
+
+ public void increaseReadFailure() {
+ this.readFailure++;
+ }
+
+ public void startInFlight() {
+ this.inFlight = true;
+ }
+
+ public void stopInFlight() {
+ this.inFlight = false;
+ }
+
+ public void plusFlighting(int num) {
+ this.flightingNums.add(num);
+ }
+
+ public void minusFlighting(int num) {
+ this.flightingNums.add(-num);
+ }
+
+ public long parseSuccess(ElementMapping mapping) {
+ return (metrics(mapping)).parseSuccess.value().longValue();
+ }
+
+ public void plusParseSuccess(ElementMapping mapping, long count) {
+ (metrics(mapping)).parseSuccess.add(count);
+ }
+
+ public long parseFailure(ElementMapping mapping) {
+ return (metrics(mapping)).parseFailure.value().longValue();
+ }
+
+ public void increaseParseFailure(ElementMapping mapping) {
+ (metrics(mapping)).parseFailure.add(1L);
+ }
+
+ public long insertSuccess(ElementMapping mapping) {
+ return (metrics(mapping)).insertSuccess.value().longValue();
+ }
+
+ public void plusInsertSuccess(ElementMapping mapping, long count) {
+ (metrics(mapping)).insertSuccess.add(count);
+ }
+
+ public long insertFailure(ElementMapping mapping) {
+ return (metrics(mapping)).insertFailure.value().longValue();
+ }
+
+ public void increaseInsertFailure(ElementMapping mapping) {
+ (metrics(mapping)).insertFailure.add(1L);
+ }
+
+ public Map<String, DistributedMetrics> vertexMetrics() {
+ return this.vertexMetrics;
+ }
+
+ public Map<String, DistributedMetrics> edgeMetrics() {
+ return this.edgeMetrics;
+ }
+
+ public long totalParseFailures() {
+ long total = 0L;
+ for (DistributedMetrics counter : this.vertexMetrics.values())
+ total += counter.parseFailure.value().longValue();
+ for (DistributedMetrics counter : this.edgeMetrics.values())
+ total += counter.parseFailure.value().longValue();
+ return total;
+ }
+
+ public long totalInsertFailures() {
+ long total = 0L;
+ for (DistributedMetrics counter : this.vertexMetrics.values())
+ total += counter.insertFailure.value().longValue();
+ for (DistributedMetrics counter : this.edgeMetrics.values())
+ total += counter.insertFailure.value().longValue();
+ return total;
+ }
+
+ private DistributedMetrics metrics(ElementMapping mapping) {
+ if (mapping.type().isVertex())
+ return this.vertexMetrics.get(mapping.label());
+ return this.edgeMetrics.get(mapping.label());
+ }
+
+ public static class DistributedMetrics implements Serializable {
+ private LongAccumulator parseSuccess;
+
+ private LongAccumulator parseFailure;
+
+ private LongAccumulator insertSuccess;
+
+ private LongAccumulator insertFailure;
+
+ public void init(SparkContext sc, String label) {
+ this.parseSuccess = sc.longAccumulator(label + "_" + "parseSuccess");
+ this.parseFailure = sc.longAccumulator(label + "_" + "parseFailure");
+ this.insertSuccess = sc.longAccumulator(label + "_" + "insertSuccess");
+ this.insertFailure = sc.longAccumulator(label + "_" + "insertFailure");
+ }
+
+ public void plusDisInsertSuccess(Long count) {
+ this.insertSuccess.add(count);
+ }
+
+ public void plusDisParseSuccess(Long count) {
+ this.parseSuccess.add(count);
+ }
+
+ public long parseSuccess() {
+ return this.parseSuccess.value().longValue();
+ }
+
+ public long parseFailure() {
+ return this.parseFailure.value().longValue();
+ }
+
+ public long insertSuccess() {
+ return this.insertSuccess.value().longValue();
+ }
+
+ public long insertFailure() {
+ return this.insertFailure.value().longValue();
+ }
+
+ @Override
+ public String toString() {
+ return "parseSuccess=" + parseSuccess.value() +
+ ", parseFailure=" + parseFailure.value() +
+ ", insertSuccess=" + insertSuccess.value() +
+ ", insertFailure=" + insertFailure.value();
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
index d0e7dcedd..f3a531765 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadReport.java
@@ -34,6 +34,50 @@ public final class LoadReport {
private long edgeInsertSuccess;
private long edgeInsertFailure;
+ public static LoadReport collect(LoadSummary summary) {
+ LoadReport report = new LoadReport();
+ report.totalTime = summary.totalTime();
+ for (LoadMetrics metrics : summary.inputMetricsMap().values()) {
+ report.readSuccess += metrics.readSuccess();
+ report.readFailure += metrics.readFailure();
+ for (LoadMetrics.Metrics labelMetrics : metrics.vertexMetrics().values()) {
+ report.vertexParseSuccess += labelMetrics.parseSuccess();
+ report.vertexParseFailure += labelMetrics.parseFailure();
+ report.vertexInsertSuccess += labelMetrics.insertSuccess();
+ report.vertexInsertFailure += labelMetrics.insertFailure();
+ }
+ for (LoadMetrics.Metrics labelMetrics : metrics.edgeMetrics().values()) {
+ report.edgeParseSuccess += labelMetrics.parseSuccess();
+ report.edgeParseFailure += labelMetrics.parseFailure();
+ report.edgeInsertSuccess += labelMetrics.insertSuccess();
+ report.edgeInsertFailure += labelMetrics.insertFailure();
+ }
+ }
+ return report;
+ }
+
+ public static LoadReport collectDistributed(LoadSummary summary) {
+ LoadReport report = new LoadReport();
+ report.totalTime = summary.totalTime();
+ for (DistributedLoadMetrics metrics : summary.inputDistributedMetricsMap().values()) {
+ report.readSuccess += metrics.readSuccess();
+ report.readFailure += metrics.readFailure();
+ for (DistributedLoadMetrics.DistributedMetrics labelMetrics : metrics.vertexMetrics().values()) {
+ report.vertexParseSuccess += labelMetrics.parseSuccess();
+ report.vertexParseFailure += labelMetrics.parseFailure();
+ report.vertexInsertSuccess += labelMetrics.insertSuccess();
+ report.vertexInsertFailure += labelMetrics.insertFailure();
+ }
+ for (DistributedLoadMetrics.DistributedMetrics labelMetrics : metrics.edgeMetrics().values()) {
+ report.edgeParseSuccess += labelMetrics.parseSuccess();
+ report.edgeParseFailure += labelMetrics.parseFailure();
+ report.edgeInsertSuccess += labelMetrics.insertSuccess();
+ report.edgeInsertFailure += labelMetrics.insertFailure();
+ }
+ }
+ return report;
+ }
+
public long totalTime() {
return this.totalTime;
}
@@ -77,26 +121,4 @@ public long edgeInsertSuccess() {
public long edgeInsertFailure() {
return this.edgeInsertFailure;
}
-
- public static LoadReport collect(LoadSummary summary) {
- LoadReport report = new LoadReport();
- report.totalTime = summary.totalTime();
- for (LoadMetrics metrics : summary.inputMetricsMap().values()) {
- report.readSuccess += metrics.readSuccess();
- report.readFailure += metrics.readFailure();
- for (LoadMetrics.Metrics labelMetrics : metrics.vertexMetrics().values()) {
- report.vertexParseSuccess += labelMetrics.parseSuccess();
- report.vertexParseFailure += labelMetrics.parseFailure();
- report.vertexInsertSuccess += labelMetrics.insertSuccess();
- report.vertexInsertFailure += labelMetrics.insertFailure();
- }
- for (LoadMetrics.Metrics labelMetrics : metrics.edgeMetrics().values()) {
- report.edgeParseSuccess += labelMetrics.parseSuccess();
- report.edgeParseFailure += labelMetrics.parseFailure();
- report.edgeInsertSuccess += labelMetrics.insertSuccess();
- report.edgeInsertFailure += labelMetrics.insertFailure();
- }
- }
- return report;
- }
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
index dda1dd099..b19f35132 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/metrics/LoadSummary.java
@@ -29,6 +29,7 @@
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.mapping.LoadMapping;
import org.apache.hugegraph.util.InsertionOrderUtil;
+import org.apache.spark.SparkContext;
public final class LoadSummary {
@@ -43,6 +44,8 @@ public final class LoadSummary {
private final RangesTimer loadRangesTimer;
// Every input struct has a metric
private final Map inputMetricsMap;
+ private final Map<String, DistributedLoadMetrics> inputDistributedMetricsMap;
+
public LoadSummary() {
this.vertexLoaded = new LongAdder();
@@ -55,6 +58,7 @@ public LoadSummary() {
this.edgeRangesTimer = new RangesTimer(Constants.TIME_RANGE_CAPACITY);
this.loadRangesTimer = new RangesTimer(Constants.TIME_RANGE_CAPACITY);
this.inputMetricsMap = InsertionOrderUtil.newMap();
+ this.inputDistributedMetricsMap = InsertionOrderUtil.newMap();
}
public void initMetrics(LoadMapping mapping) {
@@ -63,6 +67,11 @@ public void initMetrics(LoadMapping mapping) {
}
}
+ public void initMetrics(LoadMapping mapping, SparkContext sc) {
+ for (InputStruct struct : mapping.structs()) {
+ this.inputDistributedMetricsMap.put(struct.id(), new DistributedLoadMetrics(struct, sc));
+ }
+ }
+
public Map inputMetricsMap() {
return this.inputMetricsMap;
}
@@ -99,38 +108,38 @@ public long totalReadLines() {
public long totalReadSuccess() {
return this.inputMetricsMap.values().stream()
- .map(LoadMetrics::readSuccess)
- .reduce(0L, Long::sum);
+ .map(LoadMetrics::readSuccess)
+ .reduce(0L, Long::sum);
}
public long totalReadFailures() {
return this.inputMetricsMap.values().stream()
- .map(LoadMetrics::readFailure)
- .reduce(0L, Long::sum);
+ .map(LoadMetrics::readFailure)
+ .reduce(0L, Long::sum);
}
public long totalParseFailures() {
return this.inputMetricsMap.values().stream()
- .map(LoadMetrics::totalParseFailures)
- .reduce(0L, Long::sum);
+ .map(LoadMetrics::totalParseFailures)
+ .reduce(0L, Long::sum);
}
public long totalInsertFailures() {
return this.inputMetricsMap.values().stream()
- .map(LoadMetrics::totalInsertFailures)
- .reduce(0L, Long::sum);
+ .map(LoadMetrics::totalInsertFailures)
+ .reduce(0L, Long::sum);
}
public void addTimeRange(ElemType type, long start, long end) {
RangesTimer timer = type.isVertex() ? this.vertexRangesTimer :
- this.edgeRangesTimer;
+ this.edgeRangesTimer;
timer.addTimeRange(start, end);
this.loadRangesTimer.addTimeRange(start, end);
}
public void calculateTotalTime(ElemType type) {
RangesTimer timer = type.isVertex() ? this.vertexRangesTimer :
- this.edgeRangesTimer;
+ this.edgeRangesTimer;
AtomicLong elemTime = type.isVertex() ? this.vertexTime : this.edgeTime;
elemTime.set(timer.totalTime());
loadTime.set(this.loadRangesTimer.totalTime());
@@ -176,4 +185,13 @@ public long loadRate(ElemType type) {
long success = isVertex ? this.vertexLoaded() : this.edgeLoaded();
return success * 1000 / totalTime;
}
+
+ public DistributedLoadMetrics distributedLoadMetrics(InputStruct struct) {
+ return this.inputDistributedMetricsMap.get(struct.id());
+ }
+
+ public Map<String, DistributedLoadMetrics> inputDistributedMetricsMap() {
+ return this.inputDistributedMetricsMap;
+ }
+
}
diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
index 61cf3136c..f27ca9b7f 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -17,17 +17,23 @@
package org.apache.hugegraph.loader.spark;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hugegraph.driver.GraphManager;
import org.apache.hugegraph.loader.builder.EdgeBuilder;
import org.apache.hugegraph.loader.builder.ElementBuilder;
import org.apache.hugegraph.loader.builder.VertexBuilder;
+
+import org.apache.hugegraph.loader.direct.loader.AbstractDirectLoader;
import org.apache.hugegraph.loader.direct.loader.HBaseDirectLoader;
+import org.apache.hugegraph.loader.direct.loader.HStoreDirectLoader;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
-import org.apache.hugegraph.loader.metrics.LoadDistributeMetrics;
+import org.apache.hugegraph.loader.metrics.DistributedLoadMetrics;
+import org.apache.hugegraph.loader.metrics.LoadReport;
+import org.apache.hugegraph.loader.metrics.LoadSummary;
import org.apache.hugegraph.loader.source.InputSource;
import org.apache.hugegraph.loader.source.jdbc.JDBCSource;
import org.apache.hugegraph.loader.util.Printer;
@@ -41,6 +47,7 @@
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.file.FileSource;
import org.apache.hugegraph.loader.source.file.SkippedLine;
+import org.apache.hugegraph.serializer.direct.struct.Directions;
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.BatchEdgeRequest;
import org.apache.hugegraph.structure.graph.BatchVertexRequest;
@@ -49,14 +56,15 @@
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
+import org.apache.hugegraph.util.TimeUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
-import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import java.io.Serializable;
@@ -72,6 +80,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import scala.Tuple2;
import scala.collection.JavaConverters;
public class HugeGraphSparkLoader implements Serializable {
@@ -82,6 +91,8 @@ public class HugeGraphSparkLoader implements Serializable {
private final Map> builders;
private final transient ExecutorService executor;
+ private static final String DIVIDE_LINE = StringUtils.repeat('-', 50);
+
public static void main(String[] args) {
HugeGraphSparkLoader loader;
@@ -133,66 +144,197 @@ public void load() throws ExecutionException, InterruptedException {
this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
}
- SparkConf conf = new SparkConf();
- registerKryoClasses(conf);
- SparkSession session = SparkSession.builder().config(conf).getOrCreate();
+ SparkSession session = initializeSparkSession();
SparkContext sc = session.sparkContext();
- LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
+ LoadSummary summary = initializeLoadSummary(mapping, sc);
+
List> futures = new ArrayList<>(structs.size());
+ Object[] outEArray = new JavaPairRDD[structs.size()];
+ Object[] inEArray = new JavaPairRDD[structs.size()];
+
+
+ for (int i = 0; i < structs.size(); i++) {
+ InputStruct struct = structs.get(i);
+ DistributedLoadMetrics distributedLoadMetrics = summary.distributedLoadMetrics(struct);
+ int finalI = i;
- for (InputStruct struct : structs) {
Future> future = this.executor.submit(() -> {
- LOG.info("\n Initializes the accumulator corresponding to the {} ",
- struct.input().asFileSource().path());
- LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
- loadDistributeMetrics.init(sc);
- LOG.info("\n Start to load data, data info is: \t {} ",
- struct.input().asFileSource().path());
- Dataset ds = read(session, struct);
if (sinkType) {
- LOG.info("\n Start to load data using spark apis \n");
- ds.foreachPartition((Iterator p) -> {
- LoadContext context = initPartition(this.loadOptions, struct);
- p.forEachRemaining((Row row) -> {
- loadRow(struct, row, p, context);
- });
- context.close();
- });
-
+ processWithAPI(session, struct, distributedLoadMetrics);
} else {
- LOG.info("\n Start to load data using spark bulkload \n");
- // gen-hfile
- HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
- loadDistributeMetrics);
- directLoader.bulkload(ds);
-
+ processWithBulkLoad(session, struct, distributedLoadMetrics, outEArray, inEArray, finalI);
}
- collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
- LOG.info("\n Finished load {} data ", struct.input().asFileSource().path());
});
futures.add(future);
}
+
+ waitForFutures(futures);
+ if (!sinkType) {
+ unifyAndLoad(outEArray, structs, null);
+ if (arrayHasData(inEArray)) {
+ unifyAndLoad(inEArray, structs, Directions.IN);
+ }
+ }
+ summary.stopTotalTimer();
+ printDistributedSummary(summary);
+ sc.stop();
+ session.close();
+ session.stop();
+ }
+
+ private void waitForFutures(List<Future<?>> futures) throws ExecutionException, InterruptedException {
for (Future> future : futures) {
future.get();
}
+ }
+ public static boolean arrayHasData(Object[] array) {
+ if (array == null || array.length == 0) {
+ return false;
+ }
+ for (Object element : array) {
+ if (element != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+ private SparkSession initializeSparkSession() {
+ SparkConf conf = new SparkConf();
+ registerKryoClasses(conf);
+ return SparkSession.builder().config(conf).getOrCreate();
+ }
- Long totalInsertSuccessCnt = totalInsertSuccess.value();
- LOG.info("\n ------------The data load task is complete-------------------\n" +
- "\n insertSuccessCnt:\t {} \n ---------------------------------------------\n",
- totalInsertSuccessCnt);
+ private LoadSummary initializeLoadSummary(LoadMapping mapping, SparkContext sc) {
+ LoadSummary summary = new LoadSummary();
+ summary.initMetrics(mapping, sc);
+ summary.startTotalTimer();
+ return summary;
+ }
+ private void processWithAPI(SparkSession session, InputStruct struct, DistributedLoadMetrics distributedLoadMetrics) {
+ LOG.info("\n Initializes the accumulator corresponding to the {} ",
+ struct.input().asFileSource().path());
- sc.stop();
- session.close();
- session.stop();
+ LOG.info("\n Start to load data, data info is: \t {} ",
+ struct.input().asFileSource().path());
+ Dataset<Row> ds = read(session, struct);
+ LOG.info("\n Start to load data using spark apis \n");
+ ds.foreachPartition((Iterator<Row> p) -> {
+ LoadContext context = initPartition(this.loadOptions, struct);
+ p.forEachRemaining((Row row) -> {
+ loadRow(struct, row, p, context, distributedLoadMetrics);
+ });
+ context.close();
+ });
}
- private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
- LongAccumulator totalInsertSuccess) {
- Long edgeInsertSuccess = loadMetrics.readEdgeInsertSuccess();
- Long vertexInsertSuccess = loadMetrics.readVertexInsertSuccess();
- totalInsertSuccess.add(edgeInsertSuccess);
- totalInsertSuccess.add(vertexInsertSuccess);
+ private void processWithBulkLoad(SparkSession session, InputStruct struct,
+ DistributedLoadMetrics distributedLoadMetrics, Object[] outEArray, Object[] inEArray, int index) {
+ LOG.info("\n Start to load data, data info is: \t {} ",
+ struct.input().asFileSource().path());
+ Dataset<Row> ds = read(session, struct);
+ LOG.info("\n Start to load data using spark bulkload \n");
+ processDirectLoader(struct, distributedLoadMetrics, ds, outEArray, inEArray, index);
+ }
+
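+ // Union the per-struct RDDs, generate HFiles/SST files via the backend-specific direct loader and trigger the server-side load; a non-null direction marks the IN-edge pass.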
+ private void unifyAndLoad(Object[] dstArray, List<InputStruct> structs, Directions directions) {
+ JavaPairRDD unionRDD;
+ String path;
+ switch (loadOptions.backendStoreType.toLowerCase()) {
+ case "hbase":
+ unionRDD = unionRDDs((JavaPairRDD[]) dstArray);
+ HBaseDirectLoader hbaseDirectLoader = new HBaseDirectLoader(this.loadOptions, structs.get(0));
+ path = hbaseDirectLoader.generateFiles(unionRDD);
+ hbaseDirectLoader.loadFiles(path, directions);
+ break;
+ case "hstore":
+ unionRDD = unionRDDs((JavaPairRDD[]) dstArray);
+ HStoreDirectLoader hstoreDirectLoader = new HStoreDirectLoader(this.loadOptions, structs.get(0));
+ path = hstoreDirectLoader.generateFiles(unionRDD);
+ hstoreDirectLoader.loadFiles(path, directions);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported backend store type: " + loadOptions.backendStoreType);
+ }
+ }
+
+ private JavaPairRDD unionRDDs(JavaPairRDD[] rdds) {
+ JavaPairRDD unionRDD = rdds[0];
+ for (int i = 1; i < rdds.length; i++) {
+ unionRDD = unionRDD.union(rdds[i]);
+ }
+ return unionRDD;
+ }
+
+
+
+ private static void log(String message) {
+ LOG.info(message);
+ }
+ public static void printDistributedSummary(LoadSummary summary) {
+ log(DIVIDE_LINE);
+ log("detail metrics");
+ summary.inputDistributedMetricsMap().forEach((id, metrics) -> {
+ log("");
+ log(String.format("input-struct '%s'", new Object[] { id }));
+ metrics.vertexMetrics().forEach((name,distributedMetrics)->{
+ log(String.format("vertex '%s'", new Object[] { name }));
+ log(distributedMetrics.toString());
+ });
+ metrics.edgeMetrics().forEach((name,distributedMetrics)->{
+ log(String.format("edge '%s'", new Object[] { name }));
+ log(distributedMetrics.toString());
+ });
+ });
+ log(DIVIDE_LINE);
+ LoadReport loadReport = LoadReport.collectDistributed(summary);
+ printCountReport(loadReport);
+ log(DIVIDE_LINE);
+ printMeterReport(summary, loadReport);
+ }
+
+ private static void printMeterReport(LoadSummary summary, LoadReport report) {
+ long totalTime = summary.totalTime();
+ log("meter metrics");
+ log("total time", TimeUtil.readableTime(totalTime));
+ log("vertex load time", TimeUtil.readableTime(report.vertexInsertSuccess() == 0L?0L:totalTime));
+ log("vertex load rate(vertices/s)",
+ (report.vertexInsertSuccess() == 0L) ? "0.0" : String.format("%.2f", new Object[] { Double.valueOf(report.vertexInsertSuccess() * 1000.0D / totalTime) }));
+ log("edge load time", TimeUtil.readableTime(report.edgeInsertSuccess() == 0L?0L:totalTime ));
+ log("edge load rate(edges/s)",
+ (report.edgeInsertSuccess() == 0L) ? "0.0" : String.format("%.2f", new Object[] { Double.valueOf(report.edgeInsertSuccess() * 1000.0D / totalTime) }));
+ }
+
+ private static void log(String key, String value) {
+ log(String.format(" %-30s: %-20s", new Object[] { key, value }));
+ }
+ private static void printCountReport(LoadReport report) {
+ log("count metrics");
+ log("input read success", report.readSuccess());
+ log("input read failure", report.readFailure());
+ log("vertex parse success", report.vertexParseSuccess());
+ log("vertex parse failure", report.vertexParseFailure());
+ log("vertex insert success", report.vertexInsertSuccess());
+ log("vertex insert failure", report.vertexInsertFailure());
+ log("edge parse success", report.edgeParseSuccess());
+ log("edge parse failure", report.edgeParseFailure());
+ log("edge insert success", report.edgeInsertSuccess());
+ log("edge insert failure", report.edgeInsertFailure());
+ log("total insert success", report.vertexInsertSuccess() + report.edgeInsertSuccess());
+ log("total insert failure", report.vertexInsertFailure() + report.edgeInsertFailure());
+ }
+
+ private static void log(String key, long value) {
+ LOG.info(String.format(" %-30s: %-20d", new Object[] { key, Long.valueOf(value) }));
}
private LoadContext initPartition(
@@ -211,7 +353,7 @@ private LoadContext initPartition(
}
private void loadRow(InputStruct struct, Row row, Iterator p,
- LoadContext context) {
+ LoadContext context, DistributedLoadMetrics distributedLoadMetrics) {
for (Map.Entry> builderMap :
this.builders.entrySet()) {
ElementMapping elementMapping = builderMap.getKey().mapping();
@@ -219,13 +361,13 @@ private void loadRow(InputStruct struct, Row row, Iterator p,
if (elementMapping.skip()) {
continue;
}
- parse(row, builderMap, struct);
+ parse(row, builderMap, struct, distributedLoadMetrics);
// Insert
List graphElements = builderMap.getValue();
if (graphElements.size() >= elementMapping.batchSize() ||
(!p.hasNext() && graphElements.size() > 0)) {
- flush(builderMap, context.client().graph(), this.loadOptions.checkVertex);
+ flush(builderMap, context.client().graph(), this.loadOptions.checkVertex, distributedLoadMetrics);
}
}
}
@@ -282,7 +424,7 @@ private Dataset read(SparkSession ss, InputStruct struct) {
}
private void parse(Row row, Map.Entry> builderMap,
- InputStruct struct) {
+ InputStruct struct, DistributedLoadMetrics loadDistributeMetrics) {
ElementBuilder builder = builderMap.getKey();
List graphElements = builderMap.getValue();
if ("".equals(row.mkString())) {
@@ -300,6 +442,7 @@ private void parse(Row row, Map.Entry> builde
} else {
elements = builder.build(row);
}
+ loadDistributeMetrics.plusParseSuccess(builder.mapping(), elements.size());
break;
case JDBC:
Object[] structFields = JavaConverters.asJavaCollection(row.schema().toList())
@@ -312,6 +455,7 @@ private void parse(Row row, Map.Entry> builde
values[i] = row.get(i);
}
elements = builder.build(headers, values);
+ loadDistributeMetrics.plusParseSuccess(builder.mapping(), elements.size());
break;
default:
throw new AssertionError(String.format("Unsupported input source '%s'",
@@ -321,7 +465,7 @@ private void parse(Row row, Map.Entry> builde
}
private void flush(Map.Entry> builderMap,
- GraphManager g, boolean isCheckVertex) {
+ GraphManager g, boolean isCheckVertex, DistributedLoadMetrics loadDistributeMetrics) {
ElementBuilder builder = builderMap.getKey();
ElementMapping elementMapping = builder.mapping();
List graphElements = builderMap.getValue();
@@ -329,9 +473,11 @@ private void flush(Map.Entry> builderMap,
Map updateStrategyMap = elementMapping.updateStrategies();
if (updateStrategyMap.isEmpty()) {
if (isVertex) {
- g.addVertices((List) (Object) graphElements);
+ List vertices = g.addVertices((List) (Object) graphElements);
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, vertices.size());
} else {
- g.addEdges((List) (Object) graphElements);
+ List edges = g.addEdges((List) (Object) graphElements);
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, edges.size());
}
} else {
// CreateIfNotExist doesn't support false now
@@ -341,16 +487,44 @@ private void flush(Map.Entry> builderMap,
req.vertices((List) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.createIfNotExist(true);
- g.updateVertices(req.build());
+ List vertices = g.updateVertices(req.build());
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, vertices.size());
} else {
BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
req.edges((List) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.checkVertex(isCheckVertex)
.createIfNotExist(true);
- g.updateEdges(req.build());
+ List edges = g.updateEdges(req.build());
+ loadDistributeMetrics.plusInsertSuccess(elementMapping, edges.size());
}
}
graphElements.clear();
}
+
+
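+ // Build the (key, value) RDD for the OUT pass (direction == null) and, when the struct maps edges, a second RDD for the IN-edge pass.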
+ private void processDirectLoader(InputStruct struct, DistributedLoadMetrics distributedLoadMetrics, Dataset<Row> ds, Object[] outEArray, Object[] inEArray, int index) {
+ AbstractDirectLoader directLoader;
+ switch (loadOptions.backendStoreType) {
+ case "hbase":
+ directLoader = new HBaseDirectLoader(this.loadOptions, struct, distributedLoadMetrics);
+ ((JavaPairRDD[]) outEArray)[index] = directLoader.buildVertexAndEdge(ds, null);
+ if (!struct.edges().isEmpty()) {
+ ((JavaPairRDD[]) inEArray)[index] = directLoader.buildVertexAndEdge(ds, Directions.IN);
+ }
+
+ break;
+ case "hstore":
+ directLoader = new HStoreDirectLoader(this.loadOptions, struct, distributedLoadMetrics);
+ ((JavaPairRDD[]) outEArray)[index] = directLoader.buildVertexAndEdge(ds, null);
+ if (!struct.edges().isEmpty()) {
+ ((JavaPairRDD[]) inEArray)[index] = directLoader.buildVertexAndEdge(ds, Directions.IN);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported backend store type: " + loadOptions.backendStoreType);
+ }
+ }
+
+
}
diff --git a/pom.xml b/pom.xml
index eb0ae13cb..3beea0313 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@
provided
provided
2.12
+ 2.12.11
3.0.0
true
true
@@ -299,6 +300,11 @@
gson
${gson.version}
+
+ org.scala-lang
+ scala-library
+ ${scala-lang.version}
+