[DataPipe] Add as_tuple argument for CSVParserIterDataPipe #646

Closed

Conversation

@ice-tong ice-tong commented Jul 12, 2022

Motivation

see pytorch/torcharrow#421

Changes

  • Add as_tuple argument for CSVParserIterDataPipe
  • Add a functional test for as_tuple in tests/test_local_io.py
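For context, here is a minimal stdlib-only sketch of the behaviour this PR adds. Python's csv module stands in for torchdata's parse_csv, and the sample data is made up:

```python
# Illustration of what parse_csv(as_tuple=True) is meant to yield:
# each CSV row as a tuple instead of the default list.
# (csv + io stand in for the torchdata pipeline; the data is made up.)
import csv
import io

raw = "key,item\na,1\nb,2\n"

rows_as_lists = list(csv.reader(io.StringIO(raw)))      # default: lists
rows_as_tuples = [tuple(row) for row in rows_as_lists]  # as_tuple=True: tuples
```

With as_tuple=True the parser would yield ("key", "item"), ("a", "1"), ("b", "2") instead of the list equivalents.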

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jul 12, 2022

@ejguan ejguan left a comment

Thank you, LGTM with one nit comment

Review thread on torchdata/datapipes/iter/util/plain_text_reader.py (outdated, resolved)
@facebook-github-bot

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@@ -68,6 +70,16 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu
for data in stream:
yield path, data

def as_tuple(self, stream: Iterator[List]) -> Iterator[Union[List, Tuple]]:
Contributor

I just realized we need to change the type hint here. The input stream might not be Iterator[List]. We can use Iterator[D] -> Iterator[Union[D, Tuple]].

Contributor Author

Good point, done.
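With that change, the helper might look roughly like this. A minimal self-contained sketch, not the actual torchdata source; the class body is reduced to just the flag and the method under discussion:

```python
from typing import Iterator, Tuple, TypeVar, Union

D = TypeVar("D")

class PlainTextReaderHelper:  # reduced stand-in for the real helper
    def __init__(self, as_tuple: bool = False) -> None:
        self._as_tuple = as_tuple

    def as_tuple(self, stream: Iterator[D]) -> Iterator[Union[D, Tuple]]:
        # Pass everything through untouched unless as_tuple was requested.
        if not self._as_tuple:
            yield from stream
            return
        for data in stream:
            # Only list rows are converted; dict rows (csv.DictReader) pass through.
            if isinstance(data, list):
                yield tuple(data)
            else:
                yield data
```

The generic Iterator[D] input covers both csv.reader (lists) and csv.DictReader (dicts).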

Comment on lines +78 to +81
if isinstance(data, list):
yield tuple(data)
else:
yield data
Contributor

In terms of performance, we might want to yield tuple(data) directly, without checking isinstance on every item, when as_tuple is True. In that case, users would take responsibility for handling non-list data.
Could you please run a benchmark comparing these two implementations on a long CSV file?

Contributor Author

Ok, I'll do this benchmarking.

Contributor Author

Hi @ejguan, I ran the following benchmark:

import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper


# impl_a: convert list rows to tuples; pass non-list items through unchanged
def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


# impl_b: unconditionally convert every row to a tuple
def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

Got:

impl a:  13.500808166
impl b:  13.221415374999998

Any comments about this? ^_^

Contributor

Let's run a dummy test for the case where not isinstance(data, list). To test it, you can expose as_tuple to parse_csv_as_dict and turn it on. Thank you!

Contributor Author

Hi @ejguan, I added a benchmark for parse_csv_as_dict:

import csv
import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
from torchdata.datapipes.iter.util.plain_text_reader import _CSVBaseParserIterDataPipe


# impl_a: convert list rows to tuples; pass non-list items through unchanged
def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


# impl_b: unconditionally convert every row to a tuple
def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_a
    csv_dict_parser_dp1 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=True)
    csv_dict_parser_dp2 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=False)
    
    print("impl a with dict as_tuple=True: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp1), number=10))

    print("impl a with dict as_tuple=False: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp2), number=10))

Got:

impl a:  13.746312875000001
impl b:  13.414058333999998
impl a with dict as_tuple=True:  25.277461583
impl a with dict as_tuple=False:  24.659475541

Contributor

How about impl b with dict?

Contributor Author

Hi, here are the results of impl_b with the dict parser:

impl a:  13.375169375
impl b:  13.102719625
impl a with dict as_tuple=True: 24.521925125000003
impl a with dict as_tuple=False: 23.991537542000003
impl b with dict as_tuple=True: 24.552794458999998
impl b with dict as_tuple=False: 24.026250667

Contributor

Cool. Then, let's leave it as it is.


@ejguan ejguan left a comment

Thank you

@facebook-github-bot

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@@ -159,6 +159,11 @@ def make_path(fname):
expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])]
self.assertEqual(expected_res, list(csv_parser_dp))

# Functional Test: yield one row at time from each file as tuple instead of list, skipping over empty content
csv_parser_dp = datapipe3.parse_csv(as_tuple=True)
expected_res = [("key", "item"), ("a", "1"), ("b", "2"), ()]
@ejguan ejguan Jul 13, 2022

Hmm, it's weird that CI didn't catch the error. Could you please change the variable name from expected_res to something else?
It breaks another test at line 171.

Edit: Actually, it's raised by CI.

Contributor Author

My bad, I didn't notice that expected_res has been reused in another test case. Fixed!
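The fix amounts to giving the new expectation its own name. A hedged stdlib-only sketch (expected_csv_tuple_res is a hypothetical name choice; csv stands in for parse_csv(as_tuple=True)):

```python
import csv
import io

content = "key,item\na,1\nb,2\n"

# as_tuple=True behaviour: every parsed row becomes a tuple.
parsed = [tuple(row) for row in csv.reader(io.StringIO(content))]

# Renamed so it no longer clobbers the expected_res used by a later test.
expected_csv_tuple_res = [("key", "item"), ("a", "1"), ("b", "2")]
assert parsed == expected_csv_tuple_res
```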

@@ -154,6 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]:
stream = self._helper.skip_lines(file)
stream = self._helper.decode(stream)
stream = self._csv_reader(stream, **self.fmtparams)
stream = self._helper.as_tuple(stream)
Contributor

mypy complains about it at this line. Could you please try to fix it? Otherwise, you can add a comment at the end of this line to bypass the mypy check with # type: ignore[assignment]

@ice-tong ice-tong Jul 14, 2022

Done! It turns out mypy doesn't allow a variable name to be reused with a different type within one function body.

@facebook-github-bot

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
