From 143498cdb0e43dec3ed40a119611e9ab005411c4 Mon Sep 17 00:00:00 2001
From: bradmiro
Date: Wed, 20 May 2020 16:52:26 -0400
Subject: [PATCH] adding hadoopfs and autoscaling samples

---
 .../java/CreateClusterWithAutoscaling.java    | 174 ++++++++++++++++++
 dataproc/src/main/java/SubmitHadoopFsJob.java | 101 ++++++++++
 .../CreateClusterWithAutoscalingTest.java     | 105 +++++++++++
 ...InstantiateInlineWorkflowTemplateTest.java |   1 -
 .../src/test/java/SubmitHadoopFsJobTest.java  | 102 ++++++++++
 5 files changed, 482 insertions(+), 1 deletion(-)
 create mode 100644 dataproc/src/main/java/CreateClusterWithAutoscaling.java
 create mode 100644 dataproc/src/main/java/SubmitHadoopFsJob.java
 create mode 100644 dataproc/src/test/java/CreateClusterWithAutoscalingTest.java
 create mode 100644 dataproc/src/test/java/SubmitHadoopFsJobTest.java

diff --git a/dataproc/src/main/java/CreateClusterWithAutoscaling.java b/dataproc/src/main/java/CreateClusterWithAutoscaling.java
new file mode 100644
index 00000000000..2a8a6285caf
--- /dev/null
+++ b/dataproc/src/main/java/CreateClusterWithAutoscaling.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This sample creates a Dataproc cluster with an autoscaling policy enabled.
+ * The policy we will be creating mirrors the following YAML representation:
+ *
+ *   workerConfig:
+ *     minInstances: 2
+ *     maxInstances: 100
+ *     weight: 1
+ *   secondaryWorkerConfig:
+ *     minInstances: 0
+ *     maxInstances: 100
+ *     weight: 1
+ *   basicAlgorithm:
+ *     cooldownPeriod: 4m
+ *     yarnConfig:
+ *       scaleUpFactor: 0.05
+ *       scaleDownFactor: 1.0
+ *       scaleUpMinWorkerFraction: 0.0
+ *       scaleDownMinWorkerFraction: 0.0
+ *       gracefulDecommissionTimeout: 1h
+ */
+
+// [START dataproc_create_autoscaling_cluster]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.AutoscalingConfig;
+import com.google.cloud.dataproc.v1.AutoscalingPolicy;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
+import com.google.cloud.dataproc.v1.BasicAutoscalingAlgorithm;
+import com.google.cloud.dataproc.v1.BasicYarnAutoscalingConfig;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterConfig;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.cloud.dataproc.v1.InstanceGroupAutoscalingPolicyConfig;
+import com.google.cloud.dataproc.v1.InstanceGroupConfig;
+import com.google.cloud.dataproc.v1.RegionName;
+import com.google.protobuf.Duration;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+public class CreateClusterWithAutoscaling {
+
+  public static void createClusterWithAutoscaling() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String autoscalingPolicyName = "your-autoscaling-policy";
+    createClusterWithAutoscaling(projectId, region, clusterName, autoscalingPolicyName);
+  }
+
+  public static void createClusterWithAutoscaling(
+      String projectId, String region, String clusterName, String autoscalingPolicyName)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the cluster controller client.
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Configure the settings for the autoscaling policy service client.
+    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
+        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a cluster controller client and an autoscaling policy service client with the
+    // configured settings. The clients only need to be created once and can be reused for
+    // multiple requests. Using a try-with-resources closes the clients, but they can also be
+    // closed manually with the .close() method.
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
+            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {
+
+      // Create the autoscaling policy.
+      InstanceGroupAutoscalingPolicyConfig workerInstanceGroupAutoscalingPolicyConfig =
+          InstanceGroupAutoscalingPolicyConfig.newBuilder()
+              .setMinInstances(2)
+              .setMaxInstances(100)
+              .setWeight(1)
+              .build();
+      InstanceGroupAutoscalingPolicyConfig secondaryWorkerInstanceGroupAutoscalingPolicyConfig =
+          InstanceGroupAutoscalingPolicyConfig.newBuilder()
+              .setMinInstances(0)
+              .setMaxInstances(100)
+              .setWeight(1)
+              .build();
+      BasicYarnAutoscalingConfig basicYarnApplicationConfig =
+          BasicYarnAutoscalingConfig.newBuilder()
+              .setScaleUpFactor(0.05)
+              .setScaleDownFactor(1.0)
+              .setScaleUpMinWorkerFraction(0.0)
+              .setScaleDownMinWorkerFraction(0.0)
+              .setGracefulDecommissionTimeout(Duration.newBuilder().setSeconds(3600).build())
+              .build();
+      BasicAutoscalingAlgorithm basicAutoscalingAlgorithm =
+          BasicAutoscalingAlgorithm.newBuilder()
+              .setCooldownPeriod(Duration.newBuilder().setSeconds(240).build())
+              .setYarnConfig(basicYarnApplicationConfig)
+              .build();
+      AutoscalingPolicy autoscalingPolicy =
+          AutoscalingPolicy.newBuilder()
+              .setId(autoscalingPolicyName)
+              .setWorkerConfig(workerInstanceGroupAutoscalingPolicyConfig)
+              .setSecondaryWorkerConfig(secondaryWorkerInstanceGroupAutoscalingPolicyConfig)
+              .setBasicAlgorithm(basicAutoscalingAlgorithm)
+              .build();
+      RegionName parent = RegionName.of(projectId, region);
+
+      // Upload the autoscaling policy.
+      autoscalingPolicyServiceClient.createAutoscalingPolicy(parent, autoscalingPolicy);
+
+      // The policy can now be referenced when creating a cluster.
+      String autoscalingPolicyUri =
+          String.format(
+              "projects/%s/locations/%s/autoscalingPolicies/%s",
+              projectId, region, autoscalingPolicyName);
+      AutoscalingConfig autoscalingConfig =
+          AutoscalingConfig.newBuilder().setPolicyUri(autoscalingPolicyUri).build();
+
+      // Configure the settings for our cluster.
+      InstanceGroupConfig masterConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(1)
+              .build();
+      InstanceGroupConfig workerConfig =
+          InstanceGroupConfig.newBuilder()
+              .setMachineTypeUri("n1-standard-1")
+              .setNumInstances(2)
+              .build();
+      ClusterConfig clusterConfig =
+          ClusterConfig.newBuilder()
+              .setMasterConfig(masterConfig)
+              .setWorkerConfig(workerConfig)
+              .setAutoscalingConfig(autoscalingConfig)
+              .build();
+
+      // Create the cluster object with the desired cluster config.
+      Cluster cluster =
+          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
+
+      // Create the Dataproc cluster.
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(projectId, region, cluster);
+      Cluster response = createClusterAsyncRequest.get();
+
+      // Print out a success message.
+      System.out.printf("Cluster created successfully: %s", response.getClusterName());
+
+    } catch (ExecutionException e) {
+      // If cluster creation does not complete successfully, print the error message.
+      System.err.println(String.format("createClusterWithAutoscaling: %s", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_create_autoscaling_cluster]

diff --git a/dataproc/src/main/java/SubmitHadoopFsJob.java b/dataproc/src/main/java/SubmitHadoopFsJob.java
new file mode 100644
index 00000000000..0c26416c41b
--- /dev/null
+++ b/dataproc/src/main/java/SubmitHadoopFsJob.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// [START dataproc_submit_hadoop_fs_job]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.HadoopJob;
+import com.google.cloud.dataproc.v1.Job;
+import com.google.cloud.dataproc.v1.JobControllerClient;
+import com.google.cloud.dataproc.v1.JobControllerSettings;
+import com.google.cloud.dataproc.v1.JobMetadata;
+import com.google.cloud.dataproc.v1.JobPlacement;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SubmitHadoopFsJob {
+
+  public static ArrayList<String> stringToList(String s) {
+    return new ArrayList<>(Arrays.asList(s.split(" ")));
+  }
+
+  public static void submitHadoopFsJob() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String hadoopFsQuery = "your-hadoop-fs-query";
+    submitHadoopFsJob(projectId, region, clusterName, hadoopFsQuery);
+  }
+
+  public static void submitHadoopFsJob(
+      String projectId, String region, String clusterName, String hadoopFsQuery)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the job controller client.
+    JobControllerSettings jobControllerSettings =
+        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a job controller client with the configured settings. Using a try-with-resources
+    // closes the client, but this can also be done manually with the .close() method.
+    try (JobControllerClient jobControllerClient =
+        JobControllerClient.create(jobControllerSettings)) {
+
+      // Configure cluster placement for the job.
+      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+
+      // Configure Hadoop job settings. The HadoopFS query is set here.
+      HadoopJob hadoopJob =
+          HadoopJob.newBuilder()
+              .setMainClass("org.apache.hadoop.fs.FsShell")
+              .addAllArgs(stringToList(hadoopFsQuery))
+              .build();
+
+      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();
+
+      // Submit an asynchronous request to execute the job.
+      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
+          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
+
+      Job response = submitJobAsOperationAsyncRequest.get();
+
+      // Fetch and print the job's driver output from Google Cloud Storage.
+      Matcher matches =
+          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
+      matches.matches();
+
+      Storage storage = StorageOptions.getDefaultInstance().getService();
+      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
+
+      System.out.println(
+          String.format("Job finished successfully: %s", new String(blob.getContent())));
+
+    } catch (ExecutionException e) {
+      // If the job does not complete successfully, print the error message.
+      System.err.println(String.format("submitHadoopFsJob: %s", e.getMessage()));
+    }
+  }
+}
+// [END dataproc_submit_hadoop_fs_job]

diff --git a/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java b/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java
new file mode 100644
index 00000000000..188ef7afe23
--- /dev/null
+++ b/dataproc/src/test/java/CreateClusterWithAutoscalingTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyName;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
+import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class CreateClusterWithAutoscalingTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String AUTOSCALING_POLICY_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
+        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
+            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {
+
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+
+      AutoscalingPolicyName name =
+          AutoscalingPolicyName.ofProjectLocationAutoscalingPolicyName(
+              PROJECT_ID, REGION, AUTOSCALING_POLICY_NAME);
+      autoscalingPolicyServiceClient.deleteAutoscalingPolicy(name);
+    }
+  }
+
+  @Test
+  public void createClusterWithAutoscalingTest() throws IOException, InterruptedException {
+    CreateClusterWithAutoscaling.createClusterWithAutoscaling(
+        PROJECT_ID, REGION, CLUSTER_NAME, AUTOSCALING_POLICY_NAME);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
+  }
+}

diff --git a/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java b/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java
index 2422c221a74..0216ff36bea 100644
--- a/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java
+++ b/dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java
@@ -30,7 +30,6 @@
 @RunWith(JUnit4.class)
 public class InstantiateInlineWorkflowTemplateTest {
-
   private static final String REGION = "us-central1";
   private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

diff --git a/dataproc/src/test/java/SubmitHadoopFsJobTest.java b/dataproc/src/test/java/SubmitHadoopFsJobTest.java
new file mode 100644
index 00000000000..973efa77916
--- /dev/null
+++ b/dataproc/src/test/java/SubmitHadoopFsJobTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.Cluster;
+import com.google.cloud.dataproc.v1.ClusterControllerClient;
+import com.google.cloud.dataproc.v1.ClusterControllerSettings;
+import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SubmitHadoopFsJobTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-fs-test-%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION);
+  private static final String HADOOP_FS_QUERY = "-ls /";
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() throws IOException, ExecutionException, InterruptedException {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      // Create the Dataproc cluster.
+      Cluster cluster = Cluster.newBuilder().setClusterName(CLUSTER_NAME).build();
+      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
+          clusterControllerClient.createClusterAsync(PROJECT_ID, REGION, cluster);
+      createClusterAsyncRequest.get();
+    }
+  }
+
+  @Test
+  public void submitHadoopFsJobTest() throws IOException, InterruptedException {
+    SubmitHadoopFsJob.submitHadoopFsJob(PROJECT_ID, REGION, CLUSTER_NAME, HADOOP_FS_QUERY);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString("/tmp"));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+        ClusterControllerClient.create(clusterControllerSettings)) {
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+    }
+  }
+}
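
Note, outside the patch itself: the two samples are written to compose, and a minimal, untested sketch of driving them together follows. The class name DataprocSamplesDemo and all placeholder values are hypothetical and not part of the patch; only the two static methods it calls come from the files above.

    import java.io.IOException;

    /** Hypothetical driver chaining the two new samples; not part of the patch. */
    public class DataprocSamplesDemo {

      public static void main(String[] args) throws IOException, InterruptedException {
        // Placeholder values; replace with a real project, region, and names before running.
        String projectId = "my-project-id";
        String region = "us-central1";
        String clusterName = "demo-autoscaling-cluster";
        String policyName = "demo-autoscaling-policy";

        // Create a cluster governed by the autoscaling policy defined in the sample.
        CreateClusterWithAutoscaling.createClusterWithAutoscaling(
            projectId, region, clusterName, policyName);

        // Run a HadoopFS query against the new cluster, mirroring the "-ls /" query
        // that SubmitHadoopFsJobTest asserts on.
        SubmitHadoopFsJob.submitHadoopFsJob(projectId, region, clusterName, "-ls /");
      }
    }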