Allowing public catalog access and efficient file caching #26

Closed
zaneselvans opened this issue Apr 14, 2022 · 11 comments · Fixed by fsspec/filesystem_spec#1143

@zaneselvans (Contributor)

I'm trying to provide convenient public access to data stored in Parquet files in Google Cloud Storage through an Intake catalog (see this repo), but I have run into a few snags. I suspect that some of them may be easy to fix, but I don't have enough context on how the system works, and trial-and-error is slow. I haven't found many public examples of how catalogs are typically set up to learn from.

One big file, or many smaller files?

  • Should we use one big parquet file with many row-groups, or lots of small parquet files each with a single row-group? Right now the row-groups each contain one state-year of data (e.g. Colorado 2015), since users will often only be interested in data from a particular region or time period.
  • I thought that using lots of smaller files along with local file caching would allow users to automatically cache only the parts of the data they need rather than everything, but then all of the files do get accessed (and thus cached) when filtering based on state and year, since the metadata indicating which files contain the requested states / years is in the files.
  • Is there an easy way to avoid this, and still treat all of the files as a single table? Can the metadata be stored outside the files using PyArrow?
  • It also seems like querying a bunch of smaller files remotely slows things down in a way that doesn't happen locally (where one big file and many small files seem to perform equally well).

Conditionally disable caching

  • Is there an easy way to disable file caching when the data is already local? We often work with local data in development and caching just slows everything down and wastes disk space in this case, but it would be nice to use the same method of access (the intake catalog) in different contexts, with caching enabled/disabled depending on what kind of path is being used as the base location for the catalogs.
  • Is there some way that a user can explicitly turn off caching when accessing a data source in the catalog? E.g. if we're working on a JupyterHub in the same region as the data is stored, with a very fast network connection but not much local disk, local caching may be more trouble than it's worth.

Allowing anonymous public access

  • Accessing the parquet data via https://storage.googleapis.com seems much less performant than using the gs:// protocol.
  • Trying to filter the Parquet data over https:// doesn't seem to work at all -- it downloads everything no matter what.
  • Wildcards also don't work with https:// URLs (I get a 403 Forbidden error), and neither does providing the URL of the "folder" that all the Parquet files are in.
  • The only option that seems to work at all is providing a complete path to a single monolithic Parquet file, but then with file caching the entire thing gets downloaded every time. And under some circumstances it seems like it's trying to read the entire Parquet file into memory, which doesn't work, since it's 50-100 GB uncompressed.
  • Is it possible to provide public access using gs://? It seems like we just get access denied even when all the objects are public.
  • Is there some other way to enumerate all of the different valid paths using user parameters and templating in the catalog paths, and still have them be treated as a single coherent table / dataset?

Current catalog

description: A catalog of open energy system data for use by climate advocates,
  policymakers, journalists, researchers, and other members of civil society.

plugins:
  source:
    - module: intake_parquet

metadata:
  creator:
    title: "Catalyst Cooperative"
    email: "pudl@catalyst.coop"
    path: "https://catalyst.coop"

sources:

  hourly_emissions_epacems:
    driver: parquet
    description: Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: 'pyarrow'
      urlpath: 'simplecache::{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet'
      storage_options:
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

  hourly_emissions_epacems_partitioned:
    driver: parquet
    description: Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: 'pyarrow'
      urlpath: 'simplecache::{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems/*.parquet'
      storage_options:
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'
@martindurant (Member)

There's quite a lot here, so I'll try to answer a piece at a time

Many of my opinions depend on assumptions about how people will actually access the data. Either of us may be mistaken! I am assuming you intend to use dask, since it is mentioned in the catalogue.

One big file, or many smaller files?

The unit of work for parquet is the row-group, so it doesn't much matter whether you have one row group or multiple per file - but that's for local files. As you say, checking out the metadata of the file, and thereby caching the whole thing, defeats the purpose. There are a few things you can do:

  • partition the files by directory, such that the path name contains the information most likely to be filtered on; in this case, there is no need to read all of the metadata, but paths are passed to the dask tasks instead
  • you can create a global _metadata file out of the metadata pieces of each file. This is not so common these days, because this file itself can grow unmanageably big, but it seems like it might work for you. Normally you would make it while writing the data, but you can create it after the fact too. I don't know how you can achieve it with pyarrow, but you could use fastparquet.writer.merge. Obviously you would need to perform (complex) sync operations if your dataset is growing.
  • "simplecache" indeed gets whole files, but "blockcache" can store pieces. It is more complex and less well supported, and relies on a posix blocks storage locally. Perhaps worth a try.
  • fsspec.parquet prospectively reads exactly the parts of the target file that are requested (look out for a blog post on this in the coming week). I notice we don't have a caching story around this, but that's something we can definitely do. You activate it by passing open_file_options={"precache_options": {"method": "parquet"}} to read_parquet; a sketch of this and the _metadata approach follows this list.
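
A rough, illustrative sketch of those two approaches; the bucket path and file layout are placeholders, fastparquet.writer.merge needs write access to create the _metadata file, and the exact open_file_options spelling depends on your dask version:

import dask.dataframe as dd
import fsspec
from fastparquet import writer

# 1) Consolidate the per-file footers into a single _metadata file.
#    "some-bucket/epacems/" is a placeholder path; write credentials are assumed.
fs = fsspec.filesystem("gs")
files = fs.glob("some-bucket/epacems/*.parquet")
writer.merge(files, open_with=fs.open)  # writes _metadata next to the data files

# 2) Have dask/fsspec prefetch only the byte ranges each task actually needs.
ddf = dd.read_parquet(
    "gs://some-bucket/epacems/*.parquet",
    engine="pyarrow",
    storage_options={"token": "anon"},
    open_file_options={"precache_options": {"method": "parquet"}},
)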

Conditionally disable caching

There isn't an obvious way to do this. I think I would do something like this:

      urlpath: '{{cache_opt}}{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems/*.parquet'
...
    parameters:
      cache_opt:
        description: "Whether to apply caching; select empty string to disable"
        type: str
        default: "simplecache::"
        allowed: ["simplecache::", "blockcache::", ""]
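
Assuming the catalog is loaded as pudl_cat (as elsewhere in this thread) and the source keeps the cache_opt parameter above, the user would then pick the caching behaviour when instantiating the source; a minimal usage sketch:

import intake

# "catalog.yml" is a placeholder path to the catalog shown above.
pudl_cat = intake.open_catalog("catalog.yml")
src = pudl_cat.hourly_emissions_epacems_partitioned(cache_opt="")  # empty string disables caching
ddf = src.to_dask()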

Allowing anonymous access

So long as the user has gcsfs installed, you can access the data anonymously via "gs://"/"gcs://" by including token="anon" in the storage options. Make sure that you grant both public read access to the files in the bucket and LIST access to the bucket itself. If you use a _metadata file, you don't need LIST, since all the file paths are contained in that file.
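
A minimal sketch of that anonymous access pattern (the bucket name is a placeholder, and gcsfs must be installed):

import dask.dataframe as dd
import fsspec

# Anonymous read access to a public GCS bucket ("some-public-bucket" is a placeholder).
fs = fsspec.filesystem("gs", token="anon")
print(fs.ls("some-public-bucket"))  # only works if the bucket grants LIST to allUsers

# The same option routed through dask.dataframe.read_parquet:
ddf = dd.read_parquet(
    "gs://some-public-bucket/hourly_emissions_epacems.parquet",
    storage_options={"token": "anon"},
)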

@zaneselvans (Contributor, Author)

Anonymous Access

I added "token": "anon" to the storage_options dict, and created a new bucket with uniform permissions across all objects and gave allUsers the Storage Object Viewer role (which includes object listing as well as reading), and now public / unauthenticated gs:// access seems to be working!

Caching & Consolidating Metadata

The data only gets updated infrequently, once a month at most, and I think we'd probably just totally replace it (or create a new persistent version) so the _metadata file wouldn't need to have a complicated sync. Unfortunately I got an error when trying to generate the consolidated external metadata:

File ~/mambaforge/envs/pudl-dev/lib/python3.10/site-packages/fastparquet/writer.py:1330, in consolidate_categories(fmd)
   1329 def consolidate_categories(fmd):
-> 1330     key_value = [k for k in fmd.key_value_metadata
   1331                  if k.key == b'pandas'][0]
   1332     meta = json.loads(key_value.value)
   1333     cats = [c for c in meta['columns']
   1334             if 'num_categories' in (c['metadata'] or [])]

IndexError: list index out of range

Which looks like it's maybe related to the lack of pandas metadata in the Parquet files (the metadata specifying what data types the columns should use when they're read back by pandas), even though the data is coming from a dataframe:

pqwriter.write_table(
    pa.Table.from_pandas(df, schema=schema, preserve_index=False)
)

I was trying to stick with simplecache:: since it sounded more straightforward and widely supported, and most of the individual files are manageable (only a few states are > 100MB per year).

The cache_opt parameter worked! Good enough for now. I gather that the catalogs don't support flow control within the Jinja templates, so we couldn't make caching a boolean and then fill in either simplecache:: or the empty string based on its value?

Single vs. Partitioned Files

The partitioned version of the data that I've created (with one row-group per file) has filenames of the form

epacems-YYYY-SS.parquet

where YYYY is the four digit year, and SS is the 2 letter US state abbreviation.

I originally thought that I could use years and states parameters to limit which files get scanned, and to ensure only valid year and state values are used. Is that the kind of thing parameters get used for? The other catalogs I've seen that use parameters to construct filenames have a different table in each Parquet file, so the parameters are selecting distinct tables rather than a subset of a single table. In that case the source corresponds to a whole collection of tables, e.g. this CarbonPlan intake catalog.

When I try to access the partitioned dataset remotely, everything takes much longer, even when caching is disabled. Is that expected? There are 1274 individual Parquet files. Is that excessive? Selecting one state-year of data took 30 seconds on the monolithic file but 7 minutes on the partitioned files.

Even when the data is cached locally there's a huge difference. On the cached monolithic file the same query takes about 2 seconds, but with the cached partitioned files it took almost 4 minutes. That seems like something must not be right.

import intake
pudl_cat = intake.cat.pudl_cat
filters = [[('year', '=', 2020), ('state', '=', 'CO')]]

# Note: both datasets are already cached locally
# No significant network traffic happened while these were running

epacems_df = (
    pudl_cat.hourly_emissions_epacems_partitioned(filters=filters)
    .to_dask().compute()
)
# CPU times: user 38.2 s, sys: 3.12 s, total: 41.3 s
# Wall time: 3min 46s

epacems_df = (
    pudl_cat.hourly_emissions_epacems(filters=filters)
    .to_dask().compute()
)
# CPU times: user 613 ms, sys: 119 ms, total: 732 ms
# Wall time: 1.48 s

Similarly discover() is hella slow on the (cached) partitioned data:

pudl_cat.hourly_emissions_epacems_partitioned.discover()
# CPU times: user 4.69 s, sys: 383 ms, total: 5.07 s
# Wall time: 1min 59s

pudl_cat.hourly_emissions_epacems.discover()
# CPU times: user 269 ms, sys: 7.04 ms, total: 276 ms
# Wall time: 769 ms

Unless we can make the partitioned files perform better it seems like the one big file is the best way to go for now, and everyone will just have to wait 10 minutes for it to cache locally the first time they access it.

description: A catalog of open energy system data for use by climate advocates,
  policymakers, journalists, researchers, and other members of civil society.

plugins:
  source:
    - module: intake_parquet
#   - module: intake_sqlite

metadata:
  parameters:
    cache_method:
      description: "Whether to cache data locally; empty string to disable caching."
      type: str
      default: "simplecache::"
      allowed: ["simplecache::", ""]
  creator:
    title: "Catalyst Cooperative"
    email: "pudl@catalyst.coop"
    path: "https://catalyst.coop"

sources:

  hourly_emissions_epacems:
    driver: parquet
    description: Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: 'pyarrow'
      urlpath: '{{ cache_method }}{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet'
      storage_options:
        token: 'anon'  # Explicitly use anonymous access.
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

zaneselvans added a commit to catalyst-cooperative/pudl-catalog that referenced this issue Apr 19, 2022
With some pointers from @martindurant in
[this issue](intake/intake-parquet#26) I got
anonymous public access working, and caching can now be turned off when
appropriate.

Accessing the partitioned data is still very slow in a variety of
contexts for reasons I don't understand. I also hit a snag attempting to
create a consolidated external `_metadata` file to hopefully speed up
access to the partitioned data so... not sure what to do there.

The current Tox/pytest setup expects to find data locally, which won't
work right now on GitHub. Need to set the tests up better for real world
use, and less for exploring different catalog configurations.

Closes #5, #6
@martindurant (Member)

Note that

pqwriter.write_table(
    pa.Table.from_pandas(df, schema=schema, preserve_index=False)
)

does not do the same thing as df.to_parquet(..., engine="pyarrow"), which is why you lost the metadata. If you are writing the data directly, you should be able to tell arrow to generate the _metadata file directly, so long as it knows it is "appending" to an existing dataset.
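
A rough sketch of that pyarrow pattern, using a tiny placeholder dataframe (the output path is also a placeholder; check the docs for your pyarrow version):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Placeholder data; in practice this would be one state-year of CEMS records.
df = pd.DataFrame({"year": [2020, 2020], "state": ["CO", "CO"], "co2_mass": [1.0, 2.0]})
table = pa.Table.from_pandas(df, preserve_index=False)

# Collect the per-file footer metadata while writing the dataset...
metadata_collector = []
pq.write_to_dataset(table, root_path="epacems", metadata_collector=metadata_collector)

# ...then write it out as a consolidated _metadata file covering the whole dataset.
pq.write_metadata(table.schema, "epacems/_metadata", metadata_collector=metadata_collector)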

Obviously, it may be worth your while to figure out what is taking all that time; I recommend snakeviz. Likely, all of the files' metadata sections are being read. So you can consolidate them into a single file, as I have described; skip this step (disabling filtering on column values); or have dask do the scan in parallel. For remote data, simply listing the files can take some time if there are many.
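
For example, one way to capture a profile that snakeviz can display, reusing the pudl_cat catalog and filters from the timings above:

import cProfile

import intake

pudl_cat = intake.cat.pudl_cat  # as in the timing examples above
filters = [[("year", "=", 2020), ("state", "=", "CO")]]

# Write profiling stats to a file, then inspect them with: snakeviz partitioned.prof
cProfile.run(
    "pudl_cat.hourly_emissions_epacems_partitioned(filters=filters).to_dask()",
    "partitioned.prof",
)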

Another possible thing you might include as an option is to follow the pattern in anaconda-package-data, where we parameterise the URL directly, so the user has to select the year/month of the specific file they want to load.

@zaneselvans (Contributor, Author)

Another confusing hiccup. We've just turned on "requester pays" for the storage bucket containing these parquet files, and I added requester_pays: true to the storage_options in the catalog, and removed token: "anon":

  hourly_emissions_epacems:
    driver: parquet
    args:
      engine: 'pyarrow'
      urlpath: '{{ cache_method }}{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet'
      storage_options:
        requester_pays: true
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

However, the requester_pays parameter doesn't seem to get passed in when using simplecache, while it works fine with cache_method=""

So this works fine:

epacems_df = (
    pudl_cat.hourly_emissions_epacems(
        filters=filters,
        cache_method="",
        index=False,
        split_row_groups=True,
    )
    .to_dask().compute()
)

But this:

epacems_df = (
    pudl_cat.hourly_emissions_epacems(
        filters=filters,
        # cache_method="",  # Uses the default value of "simplecache::"
        index=False,
        split_row_groups=True,
    )
    .to_dask().compute()
)

Results in:

ValueError: Bucket is requester pays. Set `requester_pays=True` when creating the GCSFileSystem.

Does that make sense to you? Why would they behave differently?

@martindurant (Member)

Yes: because the URL has multiple components, you need to specify which of them is to get the extra argument:

  hourly_emissions_epacems:
    driver: parquet
    args:
      engine: 'pyarrow'
      urlpath: '{{ cache_method }}{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet'
      storage_options:
        gs:
          requester_pays: true
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

@martindurant (Member)

(hm, I'll have to think about whether this still works when caching is off - perhaps just test?)

@zaneselvans (Contributor, Author)

Hmm, yes this flips the problem. Now it works when simplecache is enabled and fails when caching is turned off.

If I put requester_pays: true at both the top level of storage_options and under gs it seems to work in both cases.

@martindurant (Member)

If I put requester_pays: true at both the top level of storage_options and under gs it seems to work in both cases.

hah, ok! This could probably use a little design on our end, but good that it works.

@bendnorman

Hi @martindurant! I'm working with @zaneselvans on the PUDL Intake Catalog. I've moved our data to an s3 bucket and I'm running into issues with the conditional caching workaround discussed above. This is what our catalog.yml file looks like:

sources:
  hourly_emissions_epacems:
    description:
      Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    driver: parquet
    parameters:
      cache_method:
        description: "Whether to cache data locally; empty string to disable caching."
        type: str
        default: "simplecache::"
        allowed: ["simplecache::", ""]
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: "pyarrow"
      split_row_groups: true
      index: false
      urlpath: "{{ cache_method }}{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet"
      storage_options:
        s3:
          anon: true
        simplecache:
          cache_storage: "{{ env(PUDL_INTAKE_CACHE) }}"

Using simplecache:: as the cache_method works fine, but when cache_method="" I get this error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 99, in to_dask
    self._load_metadata()
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake/source/base.py", line 285, in _load_metadata
    self._schema = self._get_schema()
                   ^^^^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 60, in _get_schema
    self._df = self._to_dask()
               ^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 108, in _to_dask
    self._df = dd.read_parquet(urlpath,
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/dask/backends.py", line 127, in wrapper
    raise type(e)(
TypeError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: AioSession.__init__() got an unexpected keyword argument 's3'

I've tried structuring the storage options this way, but then I get an unexpected keyword argument error for simplecache:

storage_options:
  anon: true
  simplecache:
    cache_storage: "{{ env(PUDL_INTAKE_CACHE) }}"

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 99, in to_dask
    self._load_metadata()
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake/source/base.py", line 285, in _load_metadata
    self._schema = self._get_schema()
                   ^^^^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 60, in _get_schema
    self._df = self._to_dask()
               ^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/intake_parquet/source.py", line 108, in _to_dask
    self._df = dd.read_parquet(urlpath,
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/bendnorman/mambaforge/envs/pudl-catalog/lib/python3.11/site-packages/dask/backends.py", line 127, in wrapper
    raise type(e)(
TypeError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: AioSession.__init__() got an unexpected keyword argument 'simplecache'

Am I missing something, or can I not specify simplecache storage_options args when using s3? Thank you for the help!

bendnorman added a commit to catalyst-cooperative/pudl-catalog that referenced this issue Dec 19, 2022
I removed the cache-disabling option because fsspec started throwing
unexpected keyword argument errors when making requests to s3
with caching disabled. See intake/intake-parquet#26
for the full explanation.
@zaneselvans (Contributor, Author)

It seems like there are two different ways of passing options into different parts of the filesystem / fsspec / intake system. One is through the URL that's used as the read target, and another is through options passed when creating the fsspec filesystem. Looking at the fsspec docs on caching, these two calls use similar options, but provide them differently:

import fsspec

fs = fsspec.filesystem(
    "filecache",
    target_protocol="s3",
    target_options={"anon": True},
    cache_storage="/tmp/files/",
)
of = fsspec.open(
    "filecache::s3://bucket/key",
    s3={"anon": True},
    filecache={"cache_storage": "/tmp/files"},
)

Somehow the information in the Intake catalog YAML is being used to construct... one of these. I think the conditional caching was being done by constructing a URL that could start either with simplecache::gs:// or with just gs:// (when the empty string was used as the cache method).

But it's confusing because parameters for lots of different functions / classes are all mingled together in the YAML file, and it's not clear which ones end up where. Do they go to dask.dataframe.read_parquet()? Or to fsspec.filesystem()? Or to s3fs.S3FileSystem()? Can you pass some of them in conditionally, based on the values of others?

@martindurant (Member)

Yes, you are quite right: fsspec.open is the form that accepts chained URLs; and even for this one, if the URL is not a chain, then the protocol={...} kwargs are ignored. This latter short-circuit could maybe be eliminated or worked around (on the assumption that no filesystem actually has an argument named the same as the protocol), but the current system does make instantiating filesystems faster, I suppose. The relevant code is probably in fsspec.core._un_chain.

To answer the other question, fsspec.filesystem creates a single instance. Some instances are able to create child instances with the right set of arguments. However, if you want to create a single filesystem (as opposed to open files) from a URL, you want fsspec.core.url_to_fs. All the Intake drivers and lower level functions ought to be calling those, to allow compound URLs.
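
A minimal sketch of the difference, with placeholder bucket and cache paths (gcsfs assumed to be installed):

import fsspec
from fsspec.core import url_to_fs

# Chained URL: per-protocol kwargs are routed to each link in the chain.
fs, path = url_to_fs(
    "simplecache::gs://some-bucket/some.parquet",
    gs={"requester_pays": True},
    simplecache={"cache_storage": "/tmp/pudl-cache"},
)

# Plain (un-chained) URL: kwargs go straight to the filesystem class,
# so protocol-keyed dicts like gs={...} are not routed here.
fs, path = url_to_fs(
    "gs://some-bucket/some.parquet",
    requester_pays=True,
)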

I would be happy to consider a PR which either

  • forces even simple URLs to go through the unchaining logic, so that s3={...} works
  • implements a do-nothing cache class, so that you can swap "simplecache::" for "nocache::" and get identical behaviour.
