diskType & streamingEngine
pyalex committed Aug 1, 2020
1 parent a5571ee commit 99b9385
Showing 5 changed files with 31 additions and 11 deletions.
DataflowRunnerConfig.java

@@ -32,7 +32,7 @@ public class DataflowRunnerConfig extends RunnerConfig {
   public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
     this.project = runnerConfigOptions.getProject();
     this.region = runnerConfigOptions.getRegion();
-    this.zone = runnerConfigOptions.getZone();
+    this.workerZone = runnerConfigOptions.getWorkerZone();
     this.serviceAccount = runnerConfigOptions.getServiceAccount();
     this.network = runnerConfigOptions.getNetwork();
     this.subnetwork = runnerConfigOptions.getSubnetwork();
@@ -44,6 +44,8 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
     this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
     this.diskSizeGb = runnerConfigOptions.getDiskSizeGb();
     this.labels = runnerConfigOptions.getLabelsMap();
+    this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
+    this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
     validate();
   }
@@ -54,7 +56,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
   @NotBlank public String region;

   /* GCP availability zone for operations. */
-  @NotBlank public String zone;
+  @NotBlank public String workerZone;

   /* Run the job as a specific service account, instead of the default GCE robot. */
   public String serviceAccount;
@@ -91,6 +93,12 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {

   public Map<String, String> labels;

+  /* If true, job will be run on StreamingEngine instead of VMs */
+  public Boolean enableStreamingEngine;
+
+  /* Type of persistent disk to be used by workers */
+  public String workerDiskType;
+
   /** Validates Dataflow runner configuration options */
   public void validate() {
     ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
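The shouldConvertToPipelineArgs test further down (note its throws IllegalAccessException signature) suggests this config is flattened into --name=value pipeline arguments by reflecting over its public fields. A minimal sketch of that conversion, assuming a reflection-based toArgs helper with plain toString rendering for values (the class name and the labels/JSON handling here are illustrative, not Feast's actual implementation):

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: render each set public field as "--name=value".
public final class RunnerConfigArgs {
  public static List<String> toArgs(Object config) throws IllegalAccessException {
    List<String> args = new ArrayList<>();
    for (Field field : config.getClass().getFields()) {
      Object value = field.get(config);
      // Unset optional fields (e.g. workerDiskType) produce no argument.
      if (value == null) {
        continue;
      }
      args.add(String.format("--%s=%s", field.getName(), value));
    }
    return args;
  }
}

Under such a scheme the two new fields surface as --enableStreamingEngine=<bool> and --workerDiskType=<type>, which matches the expected arguments in the updated tests below.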
core/src/main/resources/application.yml — 4 changes: 3 additions & 1 deletion

@@ -42,11 +42,13 @@ feast:
       options:
         project: my_gcp_project
         region: asia-east1
-        zone: asia-east1-a
+        workerZone: asia-east1-a
         tempLocation: gs://bucket/tempLocation
         network: default
         subnetwork: regions/asia-east1/subnetworks/mysubnetwork
         maxNumWorkers: 1
+        enableStreamingEngine: false
+        workerDiskType: compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd
         autoscalingAlgorithm: THROUGHPUT_BASED
         usePublicIps: false
         workerMachineType: n1-standard-1
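Two caveats on these sample values: enableStreamingEngine: false simply spells out the proto3 default, and the workerDiskType value looks like a placeholder. Dataflow generally expects the worker disk type as a full Compute Engine resource name of the form compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd, whereas the sample above has a zone in the project position, so it reads as illustrative rather than a copy-paste value.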
@@ -81,7 +81,7 @@ public void setUp() {
     Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder();
     optionsBuilder.setProject("project");
     optionsBuilder.setRegion("region");
-    optionsBuilder.setZone("zone");
+    optionsBuilder.setWorkerZone("zone");
     optionsBuilder.setTempLocation("tempLocation");
     optionsBuilder.setNetwork("network");
     optionsBuilder.setSubnetwork("subnetwork");
@@ -33,7 +33,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
         DataflowRunnerConfigOptions.newBuilder()
             .setProject("my-project")
             .setRegion("asia-east1")
-            .setZone("asia-east1-a")
+            .setWorkerZone("asia-east1-a")
+            .setEnableStreamingEngine(true)
+            .setWorkerDiskType("pd-ssd")
             .setTempLocation("gs://bucket/tempLocation")
             .setNetwork("default")
             .setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
@@ -52,7 +54,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
         Arrays.asList(
                 "--project=my-project",
                 "--region=asia-east1",
-                "--zone=asia-east1-a",
+                "--workerZone=asia-east1-a",
                 "--tempLocation=gs://bucket/tempLocation",
                 "--network=default",
                 "--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
@@ -62,7 +64,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
                 "--workerMachineType=n1-standard-1",
                 "--deadLetterTableSpec=project_id:dataset_id.table_id",
                 "--diskSizeGb=100",
-                "--labels={\"key\":\"value\"}")
+                "--labels={\"key\":\"value\"}",
+                "--enableStreamingEngine=true",
+                "--workerDiskType=pd-ssd")
             .toArray(String[]::new);
     assertThat(args.size(), equalTo(expectedArgs.length));
     assertThat(args, containsInAnyOrder(expectedArgs));
@@ -74,7 +78,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
         DataflowRunnerConfigOptions.newBuilder()
             .setProject("my-project")
             .setRegion("asia-east1")
-            .setZone("asia-east1-a")
+            .setWorkerZone("asia-east1-a")
             .setTempLocation("gs://bucket/tempLocation")
             .setNetwork("default")
             .setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
@@ -90,15 +94,16 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
         Arrays.asList(
                 "--project=my-project",
                 "--region=asia-east1",
-                "--zone=asia-east1-a",
+                "--workerZone=asia-east1-a",
                 "--tempLocation=gs://bucket/tempLocation",
                 "--network=default",
                 "--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
                 "--maxNumWorkers=1",
                 "--autoscalingAlgorithm=THROUGHPUT_BASED",
                 "--usePublicIps=false",
                 "--workerMachineType=n1-standard-1",
-                "--labels={}")
+                "--labels={}",
+                "--enableStreamingEngine=false")
             .toArray(String[]::new);
     assertThat(args.size(), equalTo(expectedArgs.length));
     assertThat(args, containsInAnyOrder(expectedArgs));
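Note the asymmetry in the optional-arguments expectations above: --enableStreamingEngine=false still appears because a proto3 bool always carries a value (defaulting to false), while the unset workerDiskType yields no argument at all, so unset or empty strings are evidently filtered out of the generated pipeline arguments.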
protos/feast/core/Runner.proto — 7 changes: 6 additions & 1 deletion

@@ -45,7 +45,7 @@ message DataflowRunnerConfigOptions {
   string region = 2;

   /* GCP availability zone for operations. */
-  string zone = 3;
+  string workerZone = 3;

   /* Run the job as a specific service account, instead of the default GCE robot. */
   string serviceAccount = 4;
@@ -81,4 +81,9 @@ message DataflowRunnerConfigOptions {
   /* Disk size to use on each remote Compute Engine worker instance */
   int32 diskSizeGb = 14;

+  /* Run job on Dataflow Streaming Engine instead of creating worker VMs */
+  bool enableStreamingEngine = 15;
+
+  /* Type of persistent disk to be used by workers */
+  string workerDiskType = 16;
 }
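Because the field number stays 3, renaming zone to workerZone is wire-compatible for binary protobuf payloads; what it does break are JSON/text-format payloads and the generated accessors (getZone() becomes getWorkerZone()), which is why the Java config, YAML sample, and tests all change in the same commit.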
