Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"revision": 1}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"https://github.com/apache/beam/pull/32648": "testing flink 1.19 support"
"revision": 1
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Direct.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"https://github.com/apache/beam/pull/32648": "testing Flink 1.19 support",
"modification": 1
"modification": 2
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Samza.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{}
{"modification": 1}
17 changes: 2 additions & 15 deletions .github/workflows/beam_PostCommit_XVR_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,16 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: |
3.9
${{ matrix.python_version }}
python-version: ${{ matrix.python_version }}
- name: run PostCommit XVR Direct script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version != '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:direct:xlang:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=true \
- name: run PostCommit XVR Direct script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version == '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:direct:xlang:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=false \
-PskipNonPythonTask=${{ (matrix.python_version == '3.9' && true) || false }} \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
17 changes: 2 additions & 15 deletions .github/workflows/beam_PostCommit_XVR_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,16 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: |
3.9
${{ matrix.python_version }}
python-version: ${{ matrix.python_version }}
- name: run PostCommit XVR Flink script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version != '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:${{ env.FlinkVersion }}:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=true \
- name: run PostCommit XVR Flink script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version == '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:${{ env.FlinkVersion }}:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=false \
-PskipNonPythonTask=${{ (matrix.python_version == '3.9' && true) || false }} \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
19 changes: 5 additions & 14 deletions .github/workflows/beam_PostCommit_XVR_Samza.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,21 @@ jobs:
uses: ./.github/actions/setup-environment-action
# TODO(https://github.com/apache/beam/issues/32208) move to Java11 after bump to Samza 1.8
with:
java-version: 8
java-version: |
11
8
python-version: |
3.9
${{ matrix.python_version }}
- name: run PostCommit XVR Samza script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version != '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:samza:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=true \
- name: run PostCommit XVR Samza script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version == '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:samza:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=false \
-Pjava11Home=$JAVA_HOME_11_X64 \
-PskipNonPythonTask=${{ (matrix.python_version == '3.9' && true) || false }} \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
17 changes: 2 additions & 15 deletions .github/workflows/beam_PostCommit_XVR_Spark3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,16 @@ jobs:
- name: Setup environment
uses: ./.github/actions/setup-environment-action
with:
python-version: |
3.9
${{ matrix.python_version }}
python-version: ${{ matrix.python_version }}
- name: run PostCommit XVR Spark3 script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version != '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:spark:3:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=true \
- name: run PostCommit XVR Spark3 script
env:
CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH}}
if: ${{ matrix.python_version == '3.9' }}
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:spark:3:job-server:validatesCrossLanguageRunner
arguments: |
-PpythonVersion=${{ matrix.python_version }} \
-PskipNonPythonTask=false \
-PskipNonPythonTask=${{ (matrix.python_version == '3.9' && true) || false }} \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
## Beam 3.0.0 Development Highlights

* New highly anticipated feature ([X](https://github.com/apache/beam/issues/X)) to address Milestone Y ([#Y](https://github.com/apache/beam/issues/Y)).
* [Java] Java 8 support is now deprecated. It is still supported until Beam 3.
From now, pipeline submitted by Java 8 client uses Java 11 SDK container for
remote pipeline execution ([35064](https://github.com/apache/beam/pull/35064)).

## Highlights

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,18 +454,15 @@ class BeamModulePlugin implements Plugin<Project> {
return 'beam' + p.path.replace(':', '-')
}

static def getSupportedJavaVersion() {
if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
return 'java8'
} else if (JavaVersion.current() == JavaVersion.VERSION_11) {
/** Get version for Java SDK container */
static def getSupportedJavaVersion(String assignedVersion = null) {
JavaVersion ver = assignedVersion ? JavaVersion.toVersion(assignedVersion) : JavaVersion.current()
if (ver <= JavaVersion.VERSION_11) {
return 'java11'
} else if (JavaVersion.current() == JavaVersion.VERSION_17) {
} else if (ver <= JavaVersion.VERSION_17) {
return 'java17'
} else if (JavaVersion.current() == JavaVersion.VERSION_21) {
return 'java21'
} else {
String exceptionMessage = "Your Java version is unsupported. You need Java version of 8, 11, 17 or 21 to get started, but your Java version is: " + JavaVersion.current();
throw new GradleException(exceptionMessage)
return 'java21'
}
}

Expand Down Expand Up @@ -2653,8 +2650,8 @@ class BeamModulePlugin implements Plugin<Project> {
// see https://issues.apache.org/jira/browse/BEAM-6698
maxHeapSize = '4g'
if (config.environment == PortableValidatesRunnerConfiguration.Environment.DOCKER) {
def ver = project.findProperty('testJavaVersion')
def javaContainerSuffix = ver ? "java$ver" : getSupportedJavaVersion()
String ver = project.findProperty('testJavaVersion')
def javaContainerSuffix = getSupportedJavaVersion(ver)
dependsOn ":sdks:java:container:${javaContainerSuffix}:docker"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,23 +492,13 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
+ "' invalid. Please make sure the value is non-negative.");
}

// Verify that if recordJfrOnGcThrashing is set, the pipeline is at least on java 11
if (dataflowOptions.getRecordJfrOnGcThrashing()
&& Environments.getJavaVersion() == Environments.JavaVersion.java8) {
throw new IllegalArgumentException(
"recordJfrOnGcThrashing is only supported on java 9 and up.");
}

if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}

// Adding the Java version to the SDK name for user's and support convenience.
String agentJavaVer = "(JRE 8 environment)";
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
agentJavaVer =
String.format("(JRE %s environment)", Environments.getJavaVersion().specification());
}
String agentJavaVer =
String.format("(JRE %s environment)", Environments.getJavaVersion().specification());

DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
String userAgentName = dataflowRunnerInfo.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ fnapi.environment.major.version=@dataflow.fnapi_environment_major_version@
legacy.container.version=@dataflow.legacy_container_version@
fnapi.container.version=@dataflow.fnapi_container_version@
container.base_repository=@dataflow.container_base_repository@



Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture.Capturable;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
Expand Down Expand Up @@ -59,9 +58,7 @@ public class WorkerStatusPages {
addServlet(threadzServlet);
addServlet(new HealthzServlet(healthyIndicator));
addServlet(new HeapzServlet(memoryMonitor));
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
addServlet(new JfrzServlet(memoryMonitor));
}
addServlet(new JfrzServlet(memoryMonitor));
addServlet(statuszServlet);

// Add default capture pages (threadz, statusz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -246,10 +245,6 @@ public static MemoryMonitor fromOptions(PipelineOptions options) {

Duration jfrProfileDuration;
if (uploadToGCSPath != null && debugOptions.getRecordJfrOnGcThrashing()) {
if (Environments.getJavaVersion() == Environments.JavaVersion.java8) {
throw new IllegalArgumentException(
"recordJfrOnGcThrashing is only supported on java 9 and up.");
}
jfrProfileDuration = Duration.ofSeconds(debugOptions.getJfrRecordingDurationSec());
} else {
jfrProfileDuration = null;
Expand Down Expand Up @@ -314,13 +309,7 @@ private MemoryMonitor(
this.localDumpFolder = localDumpFolder;
this.workerId = workerId;
this.clock = clock;

if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
LOG.info("Uploading JFR profiles when GC thrashing is detected");
this.jfrInterop = new JfrInterop();
} else {
this.jfrInterop = null;
}
this.jfrInterop = new JfrInterop();
}

/** For testing only: Wait for the monitor to be running. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package org.apache.beam.runners.dataflow.worker.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;

import java.io.File;
import java.io.IOException;
Expand All @@ -34,7 +31,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.util.construction.Environments;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -168,8 +164,6 @@ public void uploadToGcs() throws Exception {

@Test
public void uploadJfrProfilesOnThrashing() throws Exception {
assumeThat(Environments.getJavaVersion(), is(not(Environments.JavaVersion.java8)));

File remoteFolder = tempFolder.newFolder();
monitor =
MemoryMonitor.forTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public class Environments {
.build();

public enum JavaVersion {
java8("java", "1.8", 8),
java11("java11", "11", 11),
java17("java17", "17", 17),
java21("java21", "21", 21);
Expand Down Expand Up @@ -131,20 +130,30 @@ public static JavaVersion forSpecification(String specification) {
}
}

JavaVersion fallback = null;
if (specification.startsWith("1.")) {
// for Java 8 and below
specification = specification.substring(2);
}
int specificationInt = Integer.parseInt(specification);
JavaVersion fallback = java21;
int minDistance = Integer.MAX_VALUE;
for (JavaVersion candidate : JavaVersion.values()) {
int distance = Math.abs(candidate.specificationInt - specificationInt);
if (distance <= minDistance) {
int distance = candidate.specificationInt - specificationInt;
if (distance >= 0 && distance <= minDistance) {
fallback = candidate;
minDistance = distance;
}
}
LOG.warn(
"Unsupported Java version: {}, falling back to: {}",
specification,
fallback.specification);
if (specification.equals("8")) {
LOG.warn(
"Java8 support is now deprecated and targeted for removal for Beam 3. Falling back to: {}",
fallback.specification);
} else {
LOG.warn(
"Unsupported Java version: {}, falling back to: {}",
specification,
fallback.specification);
}
return fallback;
}

Expand Down
Loading
Loading