-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DataPipe] Add as_tuple argument for CSVParserIterDataPipe #646
Changes from 4 commits
e72528e
932deca
9aac787
8a36d6e
6ebebe0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ def __init__( | |
encoding="utf-8", | ||
errors: str = "ignore", | ||
return_path: bool = False, | ||
as_tuple: bool = False, | ||
) -> None: | ||
if skip_lines < 0: | ||
raise ValueError("'skip_lines' is required to be a positive integer.") | ||
|
@@ -34,6 +35,7 @@ def __init__( | |
self._encoding = encoding | ||
self._errors = errors | ||
self._return_path = return_path | ||
self._as_tuple = as_tuple | ||
|
||
def skip_lines(self, file: IO) -> Union[Iterator[bytes], Iterator[str]]: | ||
with contextlib.suppress(StopIteration): | ||
|
@@ -68,6 +70,16 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu | |
for data in stream: | ||
yield path, data | ||
|
||
def as_tuple(self, stream: Iterator[D]) -> Iterator[Union[D, Tuple]]: | ||
if not self._as_tuple: | ||
yield from stream | ||
return | ||
for data in stream: | ||
if isinstance(data, list): | ||
yield tuple(data) | ||
else: | ||
yield data | ||
Comment on lines
+78
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In terms of performance, we might want to directly There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I'll do this benchmarking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @ejguan, I run the following benchmark: import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
def impl_a(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
if isinstance(data, list):
yield tuple(data)
else:
yield data
def impl_b(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
yield tuple(data)
fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000
with tempfile.TemporaryDirectory() as temp_dir:
temp_fpath = os.path.join(temp_dir, 'temp.csv')
with open(temp_fpath, 'w') as f:
f.write('\n'.join([fake_line, ] * line_num))
datapipe1 = IterableWrapper([temp_fpath])
datapipe2 = FileOpener(datapipe1, mode="b")
csv_parser_dp = datapipe2.parse_csv(as_tuple=True)
PlainTextReaderHelper.as_tuple = impl_a
print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_b
print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10)) Got:
Any comments about this? ^_^ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's run a dummy test for the case that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @ejguan , I added a benchmark for import csv
import os
import timeit
import tempfile
from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
from torchdata.datapipes.iter.util.plain_text_reader import _CSVBaseParserIterDataPipe
def impl_a(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
if isinstance(data, list):
yield tuple(data)
else:
yield data
def impl_b(self, stream):
if not self._as_tuple:
yield from stream
return
for data in stream:
yield tuple(data)
fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000
with tempfile.TemporaryDirectory() as temp_dir:
temp_fpath = os.path.join(temp_dir, 'temp.csv')
with open(temp_fpath, 'w') as f:
f.write('\n'.join([fake_line, ] * line_num))
datapipe1 = IterableWrapper([temp_fpath])
datapipe2 = FileOpener(datapipe1, mode="b")
csv_parser_dp = datapipe2.parse_csv(as_tuple=True)
PlainTextReaderHelper.as_tuple = impl_a
print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_b
print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
PlainTextReaderHelper.as_tuple = impl_a
csv_dict_parser_dp1 = _CSVBaseParserIterDataPipe(
datapipe2, csv.DictReader, decode=True, as_tuple=True)
csv_dict_parser_dp2 = _CSVBaseParserIterDataPipe(
datapipe2, csv.DictReader, decode=True, as_tuple=False)
print("impl a with dict as_tupe=True: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp1), number=10))
print("impl a with dict as_tupe=False: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp2), number=10)) Got:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, there are the result of impl_b with dict parser:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool. Then, let's leave it as it is. |
||
|
||
|
||
@functional_datapipe("readlines") | ||
class LineReaderIterDataPipe(IterDataPipe[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]): | ||
|
@@ -136,6 +148,7 @@ def __init__( | |
encoding="utf-8", | ||
errors: str = "ignore", | ||
return_path: bool = True, | ||
as_tuple: bool = False, | ||
**fmtparams, | ||
) -> None: | ||
self.source_datapipe = source_datapipe | ||
|
@@ -146,6 +159,7 @@ def __init__( | |
encoding=encoding, | ||
errors=errors, | ||
return_path=return_path, | ||
as_tuple=as_tuple, | ||
) | ||
self.fmtparams = fmtparams | ||
|
||
|
@@ -154,6 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]: | |
stream = self._helper.skip_lines(file) | ||
stream = self._helper.decode(stream) | ||
stream = self._csv_reader(stream, **self.fmtparams) | ||
stream = self._helper.as_tuple(stream) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! mypy doesn't suggest variable name reuse in one function body. |
||
yield from self._helper.return_path(stream, path=path) # type: ignore[misc] | ||
|
||
|
||
|
@@ -173,6 +188,7 @@ class CSVParserIterDataPipe(_CSVBaseParserIterDataPipe): | |
errors: the error handling scheme used while decoding | ||
return_path: if ``True``, each line will return a tuple of path and contents, rather | ||
than just the contents | ||
as_tuple: if ``True``, each line will return a tuple instead of a list | ||
|
||
Example: | ||
>>> from torchdata.datapipes.iter import IterableWrapper, FileOpener | ||
|
@@ -196,6 +212,7 @@ def __init__( | |
encoding: str = "utf-8", | ||
errors: str = "ignore", | ||
return_path: bool = False, | ||
as_tuple: bool = False, | ||
**fmtparams, | ||
) -> None: | ||
super().__init__( | ||
|
@@ -206,6 +223,7 @@ def __init__( | |
encoding=encoding, | ||
errors=errors, | ||
return_path=return_path, | ||
as_tuple=as_tuple, | ||
**fmtparams, | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emmm. It weird that CI doesn't catch the Error. Could you please change the variable name from
expected_res
to something else?It breaks another test at line 171
Edit: Actually, it's raised by CI.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, I didn't notice that
expected_res
has been reused in another test case. Fixed!