diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java index 804d258f46..a87afa1bcf 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowRunnerConfig.java @@ -32,7 +32,7 @@ public class DataflowRunnerConfig extends RunnerConfig { public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) { this.project = runnerConfigOptions.getProject(); this.region = runnerConfigOptions.getRegion(); - this.zone = runnerConfigOptions.getZone(); + this.workerZone = runnerConfigOptions.getWorkerZone(); this.serviceAccount = runnerConfigOptions.getServiceAccount(); this.network = runnerConfigOptions.getNetwork(); this.subnetwork = runnerConfigOptions.getSubnetwork(); @@ -44,6 +44,8 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) { this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec(); this.diskSizeGb = runnerConfigOptions.getDiskSizeGb(); this.labels = runnerConfigOptions.getLabelsMap(); + this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine(); + this.workerDiskType = runnerConfigOptions.getWorkerDiskType(); validate(); } @@ -54,7 +56,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) { @NotBlank public String region; /* GCP availability zone for operations. */ - @NotBlank public String zone; + @NotBlank public String workerZone; /* Run the job as a specific service account, instead of the default GCE robot. */ public String serviceAccount; @@ -91,6 +93,12 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) { public Map labels; + /* If true job will be run on StreamingEngine instead of VMs */ + public Boolean enableStreamingEngine; + + /* Type of persistent disk to be used by workers */ + public String workerDiskType; + /** Validates Dataflow runner configuration options */ public void validate() { ValidatorFactory factory = Validation.buildDefaultValidatorFactory(); diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index d3d5b90952..69ed090a0f 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -42,11 +42,13 @@ feast: options: project: my_gcp_project region: asia-east1 - zone: asia-east1-a + workerZone: asia-east1-a tempLocation: gs://bucket/tempLocation network: default subnetwork: regions/asia-east1/subnetworks/mysubnetwork maxNumWorkers: 1 + enableStreamingEngine: false + workerDiskType: compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd autoscalingAlgorithm: THROUGHPUT_BASED usePublicIps: false workerMachineType: n1-standard-1 diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index e110bf6ee2..3250c1d42b 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -81,7 +81,7 @@ public void setUp() { Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder(); optionsBuilder.setProject("project"); optionsBuilder.setRegion("region"); - optionsBuilder.setZone("zone"); + optionsBuilder.setWorkerZone("zone"); optionsBuilder.setTempLocation("tempLocation"); optionsBuilder.setNetwork("network"); optionsBuilder.setSubnetwork("subnetwork"); diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowRunnerConfigTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowRunnerConfigTest.java index 925e48aec1..9c6b5a085c 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowRunnerConfigTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowRunnerConfigTest.java @@ -33,7 +33,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException { DataflowRunnerConfigOptions.newBuilder() .setProject("my-project") .setRegion("asia-east1") - .setZone("asia-east1-a") + .setWorkerZone("asia-east1-a") + .setEnableStreamingEngine(true) + .setWorkerDiskType("pd-ssd") .setTempLocation("gs://bucket/tempLocation") .setNetwork("default") .setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork") @@ -52,7 +54,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException { Arrays.asList( "--project=my-project", "--region=asia-east1", - "--zone=asia-east1-a", + "--workerZone=asia-east1-a", "--tempLocation=gs://bucket/tempLocation", "--network=default", "--subnetwork=regions/asia-east1/subnetworks/mysubnetwork", @@ -62,7 +64,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException { "--workerMachineType=n1-standard-1", "--deadLetterTableSpec=project_id:dataset_id.table_id", "--diskSizeGb=100", - "--labels={\"key\":\"value\"}") + "--labels={\"key\":\"value\"}", + "--enableStreamingEngine=true", + "--workerDiskType=pd-ssd") .toArray(String[]::new); assertThat(args.size(), equalTo(expectedArgs.length)); assertThat(args, containsInAnyOrder(expectedArgs)); @@ -74,7 +78,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException { DataflowRunnerConfigOptions.newBuilder() .setProject("my-project") .setRegion("asia-east1") - .setZone("asia-east1-a") + .setWorkerZone("asia-east1-a") .setTempLocation("gs://bucket/tempLocation") .setNetwork("default") .setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork") @@ -90,7 +94,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException { Arrays.asList( "--project=my-project", "--region=asia-east1", - "--zone=asia-east1-a", + "--workerZone=asia-east1-a", "--tempLocation=gs://bucket/tempLocation", "--network=default", "--subnetwork=regions/asia-east1/subnetworks/mysubnetwork", @@ -98,7 +102,8 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException { "--autoscalingAlgorithm=THROUGHPUT_BASED", "--usePublicIps=false", "--workerMachineType=n1-standard-1", - "--labels={}") + "--labels={}", + "--enableStreamingEngine=false") .toArray(String[]::new); assertThat(args.size(), equalTo(expectedArgs.length)); assertThat(args, containsInAnyOrder(expectedArgs)); diff --git a/protos/feast/core/Runner.proto b/protos/feast/core/Runner.proto index 0684356f8d..9bb4457d4f 100644 --- a/protos/feast/core/Runner.proto +++ b/protos/feast/core/Runner.proto @@ -45,7 +45,7 @@ message DataflowRunnerConfigOptions { string region = 2; /* GCP availability zone for operations. */ - string zone = 3; + string workerZone = 3; /* Run the job as a specific service account, instead of the default GCE robot. */ string serviceAccount = 4; @@ -81,4 +81,9 @@ message DataflowRunnerConfigOptions { /* Disk size to use on each remote Compute Engine worker instance */ int32 diskSizeGb = 14; + /* Run job on Dataflow Streaming Engine instead of creating worker VMs */ + bool enableStreamingEngine = 15; + + /* Type of persistent disk to be used by workers */ + string workerDiskType = 16; } \ No newline at end of file