
[Bug]: beam_LoadTests_Python_Combine_Dataflow_Streaming failing #23904

Closed
kileys opened this issue Oct 31, 2022 · 12 comments · Fixed by #30655

Comments

@kileys
Contributor

kileys commented Oct 31, 2022

What happened?

Jenkins test has been failing since October

Issue Priority

Priority: 1

Issue Component

Component: test-failures

@kileys
Contributor Author

kileys commented Oct 31, 2022

@jrmccluskey

@github-actions github-actions bot added the P1 label Oct 31, 2022
@tvalentyn
Contributor

This is a known issue; I believe this test has not been working successfully almost since it was added.

@tvalentyn tvalentyn added P2 and removed P1 labels Nov 1, 2022
@riteshghorse
Contributor

riteshghorse commented Nov 1, 2022

Yeah, I found two similar bugs: #22692, #22436. I checked the console logs for a few jobs in the last 15 days and all of them show the same pattern: the SDK harness is initializing, and a cancel request is committed within 5 minutes of that. Unfortunately, there is not much information beyond this. Memory and CPU utilization look alright.

@tvalentyn
Contributor

maybe we need to increase the timeout?

@Abacn
Contributor

Abacn commented Jan 11, 2023

This is not due to a timeout. This is a streaming pipeline, so it does not end until SyntheticSource has emitted all results (200,000,000 records). However, the Dataflow UI shows this counter is never reached. The pipeline then runs for 4 hours until the test task timeout is hit.

The GBK streaming load test runs a similar pipeline, but that pipeline ends properly. Not sure what the difference is.

@tvalentyn
Contributor

Took a look and reached the same conclusion; it looks like the job gets stuck in the synthetic source.

Test code: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/load_tests/combine_test.py

Test configuration: https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_Combine_Python.groovy#L26-L50

For reference: GBK test configuration:

title : 'GroupByKey Python Load test: 2GB of 100kB records',
test : 'apache_beam.testing.load_tests.group_by_key_test',
runner : CommonTestProperties.Runner.DATAFLOW,
pipelineOptions: [
job_name : "load-tests-python-dataflow-${mode}-gbk-3-${now}",
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
publish_to_big_query : true,
metrics_dataset : datasetName,
metrics_table : "python_dataflow_${mode}_gbk_3",
influx_measurement : "python_${mode}_gbk_3",
input_options : '\'{"num_records": 20000,' +
'"key_size": 10000,' +
'"value_size": 90000,' +
'"algorithm": "lcg"}\'',
iterations : 1,
fanout : 1,
num_workers : 5,
autoscaling_algorithm: 'NONE',
]

Synthetic source: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/synthetic_pipeline.py
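As a sanity check of the input_options arithmetic in the configuration above, 20,000 records of a 10 kB key plus a 90 kB value do add up to the advertised "2GB of 100kB records"; a quick sketch:

```python
import json

# input_options string from the GBK job configuration above
input_options = json.loads(
    '{"num_records": 20000,'
    '"key_size": 10000,'
    '"value_size": 90000,'
    '"algorithm": "lcg"}')

record_bytes = input_options["key_size"] + input_options["value_size"]
total_bytes = input_options["num_records"] * record_bytes
print(f"{record_bytes / 1000:.0f} kB per record, "
      f"{total_bytes / 1e9:.1f} GB total")
```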

@tvalentyn
Contributor

Next step would be to investigate performance of the synthetic source with different parameters. Seems like it may fail when running at a bigger scale.

@tvalentyn
Contributor

After many hours of running, we can see active bundles with threads in Synthetic source code:

--- Thread #139811008521984 name: Thread-16 and other 4 threads---
threads: [(139811008521984, 'Thread-16'), (139811025307392, 'Thread-17'), (139810463258368, 'Thread-20'), (139810446472960, 'Thread-21'), (139810429687552, 'Thread-23')]
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 53, in run
    self._work_item.run()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 371, in task
    self._execute(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
    response = task()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in 
    lambda: self.create_worker().do_instruction(request), request)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1040, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.8/site-packages/apache_beam/testing/synthetic_pipeline.py", line 492, in read
    time.sleep(self._sleep_per_input_record_sec)
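The bottom frame of the trace is the per-record sleep in SyntheticSource.read. A minimal sketch of that throttling pattern (function name and structure are illustrative, not Beam's actual implementation): with a non-zero sleep, emitting N records takes at least N * sleep seconds of wall time, which is where a bundle can sit for long stretches without appearing to make progress.

```python
import time

def read_throttled(num_records, sleep_per_input_record_sec):
    """Yield synthetic records, sleeping after each one (hypothetical sketch)."""
    for i in range(num_records):
        yield i  # stand-in for a generated key/value pair
        time.sleep(sleep_per_input_record_sec)

start = time.monotonic()
records = list(read_throttled(5, 0.01))
elapsed = time.monotonic() - start
print(f"emitted {len(records)} records in >= {elapsed:.2f}s")
```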

@tvalentyn
Contributor

Ok. It's a resource starvation issue. The job struggles for hours to fit in memory, but doesn't OOM.

Takes ~10 minutes on n1-highmem-2.

@tvalentyn
Contributor

tvalentyn commented Jul 22, 2023

Ok. It's a resource starvation issue. The job struggles for hours to fit in memory, but doesn't OOM.

Takes ~10 minutes on n1-highmem-2.

Actually, that's not right. Turns out I ran a batch job, and it finished fast, but running this test in streaming mode results in poor performance and memory pressure.

@liferoad liferoad self-assigned this Dec 7, 2023
@github-actions github-actions bot added this to the 2.56.0 Release milestone Mar 19, 2024
@Abacn
Contributor

Abacn commented Mar 21, 2024

Bumping to 50 workers, the test passed. It took 2 h to run. Throughput looks like this:

input/output PCollection of GBK:

[throughput graph]


However, with 5 workers it isn't a matter of not finishing in time; the pipeline just gets stuck after some time:

input/output PCollection of GBK:

[throughput graph]

and there were worker crashes throughout the pipeline run:

number of workers:

[worker count graph]

memory usage:

[memory usage graph]

In summary, what happens is:

  • If the number of workers is not large, each worker appears to accumulate more work and eventually OOMs, and the pipeline gets stuck (presumably in a repeated retry - OOM - retry - OOM loop)

  • If there is a sufficient number of workers, no worker crashes and the data can be processed in time, though slowly
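Assuming the load divides roughly evenly across workers (a simplification; Dataflow's actual work distribution is dynamic), the per-worker share of the 200,000,000-record target illustrates the contrast between the two runs:

```python
TOTAL_RECORDS = 200_000_000  # SyntheticSource target mentioned earlier

# Per-worker share at the two worker counts tried in this test
for num_workers in (5, 50):
    per_worker = TOTAL_RECORDS // num_workers
    print(f"{num_workers:>2} workers -> {per_worker:,} records each")
```

With 5 workers each worker must handle 10x the share it gets with 50, which is consistent with the accumulate-then-OOM behavior observed above.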

@Abacn
Contributor

Abacn commented Mar 29, 2024

The memory profile (pipeline options --profile_memory --profile_cpu) wasn't very helpful. The generated memory profiles look like the following:

Partition of a set of 22679 objects. Total size = 2069184 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0   9796  43   912749  44    912749  44 str
     1   4565  20   324624  16   1237373  60 tuple
     2    962   4   269480  13   1506853  73 dict (no owner)
     3    194   1   180983   9   1687836  82 bytes
     4   5185  23   124440   6   1812276  88 float
     5     39   0    91312   4   1903588  92 list
     6   1180   5    33192   2   1936780  94 int
     7     50   0    18992   1   1955772  95 frozenset
     8     92   0    16200   1   1971972  95 types.CodeType
     9    110   0    14960   1   1986932  96 function
<125 more rows. Type e.g. '_.more' to view.>
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
    10      9   0     9984   0   1996916  97 abc.ABCMeta
    11      9   0     8976   0   2005892  97 dict of module
    12     72   0     5760   0   2011652  97
                                             org.apache.beam.model.pipeline.v1.metrics_pb2.Monitorin
                                             gInfo
    13      9   0     4896   0   2016548  97 dict of abc.ABCMeta
    14      4   0     4256   0   2020804  98 type
    15      6   0     3476   0   2024280  98 re.Pattern
    16      5   0     3200   0   2027480  98 dict of
                                             apache_beam.testing.synthetic_pipeline.SyntheticSource
    17      1   0     2552   0   2030032  98 random.Random
    18      2   0     2400   0   2032432  98 urllib.parse.Quoter
    19      3   0     1920   0   2034352  98 dict of threading.Thread
<115 more rows. Type e.g. '_.more' to view.>

or

Partition of a set of 16762 objects. Total size = 3649165 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0      9   0  2189046  60   2189046  60 bytes
     1   9966  59   901079  25   3090125  85 str
     2   2322  14   162448   4   3252573  89 tuple
     3    528   3   141944   4   3394517  93 dict (no owner)
     4     51   0    97704   3   3492221  96 list
     5   2671  16    64104   2   3556325  97 float
     6    703   4    19800   1   3576125  98 int
     7     19   0    11856   0   3587981  98 collections.deque
     8     32   0     6912   0   3594893  99 frozenset
     9      8   0     6048   0   3600941  99 re.Pattern
<73 more rows. Type e.g. '_.more' to view.>
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
    10     68   0     5440   0   3606381  99
                                             org.apache.beam.model.pipeline.v1.metrics_pb2.Monitorin
                                             gInfo
    11     19   0     4408   0   3610789  99 dict of threading.Condition
    12      6   0     3840   0   3614629  99 dict of
                                             apache_beam.testing.synthetic_pipeline.SyntheticSource
    13     21   0     2856   0   3617485  99 function
    14     39   0     2808   0   3620293  99 types.BuiltinMethodType
    15      4   0     2560   0   3622853  99 dict of threading.Thread
    16      2   0     2128   0   3624981  99 type
    17      5   0     2000   0   3626981  99 dict of urllib3.connection.HTTPSConnection
    18      3   0     1368   0   3628349  99 types.FrameType
    19     34   0     1360   0   3629709  99 types.CellType
<63 more rows. Type e.g. '_.more' to view.>

or

Partition of a set of 3959 objects. Total size = 271112 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0   1419  36   100336  37    100336  37 tuple
     1    344   9    98784  36    199120  73 dict (no owner)
     2   1696  43    40704  15    239824  88 float
     3    130   3    11315   4    251139  93 str
     4    289   7     8104   3    259243  96 int
     5      4   0     3024   1    262267  97 re.Pattern
     6      3   0     1368   1    263635  97 types.FrameType
     7     17   0     1360   1    264995  98
                                             org.apache.beam.model.pipeline.v1.metrics_pb2.Monitorin
                                             gInfo
     8      2   0      781   0    265776  98 bytes
     9      5   0      680   0    266456  98 function
<24 more rows. Type e.g. '_.more' to view.>
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
    10      1   0      640   0    267096  99 dict of threading.Thread
    11      1   0      624   0    267720  99 collections.deque
    12      7   0      608   0    268328  99 list
    13      9   0      360   0    268688  99 types.CellType
    14      8   0      320   0    269008  99 _thread.lock
    15      2   0      256   0    269264  99 grpc._cython.cygrpc.SendMessageOperation
    16      1   0      232   0    269496  99 dict of threading.Condition
    17      2   0      160   0    269656  99
                                             org.apache.beam.model.fn_execution.v1.beam_fn_api_pb2.D
                                             ata
    18      2   0      160   0    269816 100 urllib.parse.SplitResult
    19      2   0      144   0    269960 100 builtins.weakref
<14 more rows. Type e.g. '_.more' to view.>
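The dumps above are heap partitions by type, which don't say *where* the memory is allocated. As an aside, one stdlib alternative (not what --profile_memory uses) is tracemalloc, which attributes live allocations to source lines instead; a minimal sketch with a stand-in workload:

```python
import tracemalloc

tracemalloc.start()

# Stand-in for record payloads accumulating in a worker (~1 MB total)
payloads = [bytes(1000) for _ in range(1000)]

snapshot = tracemalloc.take_snapshot()
top = snapshot.statistics("lineno")[:5]
for stat in top:
    print(stat)  # file:line, total size, allocation count
tracemalloc.stop()
```

The top entry points at the line holding the payload list, so a leak in e.g. the synthetic source would show up as a growing line-level statistic across snapshots.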
