[DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch #33870
6 fail, 20 skipped, 9 pass in 31m 57s
Annotations
Check warning on line 0 in apache_beam.examples.wordcount_debugging_test.WordCountDebuggingTest
github-actions / Test Results
test_basics (apache_beam.examples.wordcount_debugging_test.WordCountDebuggingTest) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 15m 6s]
Raw output
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Workflow failed.
self = <apache_beam.examples.wordcount_debugging_test.WordCountDebuggingTest testMethod=test_basics>
def test_basics(self):
test_pipeline = TestPipeline(is_integration_test=True)
# Setup the files with expected content.
temp_location = test_pipeline.get_option('temp_location')
temp_path = '/'.join([temp_location, str(uuid.uuid4())])
input = create_file('/'.join([temp_path, 'input.txt']), self.SAMPLE_TEXT)
extra_opts = {'input': input, 'output': '%s.result' % temp_path}
expected_words = [('Flourish', 3), ('stomach', 1)]
> wordcount_debugging.run(
test_pipeline.get_full_options_as_args(**extra_opts),
save_main_session=False)
apache_beam/examples/wordcount_debugging_test.py:57:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/wordcount_debugging.py:152: in run
with beam.Pipeline(options=pipeline_options) as p:
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DataflowPipelineResult <Job
clientRequestId: '20250205181700921434-5807'
createTime: '2025-02-05T18:17:02.478006Z'
...025-02-05T18:17:02.478006Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7b12a33461b0>
duration = None
def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
# use thread.join() to wait for the polling thread.
thread.daemon = True
thread.start()
while thread.is_alive():
time.sleep(5.0)
# TODO: Merge the termination code in poll_for_job_completion and
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# theresolution of the issue.
_LOGGER.error(consoleUrl)
> raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E Workflow failed.
apache_beam/runners/dataflow/dataflow_runner.py:807: DataflowRuntimeException
Check warning on line 0 in apache_beam.examples.complete.game.game_stats_it_test.GameStatsIT
github-actions / Test Results
test_game_stats_it (apache_beam.examples.complete.game.game_stats_it_test.GameStatsIT) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 18m 37s]
Raw output
google.api_core.exceptions.NotFound: 404 Not found: Table apache-beam-testing:game_stats_it_dataset17387794206fd5ed.game_stats_sessions was not found in location US; reason: notFound, message: Not found: Table apache-beam-testing:game_stats_it_dataset17387794206fd5ed.game_stats_sessions was not found in location US
Location: US
Job ID: a7701c30-72f6-4654-9ca1-bd205074c1b0
self = <apache_beam.examples.complete.game.game_stats_it_test.GameStatsIT testMethod=test_game_stats_it>
@pytest.mark.it_postcommit
@pytest.mark.examples_postcommit
# TODO(https://github.com/apache/beam/issues/21300) This example only works in
# Dataflow, remove mark to enable for other runners when fixed
@pytest.mark.sickbay_direct
@pytest.mark.sickbay_spark
@pytest.mark.sickbay_flink
def test_game_stats_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
success_condition = 'mean_duration=300 LIMIT 1'
sessions_query = (
'SELECT mean_duration FROM `%s.%s.%s` '
'WHERE %s' % (
self.project,
self.dataset_ref.dataset_id,
self.OUTPUT_TABLE_SESSIONS,
success_condition))
bq_sessions_verifier = BigqueryMatcher(
self.project, sessions_query, self.DEFAULT_EXPECTED_CHECKSUM)
# TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
extra_opts = {
'subscription': self.input_sub.name,
'dataset': self.dataset_ref.dataset_id,
'topic': self.input_topic.name,
'fixed_window_duration': 1,
'user_activity_window_duration': 1,
'wait_until_finish_duration': self.WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier, bq_sessions_verifier)
}
# Register cleanup before pipeline execution.
# Note that actual execution happens in reverse order.
self.addCleanup(self._cleanup_pubsub)
self.addCleanup(utils.delete_bq_dataset, self.project, self.dataset_ref)
# Generate input data and inject to PubSub.
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
> game_stats.run(
self.test_pipeline.get_full_options_as_args(**extra_opts),
save_main_session=False)
apache_beam/examples/complete/game/game_stats_it_test.py:150:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/complete/game/game_stats.py:301: in run
with beam.Pipeline(options=options) as p:
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:70: in run_pipeline
hc_assert_that(self.result, pickler.loads(on_success_matcher))
../../build/gradleenv/2050596100/lib/python3.12/site-packages/hamcrest/core/core/allof.py:26: in matches
if not matcher.matches(item):
../../build/gradleenv/2050596100/lib/python3.12/site-packages/hamcrest/core/base_matcher.py:39: in matches
match_result = self._matches(item)
apache_beam/io/gcp/tests/bigquery_matcher.py:110: in _matches
get_checksum()
apache_beam/utils/retry.py:298: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/tests/bigquery_matcher.py:95: in get_checksum
response = self._query_with_retry()
apache_beam/utils/retry.py:311: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:298: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/tests/bigquery_matcher.py:125: in _query_with_retry
rows = query_job.result(timeout=60)
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/cloud/bigquery/job/query.py:1688: in result
while not is_job_done():
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:293: in retry_wrapped_func
return retry_target(
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:153: in retry_target
_retry_error_helper(
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_base.py:212: in _retry_error_helper
raise final_exc from source_exc
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:144: in retry_target
result = target()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def is_job_done():
nonlocal restart_query_job
if restart_query_job:
restart_query_job = False
# The original job has failed. Create a new one.
#
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.
job = retry_do_query()
# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)
# It's possible the job fails again and we'll have to
# retry that too.
self._retry_do_query = retry_do_query
self._job_retry = job_retry
# If the job hasn't been created, create it now. Related:
# https://github.com/googleapis/python-bigquery/issues/1940
if self.state is None:
self._begin(retry=retry, **done_kwargs)
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, **done_kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the `self.exception()`
# is set from `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
> raise job_failed_exception
E google.api_core.exceptions.NotFound: 404 Not found: Table apache-beam-testing:game_stats_it_dataset17387794206fd5ed.game_stats_sessions was not found in location US; reason: notFound, message: Not found: Table apache-beam-testing:game_stats_it_dataset17387794206fd5ed.game_stats_sessions was not found in location US
E
E Location: US
E Job ID: a7701c30-72f6-4654-9ca1-bd205074c1b0
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/cloud/bigquery/job/query.py:1630: NotFound
Check warning on line 0 in apache_beam.examples.complete.game.leader_board_it_test.LeaderBoardIT
github-actions / Test Results
test_leader_board_it (apache_beam.examples.complete.game.leader_board_it_test.LeaderBoardIT) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 18m 39s]
Raw output
google.api_core.exceptions.NotFound: 404 Not found: Table apache-beam-testing:leader_board_it_dataset173877942017573c.leader_board_users was not found in location US; reason: notFound, message: Not found: Table apache-beam-testing:leader_board_it_dataset173877942017573c.leader_board_users was not found in location US
Location: US
Job ID: 163a35f9-693d-4ee3-8fc0-247111d391e9
self = <apache_beam.examples.complete.game.leader_board_it_test.LeaderBoardIT testMethod=test_leader_board_it>
@pytest.mark.it_postcommit
@pytest.mark.examples_postcommit
# TODO(https://github.com/apache/beam/issues/21300) This example only works
# in Dataflow, remove mark to enable for other runners when fixed
@pytest.mark.sickbay_direct
@pytest.mark.sickbay_spark
@pytest.mark.sickbay_flink
def test_leader_board_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
success_condition = 'total_score=5000 LIMIT 1'
users_query = (
'SELECT total_score FROM `%s.%s.%s` '
'WHERE %s' % (
self.project,
self.dataset_ref.dataset_id,
self.OUTPUT_TABLE_USERS,
success_condition))
bq_users_verifier = BigqueryMatcher(
self.project, users_query, self.DEFAULT_EXPECTED_CHECKSUM)
teams_query = (
'SELECT total_score FROM `%s.%s.%s` '
'WHERE %s' % (
self.project,
self.dataset_ref.dataset_id,
self.OUTPUT_TABLE_TEAMS,
success_condition))
bq_teams_verifier = BigqueryMatcher(
self.project, teams_query, self.DEFAULT_EXPECTED_CHECKSUM)
extra_opts = {
'allow_unsafe_triggers': True,
'subscription': self.input_sub.name,
'dataset': self.dataset_ref.dataset_id,
'topic': self.input_topic.name,
'team_window_duration': 1,
'wait_until_finish_duration': self.WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(
state_verifier, bq_users_verifier, bq_teams_verifier)
}
# Register cleanup before pipeline execution.
# Note that actual execution happens in reverse order.
self.addCleanup(self._cleanup_pubsub)
self.addCleanup(utils.delete_bq_dataset, self.project, self.dataset_ref)
# Generate input data and inject to PubSub.
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
> leader_board.run(
self.test_pipeline.get_full_options_as_args(**extra_opts),
save_main_session=False)
apache_beam/examples/complete/game/leader_board_it_test.py:160:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/complete/game/leader_board.py:319: in run
with beam.Pipeline(options=options) as p:
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:70: in run_pipeline
hc_assert_that(self.result, pickler.loads(on_success_matcher))
../../build/gradleenv/2050596100/lib/python3.12/site-packages/hamcrest/core/core/allof.py:26: in matches
if not matcher.matches(item):
../../build/gradleenv/2050596100/lib/python3.12/site-packages/hamcrest/core/base_matcher.py:39: in matches
match_result = self._matches(item)
apache_beam/io/gcp/tests/bigquery_matcher.py:110: in _matches
get_checksum()
apache_beam/utils/retry.py:298: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/tests/bigquery_matcher.py:95: in get_checksum
response = self._query_with_retry()
apache_beam/utils/retry.py:311: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:298: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/tests/bigquery_matcher.py:125: in _query_with_retry
rows = query_job.result(timeout=60)
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/cloud/bigquery/job/query.py:1688: in result
while not is_job_done():
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:293: in retry_wrapped_func
return retry_target(
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:153: in retry_target
_retry_error_helper(
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_base.py:212: in _retry_error_helper
raise final_exc from source_exc
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py:144: in retry_target
result = target()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def is_job_done():
nonlocal restart_query_job
if restart_query_job:
restart_query_job = False
# The original job has failed. Create a new one.
#
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.
job = retry_do_query()
# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)
# It's possible the job fails again and we'll have to
# retry that too.
self._retry_do_query = retry_do_query
self._job_retry = job_retry
# If the job hasn't been created, create it now. Related:
# https://github.com/googleapis/python-bigquery/issues/1940
if self.state is None:
self._begin(retry=retry, **done_kwargs)
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, **done_kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the `self.exception()`
# is set from `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
> raise job_failed_exception
E google.api_core.exceptions.NotFound: 404 Not found: Table apache-beam-testing:leader_board_it_dataset173877942017573c.leader_board_users was not found in location US; reason: notFound, message: Not found: Table apache-beam-testing:leader_board_it_dataset173877942017573c.leader_board_users was not found in location US
E
E Location: US
E Job ID: 163a35f9-693d-4ee3-8fc0-247111d391e9
../../build/gradleenv/2050596100/lib/python3.12/site-packages/google/cloud/bigquery/job/query.py:1630: NotFound
Check warning on line 0 in apache_beam.examples.wordcount_minimal_test.WordCountMinimalTest
github-actions / Test Results
test_basics (apache_beam.examples.wordcount_minimal_test.WordCountMinimalTest) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 14m 35s]
Raw output
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Workflow failed.
self = <apache_beam.examples.wordcount_minimal_test.WordCountMinimalTest testMethod=test_basics>
def test_basics(self):
test_pipeline = TestPipeline(is_integration_test=True)
# Setup the files with expected content.
temp_location = test_pipeline.get_option('temp_location')
temp_path = '/'.join([temp_location, str(uuid.uuid4())])
input = create_file('/'.join([temp_path, 'input.txt']), self.SAMPLE_TEXT)
extra_opts = {'input': input, 'output': '%s.result' % temp_path}
expected_words = collections.defaultdict(int)
for word in re.findall(r'\w+', self.SAMPLE_TEXT):
expected_words[word] += 1
> wordcount_minimal.main(
test_pipeline.get_full_options_as_args(**extra_opts),
save_main_session=False)
apache_beam/examples/wordcount_minimal_test.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/wordcount_minimal.py:120: in main
with beam.Pipeline(options=pipeline_options) as p:
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DataflowPipelineResult <Job
clientRequestId: '20250205181700269982-5807'
createTime: '2025-02-05T18:17:01.940277Z'
...025-02-05T18:17:01.940277Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x79f8279042c0>
duration = None
def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
# use thread.join() to wait for the polling thread.
thread.daemon = True
thread.start()
while thread.is_alive():
time.sleep(5.0)
# TODO: Merge the termination code in poll_for_job_completion and
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# theresolution of the issue.
_LOGGER.error(consoleUrl)
> raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E Workflow failed.
apache_beam/runners/dataflow/dataflow_runner.py:807: DataflowRuntimeException
Check warning on line 0 in apache_beam.examples.wordcount_test.WordCountTest
github-actions / Test Results
test_basics (apache_beam.examples.wordcount_test.WordCountTest) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 17m 35s]
Raw output
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Workflow failed.
self = <apache_beam.examples.wordcount_test.WordCountTest testMethod=test_basics>
def test_basics(self):
test_pipeline = TestPipeline(is_integration_test=True)
# Setup the files with expected content.
temp_location = test_pipeline.get_option('temp_location')
temp_path = '/'.join([temp_location, str(uuid.uuid4())])
input = create_file('/'.join([temp_path, 'input.txt']), self.SAMPLE_TEXT)
extra_opts = {'input': input, 'output': '%s.result' % temp_path}
expected_words = collections.defaultdict(int)
for word in re.findall(r'[\w\']+', self.SAMPLE_TEXT, re.UNICODE):
expected_words[word] += 1
> wordcount.run(
test_pipeline.get_full_options_as_args(**extra_opts),
save_main_session=False)
apache_beam/examples/wordcount_test.py:53:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/wordcount.py:109: in run
result = pipeline.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DataflowPipelineResult <Job
clientRequestId: '20250205181700545962-5807'
createTime: '2025-02-05T18:17:02.141035Z'
...025-02-05T18:17:02.141035Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7a706b068860>
duration = None
def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
# use thread.join() to wait for the polling thread.
thread.daemon = True
thread.start()
while thread.is_alive():
time.sleep(5.0)
# TODO: Merge the termination code in poll_for_job_completion and
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# theresolution of the issue.
_LOGGER.error(consoleUrl)
> raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E Workflow failed.
apache_beam/runners/dataflow/dataflow_runner.py:807: DataflowRuntimeException
github-actions / Test Results
test_custom_ptransform_output_files_on_small_input (apache_beam.examples.cookbook.custom_ptransform_it_test.CustomPTransformIT) failed
sdks/python/pytest_postCommitIT-df-py312-xdist.xml [took 15m 21s]
Raw output
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Workflow failed.
self = <apache_beam.examples.cookbook.custom_ptransform_it_test.CustomPTransformIT testMethod=test_custom_ptransform_output_files_on_small_input>
@pytest.mark.examples_postcommit
def test_custom_ptransform_output_files_on_small_input(self):
test_pipeline = TestPipeline(is_integration_test=True)
# Setup the files with expected content.
temp_location = test_pipeline.get_option('temp_location')
input = '/'.join([temp_location, str(uuid.uuid4()), 'input.txt'])
output = '/'.join([temp_location, str(uuid.uuid4()), 'result'])
create_file(input, ' '.join(self.WORDS))
extra_opts = {'input': input, 'output': output}
> custom_ptransform.run(test_pipeline.get_full_options_as_args(**extra_opts))
apache_beam/examples/cookbook/custom_ptransform_it_test.py:61:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/cookbook/custom_ptransform.py:124: in run
run_count1(known_args, PipelineOptions(pipeline_args))
apache_beam/examples/cookbook/custom_ptransform.py:49: in run_count1
with beam.Pipeline(options=options) as p:
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:594: in run
self._options).run(False)
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DataflowPipelineResult <Job
clientRequestId: '20250205181702110692-5794'
createTime: '2025-02-05T18:17:03.502033Z'
...025-02-05T18:17:03.502033Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7d578bc35040>
duration = None
def wait_until_finish(self, duration=None):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
consoleUrl = (
"Console URL: https://console.cloud.google.com/"
f"dataflow/jobs/<RegionId>/{self.job_id()}"
"?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
# Mark the thread as a daemon thread so a keyboard interrupt on the main
# thread will terminate everything. This is also the reason we will not
# use thread.join() to wait for the polling thread.
thread.daemon = True
thread.start()
while thread.is_alive():
time.sleep(5.0)
# TODO: Merge the termination code in poll_for_job_completion and
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
'Job did not reach to a terminal state after waiting indefinitely. '
'{}'.format(consoleUrl))
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# theresolution of the issue.
_LOGGER.error(consoleUrl)
> raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
E apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E Workflow failed.
apache_beam/runners/dataflow/dataflow_runner.py:807: DataflowRuntimeException