Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DataPipe] Add as_tuple argument for CSVParserIterDataPipe #646

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions test/test_local_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,15 @@ def make_path(fname):
expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])]
self.assertEqual(expected_res, list(csv_parser_dp))

# Functional Test: yield one row at time from each file as tuple instead of list, skipping over empty content
csv_parser_dp = datapipe3.parse_csv(as_tuple=True)
expected_res = [("key", "item"), ("a", "1"), ("b", "2"), ()]
Copy link
Contributor

@ejguan ejguan Jul 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emmm. It weird that CI doesn't catch the Error. Could you please change the variable name from expected_res to something else?
It breaks another test at line 171

Edit: Actually, it's raised by CI.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I didn't notice that expected_res has been reused in another test case. Fixed!

self.assertEqual(expected_res, list(csv_parser_dp))

# Reset Test:
csv_parser_dp = CSVParser(datapipe3, return_path=True)
n_elements_before_reset = 2
expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])]
res_before_reset, res_after_reset = reset_after_n_next_calls(csv_parser_dp, n_elements_before_reset)
self.assertEqual(expected_res[:n_elements_before_reset], res_before_reset)
self.assertEqual(expected_res, res_after_reset)
Expand Down
18 changes: 18 additions & 0 deletions torchdata/datapipes/iter/util/plain_text_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
encoding="utf-8",
errors: str = "ignore",
return_path: bool = False,
as_tuple: bool = False,
) -> None:
if skip_lines < 0:
raise ValueError("'skip_lines' is required to be a positive integer.")
Expand All @@ -34,6 +35,7 @@ def __init__(
self._encoding = encoding
self._errors = errors
self._return_path = return_path
self._as_tuple = as_tuple

def skip_lines(self, file: IO) -> Union[Iterator[bytes], Iterator[str]]:
with contextlib.suppress(StopIteration):
Expand Down Expand Up @@ -68,6 +70,16 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu
for data in stream:
yield path, data

def as_tuple(self, stream: Iterator[D]) -> Iterator[Union[D, Tuple]]:
if not self._as_tuple:
yield from stream
return
for data in stream:
if isinstance(data, list):
yield tuple(data)
else:
yield data
Comment on lines +78 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of performance, we might want to directly yield tuple(data) without checking isinstance every single time when as_tuple is specified as True. In that case, Users should take responsibility to handle the case.
Could you please run benchmarking using these two implementation with a long csv file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll do this benchmarking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ejguan, I run the following benchmark:

import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper


def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

Got:

impl a:  13.500808166
impl b:  13.221415374999998

Any comments about this? ^_^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's run a dummy test for the case that not isinstance(data, list). To test it, you can expose as_tuple to parse_csv_as_dict and turn it on. Thank you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ejguan , I added a benchmark for parse_csv_as_dict:

import csv
import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
from torchdata.datapipes.iter.util.plain_text_reader import _CSVBaseParserIterDataPipe


def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_a
    csv_dict_parser_dp1 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=True)
    csv_dict_parser_dp2 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=False)
    
    print("impl a with dict as_tupe=True: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp1), number=10))
    
    print("impl a with dict as_tupe=False: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp2), number=10))

Got:

impl a:  13.746312875000001
impl b:  13.414058333999998
impl a with dict as_tupe=True:  25.277461583
impl a with dict as_tupe=False:  24.659475541

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about impl b with dict?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, there are the result of impl_b with dict parser:

impl a:  13.375169375
impl b:  13.102719625
impl a with dict as_tupe=True: 24.521925125000003
impl a with dict as_tupe=False: 23.991537542000003
impl b with dict as_tupe=True: 24.552794458999998
impl b with dict as_tupe=False: 24.026250667

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Then, let's leave it as it is.



@functional_datapipe("readlines")
class LineReaderIterDataPipe(IterDataPipe[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]):
Expand Down Expand Up @@ -136,6 +148,7 @@ def __init__(
encoding="utf-8",
errors: str = "ignore",
return_path: bool = True,
as_tuple: bool = False,
**fmtparams,
) -> None:
self.source_datapipe = source_datapipe
Expand All @@ -146,6 +159,7 @@ def __init__(
encoding=encoding,
errors=errors,
return_path=return_path,
as_tuple=as_tuple,
)
self.fmtparams = fmtparams

Expand All @@ -154,6 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]:
stream = self._helper.skip_lines(file)
stream = self._helper.decode(stream)
stream = self._csv_reader(stream, **self.fmtparams)
stream = self._helper.as_tuple(stream) # type: ignore[assignment]
yield from self._helper.return_path(stream, path=path) # type: ignore[misc]


Expand All @@ -173,6 +188,7 @@ class CSVParserIterDataPipe(_CSVBaseParserIterDataPipe):
errors: the error handling scheme used while decoding
return_path: if ``True``, each line will return a tuple of path and contents, rather
than just the contents
as_tuple: if ``True``, each line will return a tuple instead of a list

Example:
>>> from torchdata.datapipes.iter import IterableWrapper, FileOpener
Expand All @@ -196,6 +212,7 @@ def __init__(
encoding: str = "utf-8",
errors: str = "ignore",
return_path: bool = False,
as_tuple: bool = False,
**fmtparams,
) -> None:
super().__init__(
Expand All @@ -206,6 +223,7 @@ def __init__(
encoding=encoding,
errors=errors,
return_path=return_path,
as_tuple=as_tuple,
**fmtparams,
)

Expand Down