diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc index 07e561276fa..446f36a4ee5 100644 --- a/cpp/src/arrow/csv/parser.cc +++ b/cpp/src/arrow/csv/parser.cc @@ -35,14 +35,24 @@ using detail::ParsedValueDesc; namespace { -Status ParseError(const char* message) { - return Status::Invalid("CSV parse error: ", message); +template +Status ParseError(Args&&... args) { + return Status::Invalid("CSV parse error: ", std::forward(args)...); } -Status MismatchingColumns(int32_t expected, int32_t actual) { - char s[50]; - snprintf(s, sizeof(s), "Expected %d columns, got %d", expected, actual); - return ParseError(s); +Status MismatchingColumns(int32_t expected, int32_t actual, int64_t row_num, + util::string_view row) { + std::string ellipse; + if (row.length() > 100) { + row = row.substr(0, 96); + ellipse = " ..."; + } + if (row_num < 0) { + return ParseError("Expected ", expected, " columns, got ", actual, ": ", row, + ellipse); + } + return ParseError("Row #", row_num, ": Expected ", expected, " columns, got ", actual, + ": ", row, ellipse); } inline bool IsControlChar(uint8_t c) { return c < ' '; } @@ -173,17 +183,24 @@ class PresizedValueDescWriter : public ValueDescWriter class BlockParserImpl { public: BlockParserImpl(MemoryPool* pool, ParseOptions options, int32_t num_cols, - int32_t max_num_rows) - : pool_(pool), options_(options), max_num_rows_(max_num_rows), batch_(num_cols) {} + int64_t first_row, int32_t max_num_rows) + : pool_(pool), + options_(options), + first_row_(first_row), + max_num_rows_(max_num_rows), + batch_(num_cols) {} const DataBatch& parsed_batch() const { return batch_; } + int64_t first_row_num() const { return first_row_; } + template Status ParseLine(ValueDescWriter* values_writer, DataWriter* parsed_writer, const char* data, const char* data_end, bool is_final, const char** out_data) { int32_t num_cols = 0; char c; + const auto start = data; DCHECK_GT(data_end, data); @@ -299,7 +316,17 @@ class BlockParserImpl { if (batch_.num_cols_ == -1) { batch_.num_cols_ = num_cols; } else { - return MismatchingColumns(batch_.num_cols_, num_cols); + // Find the end of the line without newline or carriage return + auto end = data; + if (*(end - 1) == '\n') { + --end; + } + if (*(end - 1) == '\r') { + --end; + } + return MismatchingColumns(batch_.num_cols_, num_cols, + first_row_ < 0 ? -1 : first_row_ + batch_.num_rows_, + util::string_view(start, end - start)); } } ++batch_.num_rows_; @@ -481,6 +508,7 @@ class BlockParserImpl { protected: MemoryPool* pool_; const ParseOptions options_; + const int64_t first_row_; // The maximum number of rows to parse from a block int32_t max_num_rows_; @@ -490,12 +518,14 @@ class BlockParserImpl { DataBatch batch_; }; -BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows) - : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {} +BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int64_t first_row, + int32_t max_num_rows) + : BlockParser(default_memory_pool(), options, num_cols, first_row, max_num_rows) {} BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols, - int32_t max_num_rows) - : impl_(new BlockParserImpl(pool, std::move(options), num_cols, max_num_rows)) {} + int64_t first_row, int32_t max_num_rows) + : impl_(new BlockParserImpl(pool, std::move(options), num_cols, first_row, + max_num_rows)) {} BlockParser::~BlockParser() {} @@ -519,6 +549,8 @@ Status BlockParser::ParseFinal(util::string_view data, uint32_t* out_size) { const DataBatch& BlockParser::parsed_batch() const { return impl_->parsed_batch(); } +int64_t BlockParser::first_row_num() const { return impl_->first_row_num(); } + int32_t SkipRows(const uint8_t* data, uint32_t size, int32_t num_rows, const uint8_t** out_data) { const auto end = data + size; diff --git a/cpp/src/arrow/csv/parser.h b/cpp/src/arrow/csv/parser.h index 4fcc52fb3a6..ffc735c228f 100644 --- a/cpp/src/arrow/csv/parser.h +++ b/cpp/src/arrow/csv/parser.h @@ -63,19 +63,26 @@ class ARROW_EXPORT DataBatch { uint32_t num_bytes() const { return parsed_size_; } template - Status VisitColumn(int32_t col_index, Visitor&& visit) const { + Status VisitColumn(int32_t col_index, int64_t first_row, Visitor&& visit) const { using detail::ParsedValueDesc; + int64_t row = first_row; for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) { const auto& values_buffer = values_buffers_[buf_index]; const auto values = reinterpret_cast(values_buffer->data()); const auto max_pos = static_cast(values_buffer->size() / sizeof(ParsedValueDesc)) - 1; - for (int32_t pos = col_index; pos < max_pos; pos += num_cols_) { + for (int32_t pos = col_index; pos < max_pos; pos += num_cols_, ++row) { auto start = values[pos].offset; auto stop = values[pos + 1].offset; auto quoted = values[pos + 1].quoted; - ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted)); + Status status = visit(parsed_ + start, stop - start, quoted); + if (ARROW_PREDICT_FALSE(!status.ok())) { + if (first_row >= 0) { + status = status.WithMessage("Row #", row, ": ", status.message()); + } + ARROW_RETURN_NOT_OK(status); + } } } return Status::OK(); @@ -134,9 +141,9 @@ constexpr int32_t kMaxParserNumRows = 100000; class ARROW_EXPORT BlockParser { public: explicit BlockParser(ParseOptions options, int32_t num_cols = -1, - int32_t max_num_rows = kMaxParserNumRows); + int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows); explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1, - int32_t max_num_rows = kMaxParserNumRows); + int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows); ~BlockParser(); /// \brief Parse a block of data @@ -167,6 +174,8 @@ class ARROW_EXPORT BlockParser { int32_t num_cols() const { return parsed_batch().num_cols(); } /// \brief Return the total size in bytes of parsed data uint32_t num_bytes() const { return parsed_batch().num_bytes(); } + /// \brief Return the row number of the first row in the block or -1 if unsupported + int64_t first_row_num() const; /// \brief Visit parsed values in a column /// @@ -174,7 +183,8 @@ class ARROW_EXPORT BlockParser { /// Status(const uint8_t* data, uint32_t size, bool quoted) template Status VisitColumn(int32_t col_index, Visitor&& visit) const { - return parsed_batch().VisitColumn(col_index, std::forward(visit)); + return parsed_batch().VisitColumn(col_index, first_row_num(), + std::forward(visit)); } template diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc index 6414b379804..67cf4226a7a 100644 --- a/cpp/src/arrow/csv/parser_test.cc +++ b/cpp/src/arrow/csv/parser_test.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include "arrow/csv/options.h" @@ -295,7 +296,7 @@ TEST(BlockParser, Newlines) { TEST(BlockParser, MaxNumRows) { auto csv = MakeCSVData({"a\n", "b\n", "c\n", "d\n"}); - BlockParser parser(ParseOptions::Defaults(), -1, 3 /* max_num_rows */); + BlockParser parser(ParseOptions::Defaults(), -1, 0, 3 /* max_num_rows */); AssertParsePartial(parser, csv, 6); AssertColumnsEq(parser, {{"a", "b", "c"}}); @@ -536,22 +537,37 @@ TEST(BlockParser, QuotesSpecial) { TEST(BlockParser, MismatchingNumColumns) { uint32_t out_size; { - BlockParser parser(ParseOptions::Defaults()); + BlockParser parser(ParseOptions::Defaults(), -1, 0 /* first_row */); auto csv = MakeCSVData({"a,b\nc\n"}); Status st = Parse(parser, csv, &out_size); - ASSERT_RAISES(Invalid, st); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("CSV parse error: Row #1: Expected 2 columns, got 1: c"), st); } { - BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */); + BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, 0 /* first_row */); auto csv = MakeCSVData({"a\n"}); Status st = Parse(parser, csv, &out_size); - ASSERT_RAISES(Invalid, st); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("CSV parse error: Row #0: Expected 2 columns, got 1: a"), st); } { - BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */); + BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, 50 /* first_row */); auto csv = MakeCSVData({"a,b,c\n"}); Status st = Parse(parser, csv, &out_size); - ASSERT_RAISES(Invalid, st); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, + testing::HasSubstr("CSV parse error: Row #50: Expected 2 columns, got 3: a,b,c"), + st); + } + // No row number + { + BlockParser parser(ParseOptions::Defaults(), 2 /* num_cols */, -1); + auto csv = MakeCSVData({"a\n"}); + Status st = Parse(parser, csv, &out_size); + EXPECT_RAISES_WITH_MESSAGE_THAT( + Invalid, testing::HasSubstr("CSV parse error: Expected 2 columns, got 1: a"), st); } } @@ -623,5 +639,46 @@ TEST(BlockParser, QuotedEscape) { } } +TEST(BlockParser, RowNumberAppendedToError) { + auto options = ParseOptions::Defaults(); + auto csv = "a,b,c\nd,e,f\ng,h,i\n"; + { + BlockParser parser(options, -1, 0); + ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv)); + int row = 0; + auto status = parser.VisitColumn( + 0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status { + return ++row == 2 ? Status::Invalid("Bad value") : Status::OK(); + }); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("Row #1: Bad value"), + status); + } + + { + BlockParser parser(options, -1, 100); + ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv)); + int row = 0; + auto status = parser.VisitColumn( + 0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status { + return ++row == 3 ? Status::Invalid("Bad value") : Status::OK(); + }); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("Row #102: Bad value"), + status); + } + + // No first row specified should not append row information + { + BlockParser parser(options, -1, -1); + ASSERT_NO_FATAL_FAILURE(AssertParseOk(parser, csv)); + int row = 0; + auto status = parser.VisitColumn( + 0, [row](const uint8_t* data, uint32_t size, bool quoted) mutable -> Status { + return ++row == 3 ? Status::Invalid("Bad value") : Status::OK(); + }); + EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::Not(testing::HasSubstr("Row")), + status); + } +} + } // namespace csv } // namespace arrow diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index f05f8cac9a9..7a12cfea943 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -319,11 +319,13 @@ class ReaderMixin { public: ReaderMixin(io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) + const ConvertOptions& convert_options, bool count_rows) : io_context_(std::move(io_context)), read_options_(read_options), parse_options_(parse_options), convert_options_(convert_options), + count_rows_(count_rows), + num_rows_seen_(count_rows_ ? 1 : -1), input_(std::move(input)) {} protected: @@ -344,11 +346,15 @@ class ReaderMixin { " rows from CSV file, " "either file is too short or header is larger than block size"); } + if (count_rows_) { + num_rows_seen_ = num_skipped_rows; + } } if (read_options_.column_names.empty()) { // Parse one row (either to read column names or to know the number of columns) - BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_, 1); + BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_, + num_rows_seen_, 1); uint32_t parsed_size = 0; RETURN_NOT_OK(parser.Parse( util::string_view(reinterpret_cast(data), data_end - data), @@ -374,6 +380,9 @@ class ReaderMixin { DCHECK_EQ(static_cast(parser.num_cols()), column_names_.size()); // Skip parsed header row data += parsed_size; + if (count_rows_) { + ++num_rows_seen_; + } } } else { column_names_ = read_options_.column_names; @@ -466,8 +475,8 @@ class ReaderMixin { const std::shared_ptr& block, int64_t block_index, bool is_final) { static constexpr int32_t max_num_rows = std::numeric_limits::max(); - auto parser = std::make_shared(io_context_.pool(), parse_options_, - num_csv_cols_, max_num_rows); + auto parser = std::make_shared( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); std::shared_ptr straddling; std::vector views; @@ -490,6 +499,9 @@ class ReaderMixin { } else { RETURN_NOT_OK(parser->Parse(views, &parsed_size)); } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } return ParseResult{std::move(parser), static_cast(parsed_size)}; } @@ -500,6 +512,10 @@ class ReaderMixin { // Number of columns in the CSV file int32_t num_csv_cols_ = -1; + // Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed + bool count_rows_; + // Number of rows seen in the csv. Not used if count_rows is false + int64_t num_rows_seen_; // Column names in the CSV file std::vector column_names_; ConversionSchema conversion_schema_; @@ -588,9 +604,9 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader { BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, - const ConvertOptions& convert_options) + const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, - convert_options), + convert_options, count_rows), cpu_executor_(cpu_executor) {} virtual Future> Init() = 0; @@ -889,8 +905,9 @@ class AsyncThreadedTableReader const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, Executor* cpu_executor) + // Count rows is currently not supported during parallel read : BaseTableReader(std::move(io_context), input, read_options, parse_options, - convert_options), + convert_options, /*count_rows=*/false), cpu_executor_(cpu_executor) {} ~AsyncThreadedTableReader() override { @@ -992,7 +1009,8 @@ Result> MakeTableReader( io_context, input, read_options, parse_options, convert_options, cpu_executor); } else { reader = std::make_shared(io_context, input, read_options, - parse_options, convert_options); + parse_options, convert_options, + /*count_rows=*/true); } RETURN_NOT_OK(reader->Init()); return reader; @@ -1004,7 +1022,8 @@ Future> MakeStreamingReader( const ParseOptions& parse_options, const ConvertOptions& convert_options) { std::shared_ptr reader; reader = std::make_shared( - io_context, cpu_executor, input, read_options, parse_options, convert_options); + io_context, cpu_executor, input, read_options, parse_options, convert_options, + /*count_rows=*/true); return reader->Init(); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 9c11afec264..5a16a52c544 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -51,7 +51,7 @@ Result> GetColumnNames( const csv::ParseOptions& parse_options, util::string_view first_block, MemoryPool* pool) { uint32_t parsed_size = 0; - csv::BlockParser parser(pool, parse_options, /*num_cols=*/-1, + csv::BlockParser parser(pool, parse_options, /*num_cols=*/-1, /*first_row=*/1, /*max_num_rows=*/1); RETURN_NOT_OK(parser.Parse(util::string_view{first_block}, &parsed_size)); diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index fef1ac60f37..3fa9ae02e4d 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -52,11 +52,12 @@ def generate_col_names(): yield first + second -def make_random_csv(num_cols=2, num_rows=10, linesep='\r\n'): +def make_random_csv(num_cols=2, num_rows=10, linesep='\r\n', write_names=True): arr = np.random.RandomState(42).randint(0, 1000, size=(num_cols, num_rows)) - col_names = list(itertools.islice(generate_col_names(), num_cols)) csv = io.StringIO() - csv.write(",".join(col_names)) + col_names = list(itertools.islice(generate_col_names(), num_cols)) + if write_names: + csv.write(",".join(col_names)) csv.write(linesep) for row in arr.T: csv.write(",".join(map(str, row))) @@ -974,6 +975,71 @@ def read_csv(self, *args, validate_full=True, **kwargs): table.validate(full=validate_full) return table + def test_row_numbers_in_errors(self): + """ Row numbers are only correctly counted in serial reads """ + csv, _ = make_random_csv(4, 100, write_names=True) + + read_options = ReadOptions() + read_options.block_size = len(csv) / 3 + convert_options = ConvertOptions() + convert_options.column_types = {"a": pa.int32(), "d": pa.int32()} + + # Test without skip_rows and column names in the csv + csv_bad_columns = csv + b"1,2\r\n" + with pytest.raises(pa.ArrowInvalid, + match="Row #102: Expected 4 columns, got 2"): + self.read_bytes(csv_bad_columns, read_options=read_options, + convert_options=convert_options) + + csv_bad_type = csv + b"a,b,c,d\r\n" + message = ("In CSV column #0: Row #102: " + + "CSV conversion error to int32: invalid value 'a'") + with pytest.raises(pa.ArrowInvalid, match=message): + self.read_bytes(csv_bad_type, read_options=read_options, + convert_options=convert_options) + + long_row = (b"this is a long row" * 15) + b",3\r\n" + csv_bad_columns_long = csv + long_row + message = ("Row #102: Expected 4 columns, got 2: " + + long_row[0:96].decode("utf-8") + " ...") + with pytest.raises(pa.ArrowInvalid, match=message): + self.read_bytes(csv_bad_columns_long, read_options=read_options, + convert_options=convert_options) + + # Test without skip_rows and column names not in the csv + csv, _ = make_random_csv(4, 100, write_names=False) + read_options.column_names = ["a", "b", "c", "d"] + csv_bad_columns = csv + b"1,2\r\n" + with pytest.raises(pa.ArrowInvalid, + match="Row #101: Expected 4 columns, got 2"): + self.read_bytes(csv_bad_columns, read_options=read_options, + convert_options=convert_options) + + csv_bad_columns_long = csv + long_row + message = ("Row #101: Expected 4 columns, got 2: " + + long_row[0:96].decode("utf-8") + " ...") + with pytest.raises(pa.ArrowInvalid, match=message): + self.read_bytes(csv_bad_columns_long, read_options=read_options, + convert_options=convert_options) + + csv_bad_type = csv + b"a,b,c,d\r\n" + message = ("In CSV column #0: Row #101: " + + "CSV conversion error to int32: invalid value 'a'") + with pytest.raises(pa.ArrowInvalid, match=message): + self.read_bytes(csv_bad_type, read_options=read_options, + convert_options=convert_options) + + # Test with skip_rows and column names not in the csv + read_options.skip_rows = 23 + with pytest.raises(pa.ArrowInvalid, + match="Row #101: Expected 4 columns, got 2"): + self.read_bytes(csv_bad_columns, read_options=read_options, + convert_options=convert_options) + + with pytest.raises(pa.ArrowInvalid, match=message): + self.read_bytes(csv_bad_type, read_options=read_options, + convert_options=convert_options) + class TestParallelCSVRead(BaseTestCSVRead, unittest.TestCase):