Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically switch to upload_graph when the graph is large #28621

Merged
merged 13 commits into from
Oct 27, 2023
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).

## Breaking Changes

* `upload_graph` is not needed since the graph now is uploaded automatically when it is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621).
liferoad marked this conversation as resolved.
Show resolved Hide resolved
liferoad marked this conversation as resolved.
Show resolved Hide resolved

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* `org.apache.beam.sdk.io.CountingSource.CounterMark` uses custom `CounterMarkCoder` as a default coder since all Avro-dependent
classes finally moved to `extensions/avro`. In case if it's still required to use `AvroCoder` for `CounterMark`, then,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -1330,15 +1329,26 @@ public DataflowPipelineJob run(Pipeline pipeline) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
}

// enable upload_graph when the graph is too large
byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
int jobGraphByteSize = jobGraphBytes.length;
if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
&& !hasExperiment(options, "upload_graph")) {
List<String> experiments = firstNonNull(options.getExperiments(), new ArrayList<>());
experiments.add("upload_graph");
options.setExperiments(ImmutableList.copyOf(experiments));
LOG.info(
"The job graph size ({} in bytes) is larger than {}. Automatically add "
+ "the upload_graph option to experiments.",
jobGraphByteSize,
CREATE_JOB_REQUEST_LIMIT_BYTES);
}

// Upload the job to GCS and remove the graph object from the API call. The graph
// will be downloaded from GCS by the service.
if (hasExperiment(options, "upload_graph")) {
DataflowPackage stagedGraph =
options
.getStager()
.stageToFile(
DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
DATAFLOW_GRAPH_FILE_NAME);
options.getStager().stageToFile(jobGraphBytes, DATAFLOW_GRAPH_FILE_NAME);
newJob.getSteps().clear();
newJob.setStepsLocation(stagedGraph.getLocation());
}
Expand Down Expand Up @@ -1398,7 +1408,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
} catch (GoogleJsonResponseException e) {
String errorMessages = "Unexpected errors";
if (e.getDetails() != null) {
if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
errorMessages =
"The size of the serialized JSON representation of the pipeline "
+ "exceeds the allowable limit. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,22 @@ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
return p;
}

private Pipeline buildDataflowPipelineWithLargeGraph(DataflowPipelineOptions options) {
liferoad marked this conversation as resolved.
Show resolved Hide resolved
options.setStableUniqueNames(CheckEnabled.ERROR);
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);

for (int i = 0; i < 100; i++) {
p.apply("ReadMyFile_" + i, TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile_" + i, TextIO.write().to("gs://bucket/object"));
}

// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultPipelineOptions(options);

return p;
}

private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs)
throws IOException {
Dataflow mockDataflowClient = mock(Dataflow.class);
Expand Down Expand Up @@ -824,6 +840,24 @@ public void testUploadGraph() throws IOException {
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

/** Test for automatically using upload_graph when the job graph is too large (>10MB). */
@Test
public void testUploadGraphWithAutoUpload() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Pipeline p = buildDataflowPipelineWithLargeGraph(options);
p.run();

ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
assertTrue(jobCaptor.getValue().getSteps().isEmpty());
assertTrue(
jobCaptor
.getValue()
.getStepsLocation()
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
liferoad marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testUpdateNonExistentPipeline() throws IOException {
thrown.expect(IllegalArgumentException.class);
Expand Down
Loading