Merge branch 'master' into ordered-list-state-2
shunping committed Jun 13, 2024
2 parents c158b32 + 0bf4307 commit f4c3440
Showing 66 changed files with 1,544 additions and 216 deletions.
1 change: 1 addition & 0 deletions .github/workflows/beam_PreCommit_Java_IOs_Direct.yml
@@ -22,6 +22,7 @@ on:
paths:
- "sdks/java/io/common/**"
- "sdks/java/core/src/main/**"
- "buildSrc/**"
- ".github/workflows/beam_PreCommit_Java_IOs_Direct.yml"
pull_request_target:
branches: ['master', 'release-*']
126 changes: 126 additions & 0 deletions .github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml
@@ -0,0 +1,126 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PreCommit Java Solace IO Direct

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths:
- "sdks/java/io/solace/**"
- ".github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml"
pull_request_target:
branches: ['master', 'release-*']
paths:
- "sdks/java/io/solace/**"
- 'release/trigger_all_tests.json'
- '.github/trigger_files/beam_PreCommit_Java_Solace_IO_Direct.json'
issue_comment:
types: [created]
schedule:
- cron: '45 1/6 * * *'
workflow_dispatch:

# Set explicit permissions for the action to avoid the default permissions, which are `write-all` in the case of a pull_request_target event
permissions:
actions: write
pull-requests: write
checks: write
contents: read
deployments: read
id-token: none
issues: write
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
beam_PreCommit_Java_Solace_IO_Direct:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_PreCommit_Java_Solace_IO_Direct"]
job_phrase: ["Run Java_Solace_IO_Direct PreCommit"]
timeout-minutes: 60
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run Java_Solace_IO_Direct PreCommit'
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run Solace IO build script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:solace:build
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
- name: run Solace IO IT script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:solace:integrationTest
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
with:
name: JUnit Test Results
path: "**/build/reports/tests/"
- name: Publish JUnit Test Results
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
commit: '${{ env.prsha || env.GITHUB_SHA }}'
comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
files: '**/build/test-results/**/*.xml'
- name: Archive SpotBugs Results
uses: actions/upload-artifact@v4
if: always()
with:
name: SpotBugs Results
path: '**/build/reports/spotbugs/*.html'
- name: Publish SpotBugs Results
uses: jwgmeligmeyling/spotbugs-github-action@v1.2
if: always()
with:
name: Publish SpotBugs
path: '**/build/reports/spotbugs/*.html'
1 change: 1 addition & 0 deletions .github/workflows/playground_backend_precommit.yml
@@ -58,6 +58,7 @@ jobs:
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update --yes
sudo apt-get install sbt --yes
sudo wget https://codeload.github.com/spotify/scio.g8/zip/7c1ba7c1651dfd70976028842e721da4107c0d6d -O scio.g8.zip && unzip scio.g8.zip && mv scio.g8-7c1ba7c1651dfd70976028842e721da4107c0d6d /opt/scio.g8
- name: Set up Cloud SDK and its components
uses: google-github-actions/setup-gcloud@v0
with:
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -108,6 +108,10 @@
* DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series ([#31185](https://github.com/apache/beam/pull/31185)).
* Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295)).
* Disable soft delete policy when creating the default bucket for a project (Java) ([#31324](https://github.com/apache/beam/pull/31324)).
* Added `DoFn.SetupContextParam` and `DoFn.BundleContextParam`, which can be used
as a Python `DoFn.process`, `Map`, or `FlatMap` parameter to invoke a context
manager per DoFn setup or per bundle (analogous to using `setup`/`teardown`
or `start_bundle`/`finish_bundle`, respectively). A usage sketch follows this hunk.
* Go SDK Prism Runner
  * Pre-built Prism binaries are now part of the release and are available via the GitHub release page. ([#29697](https://github.com/apache/beam/issues/29697)).
  * ProcessingTime is now handled synthetically with TestStream pipelines and Non-TestStream pipelines, for fast test pipeline execution by default. ([#30083](https://github.com/apache/beam/issues/30083)).
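
A minimal sketch of how the new context parameters might be used, assuming `DoFn.BundleContextParam` accepts a callable that returns a context manager and binds the entered value to the parameter for the duration of a bundle; the names `_bundle_connection` and `TagWithConnectionState` are illustrative, not part of this commit:

```python
import contextlib

import apache_beam as beam


@contextlib.contextmanager
def _bundle_connection():
    # Illustrative per-bundle resource: entered when a bundle starts,
    # exited when the bundle finishes (assumed semantics of BundleContextParam).
    conn = {"open": True}  # stand-in for e.g. a database client
    try:
        yield conn
    finally:
        conn["open"] = False  # release the resource here


class TagWithConnectionState(beam.DoFn):
    # The default argument asks the SDK to run `_bundle_connection()` as a
    # context manager once per bundle and pass whatever it yields as `conn`.
    # DoFn.SetupContextParam is assumed to work the same way, but spanning
    # setup/teardown instead of a single bundle.
    def process(self, element, conn=beam.DoFn.BundleContextParam(_bundle_connection)):
        yield element, conn["open"]


# Usage sketch:
#   with beam.Pipeline() as p:
#       _ = p | beam.Create([1, 2, 3]) | beam.ParDo(TagWithConnectionState())
```

`DoFn.SetupContextParam` fits resources that should live for the whole DoFn setup/teardown lifetime rather than a single bundle.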
@@ -639,7 +639,7 @@ class BeamModulePlugin implements Plugin<Project> {
def spark2_version = "2.4.8"
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
def testcontainers_version = "1.17.3"
def testcontainers_version = "1.19.7"
// [bomupgrader] determined by: org.apache.arrow:arrow-memory-core, consistent with: google_cloud_platform_libraries_bom
def arrow_version = "15.0.2"
def jmh_version = "1.34"
@@ -296,6 +296,11 @@ private void generateDataAndWrite(BigQueryIO.Write<byte[]> writeIO) throws IOExc
.withSchema(schema)
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempLocation)));

String experiments =
configuration.writeMethod.equals(STORAGE_API_AT_LEAST_ONCE_METHOD)
? GcpOptions.STREAMING_ENGINE_EXPERIMENT + ",streaming_mode_at_least_once"
: GcpOptions.STREAMING_ENGINE_EXPERIMENT;

PipelineLauncher.LaunchConfig options =
PipelineLauncher.LaunchConfig.builder("write-bigquery")
.setSdk(PipelineLauncher.Sdk.JAVA)
@@ -307,7 +312,7 @@ private void generateDataAndWrite(BigQueryIO.Write<byte[]> writeIO) throws IOExc
.toString())
.addParameter("numWorkers", String.valueOf(configuration.numWorkers))
.addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers))
.addParameter("experiments", GcpOptions.STREAMING_ENGINE_EXPERIMENT)
.addParameter("experiments", experiments)
.build();

PipelineLauncher.LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options);
@@ -329,9 +334,17 @@ private void generateDataAndWrite(BigQueryIO.Write<byte[]> writeIO) throws IOExc

// Depending on writing method there might be duplicates on different sides (read or write).
if (configuration.writeMethod.equals(STORAGE_API_AT_LEAST_ONCE_METHOD)) {
assertTrue(rowCount >= numRecords);
assertTrue(
String.format(
"Number of rows in the table (%d) is less than the expected number (%d). Missing records: %d",
rowCount, (long) numRecords, (long) numRecords - rowCount),
rowCount >= numRecords);
} else {
assertTrue(numRecords >= rowCount);
assertTrue(
String.format(
"Number of rows in the table (%d) is greater than the expected number (%d).",
rowCount, (long) numRecords),
numRecords >= rowCount);
}

// export metrics
4 changes: 2 additions & 2 deletions local-env-setup.sh
@@ -55,7 +55,7 @@ if [ "$kernelname" = "Linux" ]; then
exit
fi

for ver in 3.8 3.9 3.10 3.11 3; do
for ver in 3.8 3.9 3.10 3.11 3.12 3; do
apt install --yes python$ver-venv
done

@@ -89,7 +89,7 @@ elif [ "$kernelname" = "Darwin" ]; then
echo "Installing openjdk@8"
brew install openjdk@8
fi
for ver in 3.8 3.9 3.10 3.11; do
for ver in 3.8 3.9 3.10 3.11 3.12; do
if brew ls --versions python@$ver > /dev/null; then
echo "python@$ver already installed. Skipping"
brew info python@$ver
6 changes: 6 additions & 0 deletions playground/backend/containers/scio/Dockerfile
@@ -80,6 +80,10 @@ RUN chown -R appuser:appgroup /opt/playground/backend/executable_files/ && chmod
RUN mkdir -p /opt/sbt-template
RUN chown -R appuser:appgroup /opt/sbt-template

# Download the Spotify scio.g8 template at a specific commit
ARG g8_commit=7c1ba7c1651dfd70976028842e721da4107c0d6d
RUN wget https://codeload.github.com/spotify/scio.g8/zip/$g8_commit -O scio.g8.zip && unzip scio.g8.zip && mv scio.g8-$g8_commit /opt/scio.g8

# Switch to appuser
USER appuser

@@ -88,6 +92,8 @@ WORKDIR /opt/sbt-template
RUN /opt/playground/backend/new_scio_project.sh
WORKDIR /opt/sbt-template/scio
RUN sbt "+compile"
# chmod is a fix for an sbt Scala output permission issue known in scala-js: https://github.com/scala-js/scala-js/issues/4212
RUN chmod -R 0644 /opt/sbt-template/scio/target/scala-*/zinc/*.zip
WORKDIR /

USER root
2 changes: 1 addition & 1 deletion playground/backend/env_setup.sh
@@ -14,7 +14,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
GO_LINTER_VERSION=1.51.2
GO_LINTER_VERSION=1.59.1

# Install GO Linter
#wget https://github.com/golangci/golangci-lint/releases/download/v1.42.1/golangci-lint-$GO_LINTER_VERSION-linux-amd64.deb
3 changes: 2 additions & 1 deletion playground/backend/new_scio_project.sh
@@ -18,9 +18,10 @@
if [ -d /opt/sbt-template/scio ]; then
cp -r /opt/sbt-template/scio scio
else
{ printf scio\\nscio\\n; yes; } | sbt new spotify/scio-template.g8
{ printf scio\\nscio\\n; yes; } | sbt new file:///opt/scio.g8

echo "Compile / run / fork := false" >> scio/build.sbt
echo "updateOptions := updateOptions.value.withCachedResolution(true)" >> scio/build.sbt
echo "libraryDependencies ++= Seq(\"org.apache.beam\" % \"beam-runners-direct-java\" % beamVersion)" >> scio/build.sbt
fi

@@ -395,7 +395,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
clientId,
configFetcherAndWindmillClient.getLeft(),
computationStateCache,
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()),
windmillStateCache,
workExecutor,
IntrinsicMapTaskExecutorFactory.defaultFactory(),
options,
@@ -58,7 +58,7 @@
*/
@NotThreadSafe
@Internal
public class Work {
public final class Work {
private final ShardedKey shardedKey;
private final WorkItem workItem;
private final ProcessingContext processingContext;
@@ -196,8 +196,7 @@ public String getLatencyTrackingId() {
return latencyTrackingId;
}

public final void queueCommit(
WorkItemCommitRequest commitRequest, ComputationState computationState) {
public void queueCommit(WorkItemCommitRequest commitRequest, ComputationState computationState) {
setState(State.COMMIT_QUEUED);
processingContext.workCommitter().accept(Commit.create(commitRequest, computationState, this));
}
@@ -22,7 +22,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

@@ -224,9 +225,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
() -> {
String threadName = Thread.currentThread().getName();
try {
if (work instanceof Work) {
if (work instanceof ExecutableWork) {
String workToken =
String.format("%016x", ((Work) work).getWorkItem().getWorkToken());
debugFormattedWorkToken(
((ExecutableWork) work).work().getWorkItem().getWorkToken());
Thread.currentThread().setName(threadName + ":" + workToken);
}
work.run();
@@ -242,6 +244,11 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}
}

@VisibleForTesting
public static String debugFormattedWorkToken(long workToken) {
return String.format("%016x", workToken);
}

private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;