Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: GCSIO delete_batch breaks dataflow pipeline in 2.53 #30166

Closed
2 of 16 tasks
lgeiger opened this issue Jan 31, 2024 · 6 comments · Fixed by #30188
Closed
2 of 16 tasks

[Bug]: GCSIO delete_batch breaks dataflow pipeline in 2.53 #30166

lgeiger opened this issue Jan 31, 2024 · 6 comments · Fixed by #30188

Comments

@lgeiger
Copy link

lgeiger commented Jan 31, 2024

What happened?

I am upgrading Beam from version 2.52 to 2.53 but it looks like I am running into an issue introduced in #25676.

When trying to call delete_batch inside beam.Map something like this:

    def teardown(self):
        def cleanup(count):
            client = GcsIO()
            blobs = client.list_prefix(self.resultio.path)
            client.delete_batch(blobs)

        return (
            "Count all elements" >> beam.combiners.Count.Globally()
            | "Delete blobs" >> beam.Map(cleanup)
        )

I am seeing the following error with Beam 2.53.0 while Beam 2.52.0 works as expected:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 300, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 375, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 995, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/Users/lgeiger/data-infrastructure/.venv/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/Users/lgeiger/data-infrastructure/test.py", line 60, in cleanup
    client.delete_batch(blobs)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 217, in delete_batch
    current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
                    ~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unhashable type: 'slice'

I am running the pipeline on Dataflow using Python 3.11.

Beam 2.52.0 used to chunk the paths using itertools.islice which avoids having to explicitly use slicing:

paths_chunk = list(islice(paths, MAX_BATCH_OPERATION_SIZE))

@BjornPrime I guess something similar here would fix the above issue, or am I missing something?

Potentially the current code might already work in Python 3.12 since slices will be hashable, but as far as I know Beam doesn't yet provide Python 3.12 containers.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Copy link
Contributor

Abacn commented Jan 31, 2024

Thanks for the report and investigation. Yes this is because Beam 2.53.0 migrated to use google cloud's official GCS client (before that it was using apitool's generated client which was long deprecated)

CC: @shunping

@shunping
Copy link
Contributor

shunping commented Feb 1, 2024

I checked the code in Beam 2.52 and 2.53. Here is what I found out.

In other words, both function signatures haven't been modified. I think the way it worked before is not the right way to call delete_batch. It happened to work in 2.52 because of its internal implementation: calling iter() on a dictionary returns the key iterator (https://github.com/apache/beam/blob/release-2.52.0/sdks/python/apache_beam/io/gcp/gcsio.py#L301), and then we get a list of keys from there (https://github.com/apache/beam/blob/release-2.52.0/sdks/python/apache_beam/io/gcp/gcsio.py#L304). It doesn't matter whether we use itertools.islice or not.

@shunping
Copy link
Contributor

shunping commented Feb 1, 2024

From the API, the correct way to call delete_batch after list_prefix should be

blobs = client.list_prefix(self.resultio.path)
client.delete_batch(list(blobs.keys(())

@Abacn
Copy link
Contributor

Abacn commented Feb 1, 2024

That's interesting. But I would imagine feeding delete_batch with the result of list_prefix is a common use case. Let's just add that line so delete_batch compatibile both with list and dict input

@shunping
Copy link
Contributor

shunping commented Feb 1, 2024

That's interesting. But I would imagine feeding delete_batch with the result of list_prefix is a common use case. Let's just add that line so delete_batch compatibile both with list and dict input

Ok. SGTM

@github-actions github-actions bot added this to the 2.55.0 Release milestone Feb 1, 2024
@lgeiger
Copy link
Author

lgeiger commented Feb 2, 2024

Great, thanks for the fast fix!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants