Skip to content

Conversation

@damccorm
Copy link
Contributor

@damccorm damccorm commented Jul 17, 2025

Changes the default Python runner to be prism (when prism is supported).

There are a bunch of use cases automatically excluded, but this also required updating a bunch of tests. The 2 main breaking changes this introduces are:

Instead of raising Python-specific exceptions, when Prism jobs fail a RuntimeError is always raised. To fix any tests relying on the old error, change your exception catching logic to look for broader errors and use regexes to verify the contents of the error. There are lots of examples of this in this PR.
Prism names its metrics differently than the local runner. For this PR in Beam, I just kept all of those on the old FnApiRunner for now, but you could also just change the metric name you're looking for (this will be the eventual fix for these tests in this repo).

If this change breaks you in unexpected ways, please:

  • Pin to the FnApiRunner instead of the DirectRunner. There are lots of examples of this in this PR.
  • Comment in [Task]: Make Prism default #34549 describing your broken use case.

Part of #34549


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@damccorm
Copy link
Contributor Author

Running the timing out tests locally, I got:

INFO     root:pipeline.py:203 Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Create" at line 26 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "WriteToJson" at line 32 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:490 Unknown pipeline options received: -v,apache_beam/yaml/integration_tests.py. Ignore if flags are used for internal purposes.
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:526 Discarding flag -v, single dash flags are not allowed.
INFO     root:external.py:1086 Starting a JAR-based expansion service from JAR /Users/dannymccormick/beam/sdks/java/extensions/schemaio-expansion-service/build/libs/beam-sdks-java-extensions-schemaio-expansion-service-2.67.0-SNAPSHOT.jar 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:32:23 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Expanding 'WriteToJson/beam:schematransform:org.apache.beam:json_write:v1' with URN 'beam:expansion:payload:schematransform:v1'
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
INFO     apache_beam.runners.worker.statecache:statecache.py:214 Creating state cache with size 104857600
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:522 starting control server on port 58249
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:523 starting data server on port 58250
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:524 starting state server on port 58251
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:525 starting logging server on port 58252
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:624 Requesting worker at localhost:58227
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:628 self.control_address: localhost:58249
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:32:23 PM org.apache.beam.fn.harness.ExternalWorkerService startWorker
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Starting worker worker_7 pointing at localhost:58249.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:32:23 PM org.apache.beam.fn.harness.ExternalWorkerService lambda$startWorker$0
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: Failed to start worker worker_7.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.util.concurrent.RejectedExecutionException: Task org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask@74521c87[Not completed, task = org.apache.beam.fn.harness.control.ExecutionStateSampler$$Lambda$292/0x0000000800633c40@706d9652] rejected from java.util.concurrent.ThreadPoolExecutor@31509c69[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 9]
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.util.UnboundedScheduledExecutorService.submit(UnboundedScheduledExecutorService.java:405)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ExecutionStateSampler.<init>(ExecutionStateSampler.java:106)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:281)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:236)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.ExternalWorkerService.lambda$startWorker$0(ExternalWorkerService.java:105)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.runners.worker.data_plane:data_plane.py:550 Detected input queue delay longer than 300 seconds. Waiting to receive elements in input queue for instruction: bundle_22 for 300.04 seconds.
INFO     apache_beam.runners.worker.data_plane:data_plane.py:550 Detected input queue delay longer than 300 seconds. Waiting to receive elements in input queue for instruction: bundle_22 for 600.79 seconds.

This isn't showing up in CI because stdout is only logged when a suite fails, but we're timing out before that can happen.

@damccorm
Copy link
Contributor Author

damccorm commented Jul 17, 2025

And iin a different test (which runs first) I see the following (looks like same root cause):

INFO     root:pipeline.py:203 Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Create" at line 27 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Filter" at line 35 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "AssertEqual" at line 39 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
INFO     apache_beam.runners.direct.direct_runner:direct_runner.py:201 Running pipeline with PrismRunner.
INFO     apache_beam.runners.worker.worker_pool_main:worker_pool_main.py:110 Listening for workers at localhost:57070
INFO     apache_beam.runners.portability.prism_runner:prism_runner.py:320 Installing prism from local source into "/Users/dannymccormick/.apache_beam/cache/prism/bin".
INFO     apache_beam.runners.portability.prism_runner:prism_runner.py:182 Prism binary path resolved to: /Users/dannymccormick/.apache_beam/cache/prism/bin/prism
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 Starting service with ('/Users/dannymccormick/.apache_beam/cache/prism/bin/prism' '--job_port' '57076' '--web_port' '8079' '--serve_http' 'False')
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 [2025-07-17T15:21:52.01824-04:00]  INFO  Serving JobManagement
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 * endpoint: localhost:57076
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 [2025-07-17T15:21:52.019933-04:00]  INFO  Serving WebUI
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 * endpoint: http://localhost:8079
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:490 Unknown pipeline options received: -v,apache_beam/yaml/integration_tests.py. Ignore if flags are used for internal purposes.
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:526 Discarding flag -v, single dash flags are not allowed.
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:387 Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO     apache_beam.runners.worker.statecache:statecache.py:214 Creating state cache with size 0
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:189 Creating insecure control channel for localhost:57076.
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to STOPPED
INFO     root:portable_runner.py:535 starting job-001[job]
INFO     root:portable_runner.py:535 running job-001[job]
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to RUNNING
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:197 Control channel established.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:249 Initializing SDKHarness with unbounded number of workers.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:923 Creating insecure state channel for localhost:57076.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:930 State channel established.
INFO     apache_beam.runners.worker.data_plane:data_plane.py:819 Creating client data channel for localhost:57076
INFO     root:portable_runner.py:535 pipeline completed job-001[job]
INFO     root:portable_runner.py:535 terminating job-001[job]
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to DONE
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:290 No more requests from control plane
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:291 SDK Harness waiting for in-flight requests to complete
INFO     apache_beam.runners.worker.data_plane:data_plane.py:852 Closing all cached grpc data channels.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:942 Closing all cached gRPC state handlers.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:303 Done consuming work.
INFO     root:pipeline.py:203 Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Create" at line 48 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Filter" at line 56 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "AssertEqual" at line 63 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
INFO     apache_beam.runners.direct.direct_runner:direct_runner.py:201 Running pipeline with PrismRunner.
INFO     apache_beam.runners.worker.worker_pool_main:worker_pool_main.py:110 Listening for workers at localhost:57092
INFO     apache_beam.runners.portability.prism_runner:prism_runner.py:320 Installing prism from local source into "/Users/dannymccormick/.apache_beam/cache/prism/bin".
INFO     apache_beam.runners.portability.prism_runner:prism_runner.py:182 Prism binary path resolved to: /Users/dannymccormick/.apache_beam/cache/prism/bin/prism
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 Starting service with ('/Users/dannymccormick/.apache_beam/cache/prism/bin/prism' '--job_port' '57096' '--web_port' '8079' '--serve_http' 'False')
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 [2025-07-17T15:21:54.668759-04:00]  INFO  Serving JobManagement
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 * endpoint: localhost:57096
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 [2025-07-17T15:21:54.670957-04:00]  INFO  Serving WebUI
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 * endpoint: http://localhost:8079
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:490 Unknown pipeline options received: -v,apache_beam/yaml/integration_tests.py. Ignore if flags are used for internal purposes.
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:526 Discarding flag -v, single dash flags are not allowed.
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:387 Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO     apache_beam.runners.worker.statecache:statecache.py:214 Creating state cache with size 0
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:189 Creating insecure control channel for localhost:57096.
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to STOPPED
INFO     root:portable_runner.py:535 starting job-001[job]
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to STARTING
INFO     root:portable_runner.py:535 running job-001[job]
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to RUNNING
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:197 Control channel established.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:249 Initializing SDKHarness with unbounded number of workers.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:923 Creating insecure state channel for localhost:57096.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:930 State channel established.
INFO     apache_beam.runners.worker.data_plane:data_plane.py:819 Creating client data channel for localhost:57096
INFO     root:portable_runner.py:535 pipeline completed job-001[job]
INFO     root:portable_runner.py:535 terminating job-001[job]
INFO     apache_beam.runners.portability.portable_runner:portable_runner.py:542 Job state changed to DONE
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:290 No more requests from control plane
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:291 SDK Harness waiting for in-flight requests to complete
INFO     apache_beam.runners.worker.data_plane:data_plane.py:852 Closing all cached grpc data channels.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:942 Closing all cached gRPC state handlers.
INFO     apache_beam.runners.worker.sdk_worker:sdk_worker.py:303 Done consuming work.
INFO     root:pipeline.py:203 Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Create" at line 72 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Filter" at line 80 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:388 Using pre-built snapshot at /Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar
INFO     root:external.py:1086 Starting a JAR-based expansion service from JAR /Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 Starting service with ['/opt/homebrew/Cellar/openjdk@11/11.0.27/libexec/openjdk.jdk/Contents/Home/bin/java' '-jar' '/Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar' '57107' '--filesToStage=/Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar' '--alsoStartLoopbackWorker']
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Starting expansion service at localhost:57107
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:21:56 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Registering external transforms: [beam:transform:org.apache.beam:pubsublite_write:v1, beam:transform:combine_globally:v1, beam:transform:redistribute_by_key:v1, beam:transform:window_into:v1, beam:transform:org.apache.beam:spanner_update:v1, beam:schematransform:org.apache.beam:iceberg_cdc_read:v1, beam:transform:combine_per_key:v1, beam:schematransform:org.apache.beam:bigquery_write:v1, beam:transform:group_by_key:v1, beam:transform:org.apache.beam:spanner_delete:v1, beam:transform:group_into_batches_with_sharded_key:v1, beam:directrunner:transforms:write_view:v1, beam:transform:org.apache.beam:spanner_replace:v1, beam:schematransform:org.apache.beam:iceberg_write:v1, beam:transform:org.apache.beam:pubsublite_read:v1, beam:schematransform:org.apache.beam:tfrecord_read:v1, beam:transform:flatten:v1, beam:transform:write_files:v1, beam:runners_core:transforms:splittable_process:v1, beam:schematransform:org.apache.beam:tfrecord_write:v1, beam:directrunner:transforms:stateful_pardo:v1, beam:transform:org.apache.beam:bigquery_read:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:spanner_insert_or_update:v1, beam:directrunner:transforms:test_stream:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:transform:combine_grouped_values:v1, beam:schematransform:org.apache.beam:iceberg_read:v1, beam:external:java:generate_sequence:v1, beam:schematransform:org.apache.beam:bigquery_storage_read:v1, beam:directrunner:transforms:gabw:v1, beam:transform:org.apache.beam:pubsub_write:v1, beam:schematransform:org.apache.beam:kafka_read:v1, beam:schematransform:org.apache.beam:kafka_write:v1, beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:pubsub_read:v1, beam:transform:group_into_batches:v1, beam:transform:org.apache.beam:spanner_read:v1, beam:transform:pubsub_write:v1, beam:transform:org.apache.beam:spanner_insert:v1, beam:transform:pubsub_write:v2, beam:transform:create_view:v1, beam:transform:teststream:v1, beam:transform:sdf_process_keyed_elements:v1, beam:transform:org.apache.beam:pubsub_read:v1, beam:external:java:sql:v1, beam:directrunner:transforms:gbko:v1, beam:transform:org.apache.beam:bigquery_write:v1, beam:transform:impulse:v1, beam:transform:managed:v1, beam:transform:redistribute_arbitrarily:v1, beam:transform:reshuffle:v1]
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Registered transforms:
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:pubsublite_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@23c650a3
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:combine_globally:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@742d4e15
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:redistribute_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@88a8218
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:window_into:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@50b1f030
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_update:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@4163f1cd
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_cdc_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5fa05212
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:combine_per_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3e681bc
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5c09d180
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:group_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@23aae55
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_delete:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@5f574cc2
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:group_into_batches_with_sharded_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@680bddf5
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:directrunner:transforms:write_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7a9c84a5
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_replace:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@2d83c5a5
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@48d7ad8b
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:pubsublite_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@7e053511
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:tfrecord_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@60222fd8
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:flatten:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@53bf7094
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:write_files:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@26f1249d
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:runners_core:transforms:splittable_process:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@710b30ef
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:tfrecord_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@a68df9
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:directrunner:transforms:stateful_pardo:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@28b576a9
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:bigquery_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7cf7aee
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@2f6bbeb0
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_insert_or_update:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@1b1637e1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:directrunner:transforms:test_stream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@18151a14
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@64711bf2
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:combine_grouped_values:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@169da7f2
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3c1e23ff
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@ceb4bd2
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_storage_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@60297f36
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:directrunner:transforms:gabw:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1bf0f6f6
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:pubsub_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@56bc3fac
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:kafka_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@df4b72
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@2ba45490
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@37ff4054
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:pubsub_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@894858
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:group_into_batches:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7af707e0
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@737edcfa
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:pubsub_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3ecedf21
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:spanner_insert:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@74cf8b28
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:pubsub_write:v2: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@36c54a56
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:create_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@3359c978
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:teststream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7ba63fe5
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:sdf_process_keyed_elements:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@73386d72
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:pubsub_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@6f330eb9
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:external:java:sql:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@125c082e
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:directrunner:transforms:gbko:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@584f5497
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:org.apache.beam:bigquery_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@68ace111
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:impulse:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1f9d6c7b
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:managed:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6df20ade
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:redistribute_arbitrarily:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4fbb001b
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:reshuffle:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4010d494
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Registered SchemaTransformProviders:
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_fileloads:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:filter-java:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:spanner_cdc_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:spanner_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:log_for_testing:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:pubsublite_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_storage_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:spanner_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:test_schematransform:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_export_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:kafka_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:kafka_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   schematransform:org.apache.beam:sql_transform:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_cdc_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:pubsublite_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:generate_sequence:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:explode:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:yaml:flatten:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigtable_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:iceberg_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:tfrecord_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigquery_storage_write:v2
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:pubsub_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:tfrecord_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:pubsub_write:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:schematransform:org.apache.beam:bigtable_read:v1
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   beam:transform:managed:v1
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:490 Unknown pipeline options received: -v,apache_beam/yaml/integration_tests.py. Ignore if flags are used for internal purposes.
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:526 Discarding flag -v, single dash flags are not allowed.
INFO     root:external.py:1086 Starting a JAR-based expansion service from JAR /Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:21:56 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Expanding 'Filter/beam:schematransform:org.apache.beam:yaml:filter-java:v1' with URN 'beam:expansion:payload:schematransform:v1'
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "AssertEqual" at line 84 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
INFO     apache_beam.runners.worker.statecache:statecache.py:214 Creating state cache with size 104857600
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:522 starting control server on port 57133
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:523 starting data server on port 57134
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:524 starting state server on port 57135
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:525 starting logging server on port 57136
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:624 Requesting worker at localhost:57107
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:628 self.control_address: localhost:57133
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:07 PM org.apache.beam.fn.harness.ExternalWorkerService startWorker
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Starting worker worker_1 pointing at localhost:57133.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:07 PM org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: *~*~*~ Previous channel ManagedChannelImpl{logId=8, target=localhost:57133} was garbage collected without being shut down! ~*~*~*
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223     Make sure to call shutdown()/shutdownNow()
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.lang.RuntimeException: ManagedChannel allocation site
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:710)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:272)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:101)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.ExternalWorkerService.startWorker(ExternalWorkerService.java:83)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$MethodHandlers.invoke(BeamFnExternalWorkerPoolGrpc.java:296)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     root:worker_handlers.py:425 severity: INFO
timestamp {
  seconds: 1752780127
  nanos: 839000000
}
message: "Fn Harness started"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "22"

INFO     root:worker_handlers.py:425 severity: INFO
timestamp {
  seconds: 1752780127
  nanos: 874000000
}
message: "Running JvmInitializer#beforeProcessing for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@5324362f"
log_location: "org.apache.beam.sdk.fn.JvmInitializers"
thread: "22"

INFO     root:worker_handlers.py:425 severity: INFO
timestamp {
  seconds: 1752780127
  nanos: 874000000
}
message: "Completed JvmInitializer#beforeProcessing for org.apache.beam.sdk.io.kafka.KafkaIOInitializer@5324362f"
log_location: "org.apache.beam.sdk.fn.JvmInitializers"
thread: "22"

INFO     root:worker_handlers.py:425 severity: INFO
timestamp {
  seconds: 1752780127
  nanos: 874000000
}
message: "Entering instruction processing loop"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "22"

INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 WARNING: An illegal reflective access operation has occurred
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 WARNING: Illegal reflective access by org.github.jamm.utils.InjectedInvoker/0x0000000800756840 (file:/Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar) to field java.util.TreeMap.comparator
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 WARNING: Please consider reporting this to the maintainers of org.github.jamm.utils.InjectedInvoker/0x0000000800756840
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 WARNING: All illegal access operations will be denied in a future release
INFO     root:worker_handlers.py:425 severity: INFO
timestamp {
  seconds: 1752780128
  nanos: 683000000
}
message: "Hanged up for url: \"localhost:57134\"\n."
log_location: "org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer"
thread: "18"

INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:09 PM org.apache.beam.fn.harness.logging.BeamFnLoggingClient flushFinalLogs
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: *~*~*~ Previous channel ManagedChannelImpl{logId=24, target=localhost:57134} was garbage collected without being shut down! ~*~*~*
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223     Make sure to call shutdown()/shutdownNow()
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.lang.Throwable: java.lang.RuntimeException: ManagedChannel allocation site
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:710)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:272)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:101)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.lambda$getClientFor$0(BeamFnDataGrpcClient.java:113)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.getClientFor(BeamFnDataGrpcClient.java:107)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.data.BeamFnDataGrpcClient.createOutboundAggregator(BeamFnDataGrpcClient.java:101)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler$1.lambda$addOutgoingDataEndpoint$0(ProcessBundleHandler.java:381)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler$1.addOutgoingDataEndpoint(ProcessBundleHandler.java:378)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.BeamFnDataWriteRunner$Factory.addWriteRunner(BeamFnDataWriteRunner.java:98)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.BeamFnDataWriteRunner$Factory.addRunnerForPTransform(BeamFnDataWriteRunner.java:62)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.addRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:304)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.addRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:258)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.addRunnerAndConsumersForPTransformRecursively(ProcessBundleHandler.java:258)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:867)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:501)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:979)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:497)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.logging.BeamFnLoggingClient.flushFinalLogs(BeamFnLoggingClient.java:380)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.logging.BeamFnLoggingClient.lambda$new$0(BeamFnLoggingClient.java:165)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:09 PM org.apache.beam.fn.harness.FnHarness main
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Shutting SDK harness down.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:09 PM org.apache.beam.fn.harness.ExternalWorkerService lambda$startWorker$0
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Successfully started worker worker_1.
INFO     root:pipeline.py:203 Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Create" at line 93 
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "Filter" at line 101 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:490 Unknown pipeline options received: -v,apache_beam/yaml/integration_tests.py. Ignore if flags are used for internal purposes.
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:526 Discarding flag -v, single dash flags are not allowed.
INFO     root:external.py:1086 Starting a JAR-based expansion service from JAR /Users/dannymccormick/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.67.0-SNAPSHOT.jar 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:15 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Expanding 'Filter/beam:schematransform:org.apache.beam:yaml:filter-java:v1' with URN 'beam:expansion:payload:schematransform:v1'
INFO     apache_beam.yaml.yaml_transform:yaml_transform.py:501 Expanding "AssertEqual" at line 113 
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:534 Discarding unparseable args: ['-v', 'apache_beam/yaml/integration_tests.py']
INFO     apache_beam.runners.worker.statecache:statecache.py:214 Creating state cache with size 104857600
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:522 starting control server on port 57237
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:523 starting data server on port 57238
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:524 starting state server on port 57239
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:525 starting logging server on port 57240
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:624 Requesting worker at localhost:57107
INFO     apache_beam.runners.portability.fn_api_runner.worker_handlers:worker_handlers.py:628 self.control_address: localhost:57237
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:18 PM org.apache.beam.fn.harness.ExternalWorkerService startWorker
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 INFO: Starting worker worker_3 pointing at localhost:57237.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:18 PM org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: *~*~*~ Previous channel ManagedChannelImpl{logId=16, target=localhost:57133} was garbage collected without being shut down! ~*~*~*
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223     Make sure to call shutdown()/shutdownNow()
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.lang.RuntimeException: ManagedChannel allocation site
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:710)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:272)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:101)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:300)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:236)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.ExternalWorkerService.lambda$startWorker$0(ExternalWorkerService.java:105)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:18 PM org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: *~*~*~ Previous channel ManagedChannelImpl{logId=20, target=localhost:57135} was garbage collected without being shut down! ~*~*~*
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223     Make sure to call shutdown()/shutdownNow()
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.lang.RuntimeException: ManagedChannel allocation site
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:710)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingChannelBuilder2.build(ForwardingChannelBuilder2.java:272)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.fn.channel.ManagedChannelFactory.forDescriptor(ManagedChannelFactory.java:101)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache$GrpcStateClient.<init>(BeamFnStateGrpcClientCache.java:103)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache$GrpcStateClient.<init>(BeamFnStateGrpcClientCache.java:91)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache.forApiServiceDescriptor(BeamFnStateGrpcClientCache.java:82)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.createBundleProcessor(ProcessBundleHandler.java:810)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.lambda$processBundle$0(ProcessBundleHandler.java:501)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler$BundleProcessorCache.get(ProcessBundleHandler.java:979)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:497)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 Jul 17, 2025 3:22:18 PM org.apache.beam.fn.harness.ExternalWorkerService lambda$startWorker$0
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 SEVERE: Failed to start worker worker_3.
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 java.util.concurrent.RejectedExecutionException: Task org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask@8c8a544[Not completed, task = org.apache.beam.fn.harness.control.ExecutionStateSampler$$Lambda$468/0x0000000800799040@cf73002] rejected from java.util.concurrent.ThreadPoolExecutor@4dca3a39[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.sdk.util.UnboundedScheduledExecutorService.submit(UnboundedScheduledExecutorService.java:405)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.control.ExecutionStateSampler.<init>(ExecutionStateSampler.java:106)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:281)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.FnHarness.main(FnHarness.java:236)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at org.apache.beam.fn.harness.ExternalWorkerService.lambda$startWorker$0(ExternalWorkerService.java:105)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223   at java.base/java.lang.Thread.run(Thread.java:829)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:223 
INFO     apache_beam.runners.worker.data_plane:data_plane.py:550 Detected input queue delay longer than 300 seconds. Waiting to receive elements in input queue for instruction: bundle_11 for 300.11 seconds.

@github-actions github-actions bot added the java label Jul 18, 2025
@damccorm
Copy link
Contributor Author

Run Yaml_Xlang_Direct PreCommit

@github-actions
Copy link
Contributor

Test Results

0 tests   - 375   0 ✅  - 375   0s ⏱️ - 5m 48s
0 suites  -  55   0 💤 ±  0 
0 files    -  55   0 ❌ ±  0 

Results for commit aaa59c1. ± Comparison against base commit 6812836.

@damccorm damccorm marked this pull request as ready for review July 28, 2025 14:30
@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@damccorm damccorm marked this pull request as draft July 28, 2025 14:45
@damccorm damccorm marked this pull request as ready for review August 6, 2025 14:32
@damccorm
Copy link
Contributor Author

damccorm commented Aug 6, 2025

R: @shunping

@github-actions
Copy link
Contributor

github-actions bot commented Aug 6, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Copy link
Collaborator

@shunping shunping left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@damccorm damccorm merged commit 486de0f into master Aug 6, 2025
69 of 81 checks passed
@damccorm damccorm deleted the users/damccorm/reenablePrism branch August 6, 2025 15:38

IdGenerator idGenerator = IdGenerators.decrementingLongs();
ShortIdMap metricsShortIds = new ShortIdMap();
ExecutorService executorService =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this is needed now. Does this suggest current ScheduledExecutorServiceFactory not working as expected?

class ScheduledExecutorServiceFactory implements DefaultValueFactory<ScheduledExecutorService> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for calling this out - was in the PR description originally, and then I lost it when I updated to include information for folks coming from CHANGES. From the original description:

This fixes a thread safety issue which was causing tests to time out and fail.

The race condition comes from spinning up Java environments - when we do this today (before this PR), we:

  1. Get a singleton ExecutorService object -
    options.as(ExecutorOptions.class).getScheduledExecutorService();
  2. Execute a pipeline
  3. Shutdown the ExecutorService -

When this is all handled in different processes (as the current FnApiRunner implementation does), its not a problem since the ExecutorService doesn't need to outlive the pipeline. In multi-pipeline processes, however, the same ExecutorService object gets reused and teardown can get called while it is executing or before it starts. This leads to a long execution and an eventual timeout (see comments in PR below for details on error thrown).

This PR fixes the issue by creating a new ExecutorService object per worker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see #35621 (comment), could be due to main() get called multiple times, and the Executor embedded in pipeline option already shutdown

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's right

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh racing condition comment. This attracted attention at first glance because there was a thread leak bug due to Executor created every time spin up a runner - #32272 . As long as shutdown is called when Executor no longer used I think it's fine. Thanks again for the details!

parveensania pushed a commit to parveensania/beam-dp that referenced this pull request Aug 17, 2025
* Reenable prism as default

* Fix race condition

* Spotless

* Portable test

* Fix a few new test issues

* Update CHANGES
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants