diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala index 962814f0..255d74c1 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala @@ -175,7 +175,8 @@ class EdgeProcessor(data: DataFrame, hostAddrs.append(new HostAddress(addr.getHostText, addr.getPort)) } - val partitionId = NebulaUtils.getPartitionId(srcId, partitionNum, vidType) + val srcPartitionId = NebulaUtils.getPartitionId(srcId, partitionNum, vidType) + val dstPartitionId = NebulaUtils.getPartitionId(dstId, partitionNum, vidType) val codec = new NebulaCodecImpl() import java.nio.ByteBuffer @@ -199,13 +200,13 @@ class EdgeProcessor(data: DataFrame, dstId.getBytes() } val positiveEdgeKey = codec.edgeKeyByDefaultVer(spaceVidLen, - partitionId, + srcPartitionId, srcBytes, edgeItem.getEdge_type, ranking, dstBytes) val reverseEdgeKey = codec.edgeKeyByDefaultVer(spaceVidLen, - partitionId, + dstPartitionId, dstBytes, -edgeItem.getEdge_type, ranking,