Merged
17 changes: 10 additions & 7 deletions .github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
@@ -89,9 +89,9 @@ jobs:
test-type: load
test-language: python
argument-file-paths: |
-  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
-  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
-  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
+  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
+  ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
- name: Start Flink with parallelism 5
env:
FLINK_NUM_WORKERS: 5
@@ -108,28 +108,31 @@ jobs:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
-PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-  -Prunner=FlinkRunner \
+  -Prunner=PortableRunner \
Contributor:

I think if we do this, we're defeating the point of the tests - the goal here is to test against the Flink runner.

Contributor:

PortableRunner should be the correct one; see other workflows: https://github.com/apache/beam/blob/590ece2cd8e586d063686ce4cae86bc7da5a319a/.github/workflows/beam_LoadTests_Python_Combine_Flink_Batch.yml. These tests work by setting up a Dataproc cluster and a portable runner endpoint that maps to it, then submitting Beam pipelines to that endpoint.

The question is why it previously ran on the direct runner when this parameter was set to "FlinkRunner".

Contributor:

I see, it's because the .txt pipeline option files did not specify the runner type.

Collaborator (Author):

> I see, it's because the .txt pipeline option files did not specify the runner type.

Yes, correct.
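The failure mode discussed above can be sketched with a minimal, hypothetical parser for these `--key=value` options files (this is illustrative, not Beam's actual options loader): if the file omits an explicit `--runner`, the pipeline falls back to Beam's default DirectRunner even though a job endpoint is configured.

```python
# Hypothetical sketch: parse a load-test pipeline-options .txt file into a
# dict and show the defaulting that bit these workflows. Names here are
# illustrative; Beam's real parsing lives in PipelineOptions.
def parse_options(text):
    """Parse '--key=value' lines, skipping blanks and '#' comments."""
    opts = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.lstrip("-").partition("=")
        opts[key] = value
    return opts

old_file = """
--parallelism=5
--endpoint=localhost:8099
"""
new_file = """
--parallelism=5
--runner=PortableRunner
--job_endpoint=localhost:8099
"""

# Without an explicit --runner, Beam silently uses its default runner:
print(parse_options(old_file).get("runner", "DirectRunner"))  # DirectRunner
print(parse_options(new_file).get("runner", "DirectRunner"))  # PortableRunner
```

This is why the tests ran on the direct runner before this change: the workflow passed `-Prunner=FlinkRunner` to Gradle, but the pipeline itself never saw a `--runner` option.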

'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} --job_name=load-tests-python-flink-batch-cogbk-1-${{ steps.datetime.outputs.datetime }}' \
- name: run CoGBK 2GB of 100B records with multiple keys
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
-PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-  -Prunner=FlinkRunner \
+  -Prunner=PortableRunner \
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-2-${{ steps.datetime.outputs.datetime }}' \
- name: run CoGBK reiterate 4 times 10kB values
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
-PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-  -Prunner=FlinkRunner \
+  -Prunner=PortableRunner \
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-3-${{ steps.datetime.outputs.datetime }}' \
- name: Teardown Flink
if: always()
run: |
-  ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
+  ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
2 changes: 1 addition & 1 deletion .github/workflows/beam_Publish_Docker_Snapshots.yml
@@ -83,7 +83,7 @@ jobs:
arguments: |
-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \
-Pdocker-tag-list=${{ github.sha }}${LATEST_TAG}
-  - name: run Publish Docker Snapshots script for Flink
+  - name: run Publish Docker Snapshots script for Flink 1.17
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.17:job-server-container:dockerPush
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

--temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_2
--influx_measurement=python_batch_cogbk_2
- --input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
- --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+ --input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+ --co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
--iterations=1
--parallelism=5
- --endpoint=localhost:8099
+ --runner=PortableRunner
+ --job_endpoint=localhost:8099
--environment_type=DOCKER
- --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
+ --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
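A quick sanity check on the scale change in these options files. Assuming the load-test synthetic source produces `num_records` records of `key_size + value_size` bytes each (the convention these workflows' step names, e.g. "CoGBK 2GB of 100B records", imply), the edit shrinks the main input by 100x:

```python
# Sketch: total bytes implied by an input_options JSON blob.
import json

def dataset_bytes(input_options: str) -> int:
    opts = json.loads(input_options)
    return opts["num_records"] * (opts["key_size"] + opts["value_size"])

before = '{"num_records": 20000000, "key_size": 10, "value_size": 90}'
after = '{"num_records": 200000, "key_size": 10, "value_size": 90}'
print(dataset_bytes(before))  # 2000000000 -> ~2 GB of 100B records
print(dataset_bytes(after))   # 20000000   -> ~20 MB after the reduction
```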
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

--temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_1
--influx_measurement=python_batch_cogbk_1
- --input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
- --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+ --input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
+ --co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":100,\\"hot_key_fraction\\":1}''
--iterations=1
--parallelism=5
- --endpoint=localhost:8099
+ --runner=PortableRunner
+ --job_endpoint=localhost:8099
--environment_type=DOCKER
- --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
+ --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

--temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_3
--influx_measurement=python_batch_cogbk_3
- --input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
- --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+ --input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
+ --co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
--iterations=4
--parallelism=5
- --endpoint=localhost:8099
+ --runner=PortableRunner
+ --job_endpoint=localhost:8099
--environment_type=DOCKER
- --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
+ --environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
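The `num_hot_keys` and `hot_key_fraction` knobs in these files control key skew. A hedged sketch of the idea (not Beam's actual SyntheticSource implementation): each record's key is drawn from a small "hot" pool with probability `hot_key_fraction`, so with `hot_key_fraction=1` and `num_hot_keys=1` every record shares one key, while `num_hot_keys=200000` spreads records across many keys.

```python
# Illustrative key generator (names and algorithm are assumptions, not
# Beam's SyntheticSource code).
import random

def synthetic_keys(num_records, num_hot_keys, hot_key_fraction, seed=0):
    rng = random.Random(seed)
    keys = []
    for i in range(num_records):
        if rng.random() < hot_key_fraction:
            keys.append(rng.randrange(num_hot_keys))  # draw from the hot pool
        else:
            keys.append(num_hot_keys + i)             # unique "cold" key
    return keys

single = synthetic_keys(1000, num_hot_keys=1, hot_key_fraction=1)
spread = synthetic_keys(1000, num_hot_keys=200000, hot_key_fraction=1)
print(len(set(single)))          # 1: the single-key CoGBK scenario
print(len(set(spread)) > 900)    # keys spread widely across the pool
```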
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -28,6 +28,7 @@
import logging
import os
import platform
+ import re
import shutil
import stat
import subprocess
@@ -121,7 +122,18 @@ def filter(self, record):
try:
message = record.getMessage()
json_record = json.loads(message)
-       record.levelno = getattr(logging, json_record["level"])
+       level_str = json_record["level"]
+       # Level strings may carry a numeric offset, e.g. 'ERROR+2'.
+       match = re.match(r"([A-Z]+)([+-]\d+)$", level_str)
+       if match:
+         base, offset = match.groups()
+         base_level = getattr(logging, base, logging.INFO)
+         record.levelno = base_level + int(offset)
+       else:
+         record.levelno = getattr(logging, level_str, logging.INFO)
record.levelname = logging.getLevelName(record.levelno)
if "source" in json_record:
record.funcName = json_record["source"]["function"]
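The added parsing can be exercised in isolation. A self-contained sketch of the same logic, mapping a level string that may carry a numeric offset (the 'ERROR+2' style that Go structured loggers emit) onto Python's logging scale, with INFO as the fallback for unknown names:

```python
# Standalone version of the level-parsing branch added in this diff.
import logging
import re

def parse_level(level_str: str) -> int:
    """Map 'ERROR', 'ERROR+2', 'DEBUG-4', etc. to a numeric logging level."""
    match = re.match(r"([A-Z]+)([+-]\d+)$", level_str)
    if match:
        base, offset = match.groups()
        return getattr(logging, base, logging.INFO) + int(offset)
    return getattr(logging, level_str, logging.INFO)

print(parse_level("ERROR"))    # 40
print(parse_level("ERROR+2"))  # 42
print(parse_level("DEBUG-4"))  # 6
print(parse_level("BOGUS"))    # 20 (falls back to INFO)
```

Falling back to `logging.INFO` rather than raising keeps a malformed level string from breaking log forwarding, at the cost of possibly under- or over-reporting that record's severity.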