From e72528e961fc697a13a60d3e6ef4cd435f1bab92 Mon Sep 17 00:00:00 2001 From: yancong Date: Tue, 12 Jul 2022 12:26:46 +0800 Subject: [PATCH 1/5] feat(CSVParserIterDataPipe): add as_tuple argument --- test/test_local_io.py | 5 +++++ .../datapipes/iter/util/plain_text_reader.py | 20 ++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/test/test_local_io.py b/test/test_local_io.py index f15048b86..d48bffae7 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -159,6 +159,11 @@ def make_path(fname): expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])] self.assertEqual(expected_res, list(csv_parser_dp)) + # Functional Test: yield one row at time from each file as tuple instead of list, skipping over empty content + csv_parser_dp = datapipe3.parse_csv(as_tuple=True) + expected_res = [("key", "item"), ("a", "1"), ("b", "2"), ()] + self.assertEqual(expected_res, list(csv_parser_dp)) + # Reset Test: csv_parser_dp = CSVParser(datapipe3, return_path=True) n_elements_before_reset = 2 diff --git a/torchdata/datapipes/iter/util/plain_text_reader.py b/torchdata/datapipes/iter/util/plain_text_reader.py index da11ad2d0..78f73f34d 100644 --- a/torchdata/datapipes/iter/util/plain_text_reader.py +++ b/torchdata/datapipes/iter/util/plain_text_reader.py @@ -6,7 +6,7 @@ import contextlib import csv -from typing import IO, Iterator, Tuple, TypeVar, Union +from typing import IO, Iterator, List, Tuple, TypeVar, Union from torchdata.datapipes import functional_datapipe from torchdata.datapipes.iter import IterDataPipe @@ -25,6 +25,7 @@ def __init__( encoding="utf-8", errors: str = "ignore", return_path: bool = False, + as_tuple: bool = False, ) -> None: if skip_lines < 0: raise ValueError("'skip_lines' is required to be a positive integer.") @@ -34,6 +35,7 @@ def __init__( self._encoding = encoding self._errors = errors self._return_path = return_path + self._as_tuple = as_tuple def skip_lines(self, file: IO) -> Union[Iterator[bytes], Iterator[str]]: with contextlib.suppress(StopIteration): @@ -68,6 +70,16 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu for data in stream: yield path, data + def as_tuple(self, stream: Iterator[List]) -> Iterator[Union[List, Tuple]]: + if not self._as_tuple: + yield from stream + return + for data in stream: + if isinstance(data, list): + yield tuple(data) + else: + yield data + @functional_datapipe("readlines") class LineReaderIterDataPipe(IterDataPipe[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]): @@ -136,6 +148,7 @@ def __init__( encoding="utf-8", errors: str = "ignore", return_path: bool = True, + as_tuple: bool = False, **fmtparams, ) -> None: self.source_datapipe = source_datapipe @@ -146,6 +159,7 @@ def __init__( encoding=encoding, errors=errors, return_path=return_path, + as_tuple=as_tuple, ) self.fmtparams = fmtparams @@ -154,6 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]: stream = self._helper.skip_lines(file) stream = self._helper.decode(stream) stream = self._csv_reader(stream, **self.fmtparams) + stream = self._helper.as_tuple(stream) yield from self._helper.return_path(stream, path=path) # type: ignore[misc] @@ -173,6 +188,7 @@ class CSVParserIterDataPipe(_CSVBaseParserIterDataPipe): errors: the error handling scheme used while decoding return_path: if ``True``, each line will return a tuple of path and contents, rather than just the contents + as_tuple: if ``True``, each line return as a tuple instead of a list Example: >>> from torchdata.datapipes.iter import IterableWrapper, FileOpener @@ -196,6 +212,7 @@ def __init__( encoding: str = "utf-8", errors: str = "ignore", return_path: bool = False, + as_tuple: bool = False, **fmtparams, ) -> None: super().__init__( @@ -206,6 +223,7 @@ def __init__( encoding=encoding, errors=errors, return_path=return_path, + as_tuple=as_tuple, **fmtparams, ) From 932deca9fbc2d1edc27557cbfc4806929090da20 Mon Sep 17 00:00:00 2001 From: yancong Date: Tue, 12 Jul 2022 23:21:39 +0800 Subject: [PATCH 2/5] doc(CSVParserIterDataPipe): refine doc comment --- torchdata/datapipes/iter/util/plain_text_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/plain_text_reader.py b/torchdata/datapipes/iter/util/plain_text_reader.py index 78f73f34d..914deb470 100644 --- a/torchdata/datapipes/iter/util/plain_text_reader.py +++ b/torchdata/datapipes/iter/util/plain_text_reader.py @@ -188,7 +188,7 @@ class CSVParserIterDataPipe(_CSVBaseParserIterDataPipe): errors: the error handling scheme used while decoding return_path: if ``True``, each line will return a tuple of path and contents, rather than just the contents - as_tuple: if ``True``, each line return as a tuple instead of a list + as_tuple: if ``True``, each line will return a tuple instead of a list Example: >>> from torchdata.datapipes.iter import IterableWrapper, FileOpener From 9aac78726cf2a0ab8ec2fb096417e2dcd2897d7f Mon Sep 17 00:00:00 2001 From: yancong Date: Tue, 12 Jul 2022 23:30:14 +0800 Subject: [PATCH 3/5] fix(CSVParserIterDataPipe): fix type hint of PlainTextReaderHelper.as_tuple --- torchdata/datapipes/iter/util/plain_text_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/torchdata/datapipes/iter/util/plain_text_reader.py b/torchdata/datapipes/iter/util/plain_text_reader.py index 914deb470..10150a6f4 100644 --- a/torchdata/datapipes/iter/util/plain_text_reader.py +++ b/torchdata/datapipes/iter/util/plain_text_reader.py @@ -6,7 +6,7 @@ import contextlib import csv -from typing import IO, Iterator, List, Tuple, TypeVar, Union +from typing import IO, Iterator, Tuple, TypeVar, Union from torchdata.datapipes import functional_datapipe from torchdata.datapipes.iter import IterDataPipe @@ -70,7 +70,7 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu for data in stream: yield path, data - def as_tuple(self, stream: Iterator[List]) -> Iterator[Union[List, Tuple]]: + def as_tuple(self, stream: Iterator[D]) -> Iterator[Union[D, Tuple]]: if not self._as_tuple: yield from stream return From 8a36d6ede97346d349559d89e360aae5ba96e1e5 Mon Sep 17 00:00:00 2001 From: yancong Date: Thu, 14 Jul 2022 08:55:02 +0800 Subject: [PATCH 4/5] fix(tests/test_local_io.py): fix ci --- test/test_local_io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_local_io.py b/test/test_local_io.py index d48bffae7..7c4185763 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -167,6 +167,7 @@ def make_path(fname): # Reset Test: csv_parser_dp = CSVParser(datapipe3, return_path=True) n_elements_before_reset = 2 + expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])] res_before_reset, res_after_reset = reset_after_n_next_calls(csv_parser_dp, n_elements_before_reset) self.assertEqual(expected_res[:n_elements_before_reset], res_before_reset) self.assertEqual(expected_res, res_after_reset) From 6ebebe04dd923289a020e72bb9166fa76ffc418f Mon Sep 17 00:00:00 2001 From: yancong Date: Thu, 14 Jul 2022 22:07:31 +0800 Subject: [PATCH 5/5] fix(_CSVBaseParserIterDataPipe): add mypy ignore comment --- torchdata/datapipes/iter/util/plain_text_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/plain_text_reader.py b/torchdata/datapipes/iter/util/plain_text_reader.py index 10150a6f4..6a03368ee 100644 --- a/torchdata/datapipes/iter/util/plain_text_reader.py +++ b/torchdata/datapipes/iter/util/plain_text_reader.py @@ -168,7 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]: stream = self._helper.skip_lines(file) stream = self._helper.decode(stream) stream = self._csv_reader(stream, **self.fmtparams) - stream = self._helper.as_tuple(stream) + stream = self._helper.as_tuple(stream) # type: ignore[assignment] yield from self._helper.return_path(stream, path=path) # type: ignore[misc]