diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index afdc7f7012a8..2504db607e46 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 11 + "modification": 12 } diff --git a/CHANGES.md b/CHANGES.md index 206ac3ba11ad..57db95312b90 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. ## I/Os @@ -81,6 +82,7 @@ * Upgraded Beam vendored Calcite to 1.40.0 for Beam SQL ([#35483](https://github.com/apache/beam/issues/35483)), which improves support for BigQuery and other SQL dialects. Note: Minor behavior changes are observed such as output significant digits related to casting. +* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. ## Deprecations @@ -100,7 +102,6 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). -* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. ## I/Os @@ -127,7 +128,6 @@ ## Breaking Changes -* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). * Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369])(https://github.com/apache/beam/pull/35369). * SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582])(https://github.com/apache/beam/pull/35582). diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index e0b63527bec5..034695237d83 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.function.Function; import javax.annotation.Nullable; import org.apache.beam.fn.harness.control.BeamFnControlClient; @@ -64,6 +63,7 @@ import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.SdkHarnessOptions; +import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat; @@ -276,8 +276,8 @@ public static void main( IdGenerator idGenerator = IdGenerators.decrementingLongs(); ShortIdMap metricsShortIds = new ShortIdMap(); - ExecutorService executorService = - options.as(ExecutorOptions.class).getScheduledExecutorService(); + UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService(); + options.as(ExecutorOptions.class).setScheduledExecutorService(executorService); CompletableFuture samplerTerminationFuture = new CompletableFuture<>(); ExecutionStateSampler executionStateSampler = new ExecutionStateSampler( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index f4448578fb6c..5005290ad9e8 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -884,7 +884,9 @@ def dynamic_destination_resolver(element, *side_inputs): # For now we don't care about the return value. mock_insert_copy_job.return_value = None - with TestPipeline('DirectRunner') as p: + # Pin to FnApiRunner for now to make mocks act appropriately. + # TODO(https://github.com/apache/beam/issues/34549) + with TestPipeline('FnApiRunner') as p: _ = ( p | beam.Create([ diff --git a/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py index 7df662ab0554..b21da0443467 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py @@ -117,7 +117,7 @@ def test_default_schema_missing_embedding(self): Chunk(id="1", content=Content(text="foo"), metadata={"a": "b"}), Chunk(id="2", content=Content(text="bar"), metadata={"c": "d"}) ] - with self.assertRaises(ValueError): + with self.assertRaisesRegex(Exception, "must contain dense embedding"): with beam.Pipeline() as p: _ = (p | beam.Create(chunks) | config.create_write_transform()) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index a629c12a058d..0af0ca8d3175 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -184,6 +184,13 @@ def visit_transform(self, applied_ptransform): for state in state_specs: if isinstance(state, userstate.CombiningValueStateSpec): self.supported_by_prism_runner = False + if isinstance( + dofn, + beam.transforms.combiners._PartialGroupByKeyCombiningValues): + if len(transform.side_inputs) > 0: + # Prism doesn't support side input combiners (this is within spec) + self.supported_by_prism_runner = False + # TODO(https://github.com/apache/beam/issues/33623): Prism seems to # not handle session windows correctly. Examples are: # util_test.py::ReshuffleTest::test_reshuffle_window_fn_preserved @@ -195,21 +202,9 @@ def visit_transform(self, applied_ptransform): # Use BundleBasedDirectRunner if other runners are missing needed features. runner = BundleBasedDirectRunner() - # Check whether all transforms used in the pipeline are supported by the - # FnApiRunner, and the pipeline was not meant to be run as streaming. - if _FnApiRunnerSupportVisitor().accept(pipeline): - from apache_beam.portability.api import beam_provision_api_pb2 - from apache_beam.runners.portability.fn_api_runner import fn_runner - from apache_beam.runners.portability.portable_runner import JobServiceHandle - all_options = options.get_all_options() - encoded_options = JobServiceHandle.encode_pipeline_options(all_options) - provision_info = fn_runner.ExtendedProvisionInfo( - beam_provision_api_pb2.ProvisionInfo( - pipeline_options=encoded_options)) - runner = fn_runner.FnApiRunner(provision_info=provision_info) # Check whether all transforms used in the pipeline are supported by the # PrismRunner - elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): + if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): _LOGGER.info('Running pipeline with PrismRunner.') from apache_beam.runners.portability import prism_runner runner = prism_runner.PrismRunner() @@ -233,6 +228,19 @@ def visit_transform(self, applied_ptransform): _LOGGER.info('Falling back to DirectRunner') runner = BundleBasedDirectRunner() + # Check whether all transforms used in the pipeline are supported by the + # FnApiRunner, and the pipeline was not meant to be run as streaming. + if _FnApiRunnerSupportVisitor().accept(pipeline): + from apache_beam.portability.api import beam_provision_api_pb2 + from apache_beam.runners.portability.fn_api_runner import fn_runner + from apache_beam.runners.portability.portable_runner import JobServiceHandle + all_options = options.get_all_options() + encoded_options = JobServiceHandle.encode_pipeline_options(all_options) + provision_info = fn_runner.ExtendedProvisionInfo( + beam_provision_api_pb2.ProvisionInfo( + pipeline_options=encoded_options)) + runner = fn_runner.FnApiRunner(provision_info=provision_info) + return runner.run_pipeline(pipeline, options)