Skip to content

Commit

Permalink
[GIE] Refine FFI interface build_physical_plan (#2911)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
Return physical plan bytes from ffi interface `build_physical_plan`
directly, instead of wrapping the physical plan in the JobRequest, which
was previously bound to the pegasus engine API.

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes

---------

Co-authored-by: BingqingLyu <bingqing.lbq@alibaba-inc.com>
  • Loading branch information
shirly121 and BingqingLyu authored Jun 16, 2023
1 parent d0dc2b7 commit 7637aaf
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.protobuf.ByteString;

import io.grpc.Status;

Expand All @@ -49,7 +50,9 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channe
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
throws Exception {
PegasusClient.JobRequest jobRequest =
PegasusClient.JobRequest.parseFrom((byte[]) request.getRequestPhysical().build());
PegasusClient.JobRequest.newBuilder()
.setPlan(ByteString.copyFrom((byte[]) request.getRequestPhysical().build()))
.build();
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(request.getRequestId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import org.antlr.v4.runtime.tree.ParseTree;
Expand Down Expand Up @@ -377,7 +378,10 @@ protected void processTraversal(
byte[] physicalPlanBytes = irPlan.toPhysicalBytes(configs);
irPlan.close();

PegasusClient.JobRequest request = PegasusClient.JobRequest.parseFrom(physicalPlanBytes);
PegasusClient.JobRequest request =
PegasusClient.JobRequest.newBuilder()
.setPlan(ByteString.copyFrom(physicalPlanBytes))
.build();
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(jobId)
Expand Down Expand Up @@ -423,7 +427,9 @@ protected void processRelNode(
jobName,
physicalBuilder.explain());
PegasusClient.JobRequest request =
PegasusClient.JobRequest.parseFrom(physicalPlanBytes);
PegasusClient.JobRequest.newBuilder()
.setPlan(ByteString.copyFrom(physicalPlanBytes))
.build();
PegasusClient.JobConfig jobConfig =
PegasusClient.JobConfig.newBuilder()
.setJobId(jobId)
Expand Down
Loading

0 comments on commit 7637aaf

Please sign in to comment.