Merged
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
-"modification": 11
+"modification": 12
}
4 changes: 2 additions & 2 deletions CHANGES.md
@@ -64,6 +64,7 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os

@@ -81,6 +82,7 @@
* Upgraded Beam vendored Calcite to 1.40.0 for Beam SQL ([#35483](https://github.com/apache/beam/issues/35483)), which
improves support for BigQuery and other SQL dialects. Note: Minor behavior changes are observed such as output
significant digits related to casting.
+* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## Deprecations

@@ -100,7 +102,6 @@

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
-* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os

@@ -127,7 +128,6 @@

## Breaking Changes

-* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369])(https://github.com/apache/beam/pull/35369).
* SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582])(https://github.com/apache/beam/pull/35582).
@@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
@@ -64,6 +63,7 @@
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat;
@@ -276,8 +276,8 @@ public static void main(

IdGenerator idGenerator = IdGenerators.decrementingLongs();
ShortIdMap metricsShortIds = new ShortIdMap();
-ExecutorService executorService =
Contributor

Curious why this is needed now. Does this suggest the current ScheduledExecutorServiceFactory is not working as expected?

class ScheduledExecutorServiceFactory implements DefaultValueFactory<ScheduledExecutorService> {

Contributor Author

Thanks for calling this out. This was in the PR description originally, but I lost it when I updated the description to include information for folks coming from CHANGES. From the original description:

This fixes a thread safety issue which was causing tests to time out and fail.

The race condition comes from spinning up Java environments - when we do this today (before this PR), we:

  1. Get a singleton ExecutorService object -
    options.as(ExecutorOptions.class).getScheduledExecutorService();
  2. Execute a pipeline
  3. Shut down the ExecutorService -

When this is all handled in different processes (as the current FnApiRunner implementation does), it's not a problem since the ExecutorService doesn't need to outlive the pipeline. In multi-pipeline processes, however, the same ExecutorService object gets reused, and teardown can be called while a pipeline is executing or before it starts. This leads to a long execution and an eventual timeout (see the comments in the PR below for details on the error thrown).

This PR fixes the issue by creating a new ExecutorService object per worker.
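
The failure mode described above can be sketched in Python as an analogy. This is a minimal sketch under stated assumptions, not the Java harness code: `_SHARED_EXECUTOR`, `run_worker_shared`, and `run_worker_isolated` are hypothetical names, and `concurrent.futures.ThreadPoolExecutor` stands in for the Java ExecutorService. One difference to keep in mind: Python rejects work submitted after shutdown immediately with a `RuntimeError`, whereas the comment above describes a long execution and an eventual timeout.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the singleton executor stored in pipeline options.
_SHARED_EXECUTOR = ThreadPoolExecutor(max_workers=2)


def run_worker_shared(work):
    """Runs one 'worker' on the shared executor, then tears the executor down.

    This mirrors the pre-PR sequence: get the singleton, execute, shut down.
    """
    try:
        return _SHARED_EXECUTOR.submit(work).result()
    finally:
        # Shutting down the singleton also breaks every later worker
        # in the same process that tries to reuse it.
        _SHARED_EXECUTOR.shutdown()


def run_worker_isolated(work):
    """Runs one 'worker' on its own executor (the per-worker approach)."""
    # The context manager shuts down only this worker's executor.
    with ThreadPoolExecutor(max_workers=2) as executor:
        return executor.submit(work).result()
```

In one process, a second call to `run_worker_shared` fails because the first call already shut the shared executor down, while `run_worker_isolated` can be called repeatedly.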

Contributor

I see #35621 (comment). This could be due to main() getting called multiple times, with the Executor embedded in the pipeline options already shut down.

Contributor Author

Yeah, that's right

Contributor

Oh, the race condition comment. This caught my attention at first glance because there was a thread leak bug caused by an Executor being created every time a runner was spun up (#32272). As long as shutdown is called when the Executor is no longer used, I think it's fine. Thanks again for the details!
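
The condition in the comment above (a fresh executor per runner is fine as long as it is shut down once it is no longer used) can be sketched in Python. This is an analogy under assumptions, not Beam code: `run_with_cleanup` and `live_worker_threads` are hypothetical helpers, and `ThreadPoolExecutor` stands in for the Java executor.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def live_worker_threads(prefix):
    """Returns the live threads whose names start with the given prefix."""
    return [t for t in threading.enumerate() if t.name.startswith(prefix)]


def run_with_cleanup(work, prefix="worker"):
    """Creates a fresh executor per call and always shuts it down."""
    executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix=prefix)
    try:
        return executor.submit(work).result()
    finally:
        # wait=True joins the pool threads, so none outlive this call.
        executor.shutdown(wait=True)
```

Because shutdown runs in a `finally` block, the pool threads are released even when the submitted work raises, so repeated calls do not accumulate threads.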

-options.as(ExecutorOptions.class).getScheduledExecutorService();
+UnboundedScheduledExecutorService executorService = new UnboundedScheduledExecutorService();
+options.as(ExecutorOptions.class).setScheduledExecutorService(executorService);
CompletableFuture<Void> samplerTerminationFuture = new CompletableFuture<>();
ExecutionStateSampler executionStateSampler =
new ExecutionStateSampler(
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -884,7 +884,9 @@ def dynamic_destination_resolver(element, *side_inputs):
# For now we don't care about the return value.
mock_insert_copy_job.return_value = None

-with TestPipeline('DirectRunner') as p:
+# Pin to FnApiRunner for now to make mocks act appropriately.
+# TODO(https://github.com/apache/beam/issues/34549)
+with TestPipeline('FnApiRunner') as p:
_ = (
p
| beam.Create([
@@ -117,7 +117,7 @@ def test_default_schema_missing_embedding(self):
Chunk(id="1", content=Content(text="foo"), metadata={"a": "b"}),
Chunk(id="2", content=Content(text="bar"), metadata={"c": "d"})
]
-with self.assertRaises(ValueError):
+with self.assertRaisesRegex(Exception, "must contain dense embedding"):
with beam.Pipeline() as p:
_ = (p | beam.Create(chunks) | config.create_write_transform())

34 changes: 21 additions & 13 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -184,6 +184,13 @@ def visit_transform(self, applied_ptransform):
for state in state_specs:
if isinstance(state, userstate.CombiningValueStateSpec):
self.supported_by_prism_runner = False
+if isinstance(
+dofn,
+beam.transforms.combiners._PartialGroupByKeyCombiningValues):
+if len(transform.side_inputs) > 0:
+# Prism doesn't support side input combiners (this is within spec)
+self.supported_by_prism_runner = False

# TODO(https://github.com/apache/beam/issues/33623): Prism seems to
# not handle session windows correctly. Examples are:
# util_test.py::ReshuffleTest::test_reshuffle_window_fn_preserved
@@ -195,21 +202,9 @@ def visit_transform(self, applied_ptransform):
# Use BundleBasedDirectRunner if other runners are missing needed features.
runner = BundleBasedDirectRunner()

-# Check whether all transforms used in the pipeline are supported by the
-# FnApiRunner, and the pipeline was not meant to be run as streaming.
-if _FnApiRunnerSupportVisitor().accept(pipeline):
-from apache_beam.portability.api import beam_provision_api_pb2
-from apache_beam.runners.portability.fn_api_runner import fn_runner
-from apache_beam.runners.portability.portable_runner import JobServiceHandle
-all_options = options.get_all_options()
-encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
-provision_info = fn_runner.ExtendedProvisionInfo(
-beam_provision_api_pb2.ProvisionInfo(
-pipeline_options=encoded_options))
-runner = fn_runner.FnApiRunner(provision_info=provision_info)
# Check whether all transforms used in the pipeline are supported by the
# PrismRunner
-elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
+if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
_LOGGER.info('Running pipeline with PrismRunner.')
from apache_beam.runners.portability import prism_runner
runner = prism_runner.PrismRunner()
@@ -233,6 +228,19 @@ def visit_transform(self, applied_ptransform):
_LOGGER.info('Falling back to DirectRunner')
runner = BundleBasedDirectRunner()

+# Check whether all transforms used in the pipeline are supported by the
+# FnApiRunner, and the pipeline was not meant to be run as streaming.
+if _FnApiRunnerSupportVisitor().accept(pipeline):
+from apache_beam.portability.api import beam_provision_api_pb2
+from apache_beam.runners.portability.fn_api_runner import fn_runner
+from apache_beam.runners.portability.portable_runner import JobServiceHandle
+all_options = options.get_all_options()
+encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
+provision_info = fn_runner.ExtendedProvisionInfo(
+beam_provision_api_pb2.ProvisionInfo(
+pipeline_options=encoded_options))
+runner = fn_runner.FnApiRunner(provision_info=provision_info)

return runner.run_pipeline(pipeline, options)
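
The reordering in this hunk (the Prism check first, the FnApiRunner check moved after the fallback) can be sketched with plain callables. This is a hypothetical sketch, not the code in direct_runner.py: the diff view hides part of the Prism branch and drops indentation, so the sketch assumes that a successful Prism run returns early and that the FnApiRunner check applies only when Prism was skipped or failed. `run_with_fallback` and all of its parameters are illustrative names, not Beam APIs.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def run_with_fallback(
    pipeline,
    supports_prism,
    supports_fn_api,
    run_prism,
    run_fn_api,
    run_bundle_based):
    """Tries the Prism-like runner first, then falls back (assumed order)."""
    if supports_prism(pipeline):
        _LOGGER.info('Running pipeline with PrismRunner.')
        try:
            # Assumption: a successful Prism run returns immediately.
            return run_prism(pipeline)
        except Exception:
            _LOGGER.info('Falling back to DirectRunner')
    # Assumption: reached only when Prism was unsupported or failed.
    if supports_fn_api(pipeline):
        return run_fn_api(pipeline)
    # Default when no other runner has the needed features.
    return run_bundle_based(pipeline)
```

Passing the support checks and runners as callables keeps the sketch independent of Beam, so each branch can be exercised in isolation.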

