
[Investigate] ParallelRunner does not work with S3-linked data catalog #2162

Open
ethan-isaacson-perch opened this issue Dec 29, 2022 · 8 comments
Labels
Issue: Bug Report 🐞 Bug that needs to be fixed

Comments

@ethan-isaacson-perch

Description

The ParallelRunner fails if data catalog entries point to Amazon S3.

Context

We use the ParallelRunner to run a large and highly parallelized pipeline. When our data catalog is connected to the local disk filesystem, everything works. When we attempt to switch the file locations to a functionally identical S3 bucket (using the out-of-the-box location specifications as documented here), we see errors. Further details below, but I believe this is caused by some tricky imports and a pickling failure.

Steps to Reproduce

The code is a bit too involved to wireframe directly here, but in general I believe any session that couples ParallelRunner with S3 catalog objects will throw errors.
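A rough sketch of the shape of such a session is below; the dataset type, bucket name, and node are illustrative placeholders rather than our actual project code, and the dataset import path varies slightly across Kedro versions:

import pandas as pd
from kedro.extras.datasets.pandas import CSVDataSet  # kedro_datasets.pandas in newer releases
from kedro.io import DataCatalog
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner

def passthrough(df: pd.DataFrame) -> pd.DataFrame:
    return df

catalog = DataCatalog(
    {
        "raw": CSVDataSet(filepath="s3://my-bucket/raw.csv"),  # any S3-backed entry
        "processed": CSVDataSet(filepath="s3://my-bucket/processed.csv"),
    }
)

pipe = pipeline([node(passthrough, inputs="raw", outputs="processed", name="passthrough")])

# Fails before any node runs, with the AttributeError quoted below.
ParallelRunner().run(pipe, catalog)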

Expected Result

The pipeline should run to completion.

Actual Result

We see errors related to the serializability of the catalog objects. Namely:

AttributeError: The following data sets cannot be used with multiprocessing: [<all of our catalog entries connected to s3>]

This error is accompanied by the following message:

In order to utilize multiprocessing you need to make sure all data sets are serialisable, i.e. data sets should not make use of lambda functions, nested functions, closures etc.
If you are using custom decorators ensure they are correctly decorated using functools.wraps().

Further up the traceback we see that the error was tripped here, in kedro/runner/parallel_runner.py:

  221         if unserialisable:
❱ 222             raise AttributeError(
  223                 f"The following data sets cannot be used with multiprocessing: "
  224                 f"{sorted(unserialisable)}\nIn order to utilize multiprocessing you "
  225                 f"need to make sure all data sets are serialisable, i.e. data sets "

Your Environment

Kedro version 0.18.2
Python version 3.9.1
Running on Windows 10 Pro 21H2 (also replicated on a Linux instance although I don't have the distro / version details at the moment).

Temporary fix

I have found a way to fix this problem (i.e. allow ParallelRunner to work with S3 datasets) by modifying the Kedro source code locally. I am not sure this fix is the correct approach, but I am sharing it in case it is a helpful head start.

I found that the S3FS-backed catalog objects could not be serialized by ForkingPickler. The specific problem seems to be in the creation of glob_func, which uses s3fs.core.S3FileSystem._glob in the case of S3 files; but (I think because of the sequence of imports, somehow) the inherited function is not what the pickler expects s3fs.core.S3FileSystem._glob to be. In general, my solution involves re-instantiating that glob_func in various places so that the two match and serialization is possible. (I think. I don't fully understand what's going on here and my vocabulary for the underlying dynamics is limited, but the following is what worked for me.)
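For reference, a rough sketch of the check that trips: it just tries to pickle each catalog entry with ForkingPickler, the same way ParallelRunner does before running. (The private attribute holding the catalog's datasets is assumed to be _data_sets here; the name differs between Kedro versions.)

from io import BytesIO
from multiprocessing.reduction import ForkingPickler
from pickle import PicklingError

def unserialisable_entries(catalog):
    """Return the names of catalog entries that ForkingPickler cannot serialise."""
    failed = []
    for name, dataset in catalog._data_sets.items():  # "_datasets" on newer Kedro versions
        try:
            ForkingPickler(BytesIO()).dump(dataset)
        except (AttributeError, PicklingError):
            failed.append(name)
    return sorted(failed)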

Changes to individual datasets

First, I modified the individual datasets as follows. I did this for each dataset type that I used (e.g. CSVDataSet, ParquetDataSet, etc.).

In __init__(), I added:

 ...
 self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

+if self._protocol == "s3":
+    # this seems to prevent pickling errors
+    # it SHOULD be the same thing -- S3FileSystem._glob is what it should be pointing to anyway -- but the pickler thinks it isn't
+    glob_func = s3fs.core.S3FileSystem._glob
+else:
+    glob_func = self._fs.glob

 super().__init__(
 ...

Theoretically, if the above were sufficient, I could have just defined my own dataset classes without submitting an issue. But I found I also needed to make a small modification to core to get things to run:

Changes to io/core.py

In _fetch_latest_load_version(self), I changed the following line:

version_paths = sorted(self._glob_function(pattern), reverse=True)

to:

try:
    version_paths = sorted(self._glob_function(pattern), reverse=True)
except TypeError:
    # for some reason _glob_function gets into trouble with the pickler again,
    # whereas self._fs.glob (which should be the same function) does not
    version_paths = sorted(self._fs.glob(pattern), reverse=True)

Again, I have no conviction that the above changes were the right way to do this, but they did get multiprocessing working with S3.

@jmholzer
Contributor

jmholzer commented Jan 3, 2023

Hey, thanks so much for this detailed report. This is an important problem to investigate; I'll push for it to be added to our next sprint.

@jmholzer jmholzer added Issue: Bug Report 🐞 Bug that needs to be fixed Community Issue/PR opened by the open-source community labels Jan 3, 2023
@merelcht merelcht removed the Community Issue/PR opened by the open-source community label Apr 12, 2023
@astrojuanlu
Member

We'll try to reproduce this error; it should appear with any dataset and an S3 bucket when using the parallel runner.

@astrojuanlu astrojuanlu changed the title ParallelRunner does not work with S3-linked data catalog. [Investigate] ParallelRunner does not work with S3-linked data catalog Apr 24, 2023
@merelcht
Member

Closing this issue as it hasn't had any recent activity.

@merelcht merelcht closed this as not planned Dec 13, 2023
@BielStela

BielStela commented Jan 24, 2024

Hi! I had the same problem. I've been using ParallelRunner happily with two datasets. Then I added a dataset stored in S3, and exactly the same issue the OP described happened to me.

The Pickle error is:

PicklingError: Can't pickle <function S3FileSystem._glob at 0x7fb02c74f4c0>: it's not the same object as s3fs.core.S3FileSystem._glob

@astrojuanlu
Member

Thanks @BielStela, reopening. Can you confirm you were using the latest Kedro version? Also, let us know your Python version and operating system.

@astrojuanlu astrojuanlu reopened this Jan 24, 2024
@BielStela

Sure, I'm using

  • Kedro 0.19.1
  • Linux 6.5.0-14-generic 22.04.1-Ubuntu x86_64 GNU/Linux
  • Python 3.11.7

@BielStela

For more context, a fix that worked is what the OP did to the dataset. I'm using a custom dataset, and adding just this to the __init__ of the class fixes the issue:

if self._protocol == "s3":
    glob_func = s3fs.core.S3FileSystem._glob
else:
    glob_func = self._fs.glob

super().__init__(
    glob_function=glob_func,
    ...

@atetich3211

atetich3211 commented May 30, 2024

Hey,
this is not an S3-only problem; I have the same issue with filesystems other than local (e.g. abfs) when using ParallelRunner.
I took a generic approach; the following solution worked for me with any fsspec filesystem.

from kedro_datasets.pandas import CSVDataset


class NoFsspecProblemCSVDataset(CSVDataset):
    def __init__(self, ....) -> None:
        super().__init__(...)

    @property
    def _glob_function(self):
        return self._fs.glob

    @_glob_function.setter
    def _glob_function(self, value):
        pass

This solution prevents the problem where glob_function is non-serializable: _glob_function is now a property that resolves to self._fs.glob at call time, and its setter ignores the value the base class tries to assign, so no problematic function object ends up stored on the dataset instance.
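The same two properties could presumably be pulled out into a small mixin and combined with any fsspec-backed dataset class; a sketch (the ParquetDataset pairing is just an example, not something I have in production):

from kedro_datasets.pandas import ParquetDataset


class SerialisableGlobMixin:
    """Expose _glob_function as a property so nothing unpicklable is stored on the instance."""

    @property
    def _glob_function(self):
        return self._fs.glob

    @_glob_function.setter
    def _glob_function(self, value):
        # Ignore the glob function the base dataset tries to assign in __init__.
        pass


class NoFsspecProblemParquetDataset(SerialisableGlobMixin, ParquetDataset):
    pass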
