diff --git a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml index 2c0c61007cd2..c40dd5678264 100644 --- a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml +++ b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml @@ -89,9 +89,9 @@ jobs: test-type: load test-language: python argument-file-paths: | - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt - name: Start Flink with parallelism 5 env: FLINK_NUM_WORKERS: 5 @@ -108,8 +108,9 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ - -Prunner=FlinkRunner \ + -Prunner=PortableRunner \ '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} --job_name=load-tests-python-flink-batch-cogbk-1-${{ steps.datetime.outputs.datetime }}' \ - name: run CoGBK 2GB of 100B records with multiple keys uses: ./.github/actions/gradle-command-self-hosted-action @@ -117,8 +118,9 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ - -Prunner=FlinkRunner \ + -Prunner=PortableRunner \ '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-2-${{ steps.datetime.outputs.datetime }}' \ - name: run CoGBK reiterate 4 times 10kB values uses: ./.github/actions/gradle-command-self-hosted-action @@ -126,10 +128,11 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ - -Prunner=FlinkRunner \ + -Prunner=PortableRunner \ '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-3-${{ steps.datetime.outputs.datetime }}' \ - name: Teardown Flink if: always() run: | - ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete \ No newline at end of file + ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml b/.github/workflows/beam_Publish_Docker_Snapshots.yml index ad3f0da22962..098e06e447cf 100644 --- a/.github/workflows/beam_Publish_Docker_Snapshots.yml +++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml @@ -83,7 +83,7 @@ jobs: arguments: | -Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \ -Pdocker-tag-list=${{ github.sha }}${LATEST_TAG} - - name: run Publish Docker Snapshots script for Flink + - name: run Publish Docker Snapshots script for Flink 1.17 uses: ./.github/actions/gradle-command-self-hosted-action with: gradle-command: :runners:flink:1.17:job-server-container:dockerPush diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt similarity index 74% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt index 4b8a2f72010b..6e26ee72a77c 100644 --- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt @@ -14,15 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ---temp_location=gs://temp-storage-for-perf-tests/loadtests --publish_to_big_query=true --metrics_dataset=load_test --metrics_table=python_flink_batch_cogbk_2 --influx_measurement=python_batch_cogbk_2 ---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}'' ---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}'' +--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}'' +--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}'' --iterations=1 --parallelism=5 ---endpoint=localhost:8099 +--runner=PortableRunner +--job_endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt similarity index 69% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt index 3aeb927f04ee..e1df7e3fd5f9 100644 --- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt @@ -14,15 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ---temp_location=gs://temp-storage-for-perf-tests/loadtests --publish_to_big_query=true --metrics_dataset=load_test --metrics_table=python_flink_batch_cogbk_1 --influx_measurement=python_batch_cogbk_1 ---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}'' ---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}'' +--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}'' +--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":100,\\"hot_key_fraction\\":1}'' --iterations=1 --parallelism=5 ---endpoint=localhost:8099 +--runner=PortableRunner +--job_endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt similarity index 69% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt index e350e2d29944..b1f95027c9da 100644 --- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt @@ -14,15 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. ---temp_location=gs://temp-storage-for-perf-tests/loadtests --publish_to_big_query=true --metrics_dataset=load_test --metrics_table=python_flink_batch_cogbk_3 --influx_measurement=python_batch_cogbk_3 ---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}'' ---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}'' +--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}'' +--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}'' --iterations=4 --parallelism=5 ---endpoint=localhost:8099 +--runner=PortableRunner +--job_endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index bc5d8c2a6131..1c60fa3ee019 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -28,6 +28,7 @@ import logging import os import platform +import re import shutil import stat import subprocess @@ -121,7 +122,18 @@ def filter(self, record): try: message = record.getMessage() json_record = json.loads(message) - record.levelno = getattr(logging, json_record["level"]) + level_str = json_record["level"] + # Example level with offset: 'ERROR+2' + if "+" in level_str or "-" in level_str: + match = re.match(r"([A-Z]+)([+-]\d+)", level_str) + if match: + base, offset = match.groups() + base_level = getattr(logging, base, logging.INFO) + record.levelno = base_level + int(offset) + else: + record.levelno = getattr(logging, level_str, logging.INFO) + else: + record.levelno = getattr(logging, level_str, logging.INFO) record.levelname = logging.getLevelName(record.levelno) if "source" in json_record: record.funcName = json_record["source"]["function"]