Skip to content

Commit

Permalink
[BEAM-9487] Revert PR15340 for Beam 2.33
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoufek committed Sep 28, 2021
1 parent a3c869d commit 1a7271d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
5 changes: 2 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@

## Breaking Changes

* Python GBK by defualt will fail on unbounded PCollections that have global windowing and a default trigger. The `--allow_unsafe_triggers` flag can be used to override this. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
* Python GBK will fail if it detects an unsafe trigger unless the `--allow_unsafe_triggers` flag is set. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
* Go SDK pipelines require new import paths to use this release due to migration to Go Modules.
* `go.mod` files will need to change to require `github.com/apache/beam/sdks/v2`.
* Code depending on beam imports need to include v2 on the module path.
Expand All @@ -83,7 +81,8 @@

## Deprecations

* X behavior is deprecated and will be removed in X versions ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Python GBK will stop supporting unbounded PCollections that have global windowing and a default trigger in Beam 2.34. This can be overriden with `--allow_unsafe_triggers`. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
* Python GBK will start requiring safe triggers or the `--allow_unsafe_triggers` flag starting with Beam 2.34. ([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).

## Known Issues

Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,8 @@ def _add_argparse_args(cls, parser):
'compatibility. See BEAM-11719.')
parser.add_argument(
'--allow_unsafe_triggers',
default=False,
# TODO(BEAM-9487): Set to False for Beam 2.34
default=True,
action='store_true',
help='Allow the use of unsafe triggers. Unsafe triggers have the '
'potential to cause data loss due to finishing and/or never having '
Expand Down
31 changes: 17 additions & 14 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2318,12 +2318,12 @@ def expand(self, pcoll):
windowing.windowfn, GlobalWindows) and isinstance(trigger,
DefaultTrigger):
if pcoll.pipeline.allow_unsafe_triggers:
# TODO(BEAM-9487) Change comment for Beam 2.33
# TODO(BEAM-9487) Change comment for Beam 2.34
_LOGGER.warning(
'%s: PCollection passed to GroupByKey is unbounded, has a global '
'window, and uses a default trigger. This is being allowed '
'because --allow_unsafe_triggers is set, but it may prevent '
'data from making it through the pipeline.',
'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
'global window, and uses a default trigger. This will no longer '
'be allowed starting with Beam 2.34 unless '
'--allow_unsafe_triggers is set.',
self.label)
else:
raise ValueError(
Expand All @@ -2332,19 +2332,22 @@ def expand(self, pcoll):

unsafe_reason = trigger.may_lose_data(windowing)
if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
reason_msg = str(unsafe_reason).replace('DataLossReason.', '')
if pcoll.pipeline.allow_unsafe_triggers:
# TODO(BEAM-9487): Switch back to this log for Beam 2.34.
# _LOGGER.warning(
# 'Skipping trigger safety check. '
# 'This could lead to incomplete or missing groups.')
_LOGGER.warning(
'%s: Unsafe trigger `%s` detected (reason: %s). This is '
'being allowed because --allow_unsafe_triggers is set. This could '
'lead to missing or incomplete groups.',
'%s: Unsafe trigger type (%s) detected. Starting with '
'Beam 2.34, this will raise an error by default. '
'Either change the pipeline to use a safe trigger or '
'set the --allow_unsafe_triggers flag.',
self.label,
trigger,
reason_msg)
unsafe_reason)
else:
msg = '{}: Unsafe trigger: `{}` may lose data. '.format(
self.label, trigger)
msg += 'Reason: {}. '.format(reason_msg)
msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
msg += 'Reason: {}. '.format(
str(unsafe_reason).replace('DataLossReason.', ''))
msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
raise ValueError(msg)

Expand Down

0 comments on commit 1a7271d

Please sign in to comment.