Contributing a CSV module [RE: dask dataframe read_csv] #66

Open
lmmx opened this issue Aug 17, 2021 · 15 comments

@lmmx
Contributor

lmmx commented Aug 17, 2021

Following our discussion in dask #8045 I’m seeking to contribute a CSV module to fsspec-reference-maker, and wanted to kick off a feature proposal issue here to clarify some aspects.

Hope this is a decent start, but it’s still a WIP and comments are appreciated (I need to review the role of fsspec in dask more closely).

Firstly, as described in #7, this library is geared towards evenly spaced chunks. The README of this repo gives an example of this with a single variable, or “dimension”, i (the number of chunks), which is used in the example spec along with a fixed length (1000) to produce range(0, i * 1000, 1000).

This of course matches how dask reads CSVs as range(0, size, blocksize), but the whole point of my intervention here is to make them no longer evenly spaced deterministic chunks.

The word ‘deterministic’ here seems to mean both:

  • “based on the byte positions rather than byte content at those positions”, as well as
  • “uniquely identifiable, not changing upon recalculation”.

The alternative I am proposing for CSVs to dask only fits the 2nd of these criteria (as it will involve checking bytes, which may result in changes to the offsets, thus chunks may not stay evenly spaced).

Also in #7 there is mention of “callbacks”, and I’m uncertain whether this is the route I should take to achieving this adjustment to the offsets (or something entirely different I should ignore).

I am inclined to copy the conclusions of that issue, that perhaps it is easiest to begin by aiming to produce the explicit ‘version 0’ spec rather than the templated ‘version 1’ until how this works is clearer to me.

As a less significant problem for me, the docs here refer to jinja2 rendered strings, but I don’t see that library as a requirement anywhere here, so I’m not sure how that works (perhaps it is a future feature, I’m noting this library is a recent/future-facing effort).

Here’s my first attempt at an idea of how this would look (filenames vaguely copying the format of the WIT dataset as “base_0_ordinal_count of base_1_cardinal_total”):

  • The 2nd and 3rd items of each value are meant to reflect offset adjustments calculated by a previous routine (here 10, 5, 20, 0, 50, 25): the adjustments are added to the evenly spaced offsets (1000+10, 2000+5, etc.), and the lengths are the differences between consecutive adjusted offsets (1010-0 = 1010, 2005-1010 = 995, etc.); see the sketch after this list.

  • I’m fairly sure that the 3rd item in the gen keys should indicate the length of the chunk that ends at that offset but please correct me if I’m wrong.

This then gives the spec of a ‘filesystem for partitions within a file’, addressable by filepath plus ‘virtual’ partition index:

{
  "key0": "data",
  "gen_key0": ["/path/to/csv_0_of_1.csv.gz/partition_0", 1010, 1010],
  "gen_key1": ["/path/to/csv_0_of_1.csv.gz/partition_1", 2005, 995],
  "gen_key2": ["/path/to/csv_0_of_1.csv.gz/partition_2", 3020, 1015],
  "gen_key3": ["/path/to/csv_0_of_1.csv.gz/partition_3", 4000, 980],
  "gen_key4": ["/path/to/csv_0_of_1.csv.gz/partition_4", 5050, 1050],
  "gen_key5": ["/path/to/csv_0_of_1.csv.gz/partition_5", 6025, 975]
}
  • I’m not sure what to put in the ‘key’ entries so have removed the ones from the example spec (please let me know if this is unadvisable, and if you have an idea of what should go there instead)
  • I presume the one that is currently bearing the bytes b”data” should be storing something important to identify the CSV, but I can’t determine what that is on my first attempt
    • My understanding is that this will be fed into the OpenFile object as the fs argument, so it should store things relevant to that. Perhaps path? I’m very unsure how this should look though, and suspect if I guess I’d only end up putting irrelevant info in that’ll already be passed in.
  • For simplicity I’m considering 2 files here, each with 3 offsets (i.e. 4 chunks: the offset starting at 0 is always going to be assumed to be valid: if it’s not then that’s a corrupt CSV, not the problem I’m seeking to solve here)

As for the matter of identifying the offset adjustments (10, 5, 20, 0, 50, 25) I expect the fastest way to do so is

  • initialise separator_skip_count = separator_skip_offset = 0 at each offset mark (1000, 2000, etc.)
  • try pandas.read_csv(nrows=1)
    • catch failure; increment separator_skip_count += 1 if it fails (repeat)
  • finally [upon success]
    • break out of the loop
    • use the tell minus the offset to give the ‘offset adjustment’ (assign separator_skip_offset)
      • left as 0 for no adjustment (if separator_skip_count == 0), or a positive integer

The separator_skip_count indicating the number of newlines that were skipped after the offset+length to find the ‘genuine’ row-delimiting offset seems redundant to store, but useful while writing/debugging this algorithm.

  • I say that, but I don’t know: perhaps it’d be inexpensive to recalculate the actual byte offsets from the number of newlines to skip after the offset, rather than store that offset? (Not clear to me yet)

Only the separator_skip_offset needs to be stored: summed with the offset, in the 2nd item of the values (1010, 2005, etc.)
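
As a rough sketch of that loop (my assumptions being that the rough offset already sits at a candidate line start, and that a bad starting point surfaces as a pandas ParserError — neither of which I’ve verified yet):

import io

import pandas as pd


def adjust_offset(f: io.BufferedIOBase, offset: int) -> int:
    """Sketch: return the first 'genuine' row-delimiting offset at/after `offset`,
    by repeatedly trying to parse a single row starting there."""
    separator_skip_count = 0
    f.seek(offset)
    while True:
        candidate = f.tell()
        try:
            pd.read_csv(f, nrows=1, header=None, engine="python")
            break  # success: `candidate` starts a real row
        except pd.errors.ParserError:
            separator_skip_count += 1
            f.seek(candidate)
            f.readline()  # skip past the next (spurious) newline and retry
    separator_skip_offset = candidate - offset  # 0 if no skips were needed
    return offset + separator_skip_offset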

I think at the point that the separator_skip_offset is calculated, the ‘version 1’ spec could be computed, to reduce to the above ‘version 0’ spec, as something like:

{
    "version": 1,
    "templates": {
        "u": "/path/to/csv_0_of_1.csv.gz",
        "f": "partition_{{c}}"
    },
    "gen": [
        {
            "key": "gen_key{{i}}",
            "url": "{{u}}/{{f(c=i)}}",
            "offset": "{{(i + 1) * 1000}}",
            "length": "1000",
            "dimensions": 
              {
                "i": {"stop":  5}
              }
        }   
    ],
    "refs": {
      "key0": "data",
    }
}
  • I may be misunderstanding something by putting the filename within the template rather than as a variable
  • should gen_key0 be partition0 (etc) ? or should this stay as gen_key0 to make clear that it’s generated?
  • I can’t figure out where the array specifying the separator_skip_offset should go (if I put it in the gen.dimensions key it’ll become a Cartesian product, whereas I want to ‘zip’ it against the i range…)
  • Should I change the gen.url key from “url” to something else, since it’s expected to refer to a file path not a web resource?

Without understanding how to incorporate the offset adjustments into this template, I don’t think I can write the ‘version 1’ spec at this time, but I hope we might be able to figure it out here.
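
For what it’s worth, here is how I’d guess the jinja2 rendering of a gen entry is meant to work — purely my assumption about the mechanics, not taken from this library (I’ve looped i over 0–5 to match the six keys in the explicit spec, though I’m unsure how the dimensions range is meant to be interpreted):

import jinja2

env = jinja2.Environment()
templates = {"u": "/path/to/csv_0_of_1.csv.gz", "f": "partition_{{c}}"}
# expose "u" as a plain string and "f" as a callable sub-template, so that
# "{{u}}/{{f(c=i)}}" can be rendered for each value of i
context = {
    "u": templates["u"],
    "f": lambda **kw: env.from_string(templates["f"]).render(**kw),
}

for i in range(6):  # one pass per generated key in the explicit spec above
    key = env.from_string("gen_key{{i}}").render(i=i)
    url = env.from_string("{{u}}/{{f(c=i)}}").render(i=i, **context)
    offset = int(env.from_string("{{(i + 1) * 1000}}").render(i=i))
    print(key, url, offset)  # gen_key0 /path/to/csv_0_of_1.csv.gz/partition_0 1000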

@martindurant
Member

martindurant commented Aug 18, 2021

Thank you for this long and thoughtful description of the problem and your attempt to fit it into fsspec-reference-maker. I see that our documentation, such as it is, is clearly substandard and has caused you some misconceptions. In truth, the case is much simpler, and indeed Version 0 is fully acceptable in Version 1 too - no need for any templates or generated keys at all.

Let me give a very simple example with a file from https://github.com/datapackage-examples/sample-csv .

Here are two references in version 1 format:

refs = {
  "version": 1,
  "refs": {
    "file1": ["https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv", 0,  48],
    "file2": ["https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv", 224, 31]
  }
}

(this could be in a JSON file, but works with python dicts too)

Now we can use fsspec to play with this:

import fsspec

fs = fsspec.filesystem("reference", fo=refs, target_protocol="http")
fs.ls("", False)
["file1", "file2"]

fs.cat("file1")
b'John,Doe,120 jefferson st.,Riverside, NJ, 08075\n'

fs.cat("file2")
b',Blankman,,SomeTown, SD, 00298\n'

And we could get Dask to read a set of virtual CSVs like

dd.read_csv("reference://*", storage_options={"fo": refs,  "target_protocol": "http"})

(plus extra read_csv kwargs, since this dataset doesn't have any column headers, as it happens). Normally, the references would be stored in a JSON file, which all the workers must be able to see. Note that the glob "*" will be expanded to the two virtual files in the reference filesystem. So the only task you need to accomplish is to find a set of offset/size pairs that can load the data.

(aside: "templates" only exist to reduce the redundancy of including the same URL in the references multiple times)

@martindurant
Member

Would you like to try to encode your method described in dask/dask as a function/class for this repo? It would take a URL/storage-options, maybe a preferred blocksize, and return valid byte offsets for line-ends that are actual row terminators (i.e., not the parsed data at all). In this repo it would run in serial.

@lmmx
Contributor Author

lmmx commented Aug 20, 2021

Already on it! 👍

I'm writing a class fsspec_reference_maker.csv.SingleCsvToPartitions, following the hdf.py module's example.

The class has the following method signatures:

from typing import BinaryIO, Callable


class SingleCsvToPartitions:
    def __init__(
        self,
        csv: BinaryIO,
        url: str,
        blocksize: int | None,
        lineseparator: str,
        spec=1,
        sample_rows: int = 10,
    ):
        ...

    def translate(self):
        """Translate content of one CSV file into partition offsets format."""
        ...
        return {"version": 1, "templates": {"u": self._uri}, "refs": self.store}

    def _visitoffsets(self, callback: Callable):
        """Visit each of the offsets."""

    def _translator(self, offset: int):
        """Produce offset metadata for all partitions in the CSV file."""

I suppose that passing a blocksize of None would be the way to override the calculation of evenly spaced offsets.
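
Intended usage would be something along these lines, given the class sketched above (the file path and values are only illustrative):

# given the SingleCsvToPartitions class sketched above (illustrative values)
with open("wit_v1.train.all-1percent_sample.tsv", "rb") as f:
    refs = SingleCsvToPartitions(
        f,
        url="wit_v1.train.all-1percent_sample.tsv",
        blocksize=2 ** 25,   # or None, per my supposition above
        lineseparator="\n",
    ).translate()
# refs -> {"version": 1, "templates": {"u": ...}, "refs": {...}}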

@lmmx
Contributor Author

lmmx commented Aug 20, 2021

I don't think it's right to initialise the SingleCsvToPartitions class with the blocksize: IMO it should be initialised with the list of offsets, and then 'edit' those on the dask.Delayed objects, rather than putting CSV-specific routines into the general purpose bytes.core.read_bytes function.

  • From what I can see, you can't edit a dask.Delayed object, so I'd just recreate them
    • (Actually, after trying, I think I can; unclear if it will work properly: will test, as it may avoid the need for excessive code reuse)

In this scenario, the out list of lists of dask.Delayed would be edited to have the correct offsets (within read_pandas, after returning from read_bytes), rather than being created with the correct offsets.

Below is the undesirable alternative, which would require using a popped kwarg at the start of read_bytes (putting CSV format-specific code in the general-purpose bytes.core module)

Click to show the alternative idea I rejected


I'm not sure if the dask read_bytes interface should be changed (if a new argument is added to it then it may break existing code that uses it), so a way to keep the API intact would be to pop a kwarg at the very start of read_bytes, defaulting to 0 if not present.

    sample_tail_rows = kwargs.pop("sample_tail_rows", 0)

passed only by read_pandas (here), leaving the remaining kwargs valid to be passed as storage_options, as currently.

This variable is needed to change the assignment of the offsets in bytes.core into a conditional:

if sample_tail_rows > 0:
    off, length = zip(*[
        (s["offset"], s["size"])
        for s in SingleCsvToPartitions(...).store
    ])
else:
    off = list(range(0, size, blocksize))
    length = [blocksize] * len(off)

Would this be acceptable? I'm conscious that this introduces format-specific behaviour into the cross-format bytes.core module.

  • read_bytes will always assign offsets based on blocksize so you can't assign them any earlier
  • I'd say it's messy to put format-specific routines into a general-purpose module (unless you strongly believe otherwise), so you can't change their assignment during read_bytes either
  • the only remaining point to assign offsets therefore must be after they've been computed [and tokenised] within read_bytes

My conclusion is that this is undesirable and SingleCsvToPartitions should instead be used to modify the dask.Delayed objects within the dd.io.csv module; I would appreciate your view.

@martindurant
Member

Some time has passed, @lmmx !
Do you think you can contribute the code you have, which takes one file and rough offsets as inputs, and returns exact offsets of valid, non-quoted, newline characters?

@lmmx
Contributor Author

lmmx commented Oct 1, 2021

Hey Martin, that it has! New puppy arrived, sorry for the pause 😅 It's still up on my whiteboard, I'll bump it to the top of my weekend to-do list 👍

@martindurant
Member

Will be very glad to see something working here! If you have an example that Dask currently struggles with (I think you did), all the better.

@lmmx
Contributor Author

lmmx commented Oct 4, 2021

Me too! Yes, it was the WIT dataset, which can be observed using the dataset's sample file (described in my initial comment) at https://storage.googleapis.com/gresearch/wit/wit_v1.train.all-1percent_sample.tsv.gz.

Weekend came & went but have cleared everything now + getting to grips with this again, will be in touch 👍

I'm using the repo csv-validation-sandbox to work out the minimum viable example. So far I went through my assumptions and found that some of them weren't what I first thought, and that I can use the pandas parser (phew), which simplifies things; I just have to enforce the python engine rather than the C one. The code in that repo is structured in the form of tests to demonstrate said assumptions (I felt my long issue comments were too hard to review, and worked some of them out in that repo's issues instead).
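
To illustrate the kind of call I mean by enforcing the python engine (a toy buffer with a newline inside a quoted field):

import io

import pandas as pd

# toy TSV fragment with a newline embedded in a quoted field
buf = io.BytesIO(b'a\t"multi\nline"\tb\n')
row = pd.read_csv(buf, sep="\t", engine="python", nrows=1, header=None)
print(row.values.tolist())  # [['a', 'multi\nline', 'b']]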

@martindurant
Member

I would not attempt to solve the compression and parsing issues in one go; it would be better to use an uncompressed target at first, I think.

@lmmx
Contributor Author

lmmx commented Oct 4, 2021

Ah, yes, that had not been on my mind, but one of the scripts I'm working with in testing [in an environment with locally editable installations of both dask and fsspec, with breakpoints etc. at points of interest] is working with

wit_v1.train.all-1percent_sample.tsv

as you suggest

@martindurant
Member

Note that you probably want to take dask out of the equation too - it might be where you want the files to be processed eventually, but I think you should be able to find valid offsets without it, simplifying the process (albeit with no parallelism).

@lmmx
Contributor Author

lmmx commented Oct 6, 2021

Yeah, sorry, to clarify: I was working with a local editable dask install to look at certain parameters set within the program, which I'm trying to excise into this module (e.g. the default blocksize of 33554432, from 2 ** 25 set as a magic number of sorts).

@martindurant
Member

I am still convinced that you shouldn't need dask at all to find the offsets, and it would make life much simpler to separate out a) generating the offsets (here) and b) using them (with dask).

@martindurant
Member

This has become more interesting with the appearance of indexed_gzip, as a kerchunked read_csv could be the only way to read massive .csv.gz in parallel with dask, as it allows quasi-random access into gzip streams.
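
Roughly, the kind of access it enables (illustrative filename and offsets, not kerchunk code):

import indexed_gzip

# Build a seek index over the gzip stream once, then jump straight to a
# previously recorded partition offset instead of decompressing from the start.
f = indexed_gzip.IndexedGzipFile("csv_0_of_1.csv.gz")   # placeholder filename
f.build_full_index()     # one-off pass; the index can be exported and reused
f.seek(5050)             # an offset recorded for one partition
chunk = f.read(1050)     # that partition's uncompressed bytes, ready to parse
f.close()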

@lmmx
Contributor Author

lmmx commented Jul 21, 2023

Popping back in to note that just yesterday I came across the chunked CSV reader capability in Arrow (as used via PyArrow), and after checking, it indeed seems to be capable of handling intra-line newlines: so I'm curious how it's done.

The code for CSVStreamingReader features this param

    newlines_in_values : bool, optional (default False)
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
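
e.g. enabling it when streaming a TSV (the filename is just a placeholder):

import pyarrow.csv as pv

# allow embedded newlines inside quoted values while streaming
reader = pv.open_csv(
    "some_file.tsv",  # placeholder path
    parse_options=pv.ParseOptions(delimiter="\t", newlines_in_values=True),
)
for batch in reader:  # CSVStreamingReader yields RecordBatches incrementally
    ...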
