Skip to content

Commit

Permalink
add test and fix missing builder
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Aug 30, 2024
1 parent dd1110b commit b98980b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
8 changes: 8 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ impl CsvFormat {
self
}

/// The character used to indicate the end of a row.
/// - default to None (CRLF)
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.options.terminator = terminator;
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
Expand Down Expand Up @@ -359,6 +366,7 @@ impl FileFormat for CsvFormat {
.with_has_header(has_header)
.with_delimeter(self.options.delimiter)
.with_quote(self.options.quote)
.with_terminator(self.options.terminator)
.with_escape(self.options.escape)
.with_comment(self.options.comment)
.with_newlines_in_values(newlines_in_values)
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_terminator(self.terminator)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
Expand Down
15 changes: 11 additions & 4 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ impl ExecutionPlan for CsvExec {
object_store,
comment: self.comment,
});

let opener = CsvOpener {
config,
file_compression_type: self.file_compression_type.to_owned(),
Expand Down Expand Up @@ -518,6 +517,7 @@ impl CsvConfig {

impl CsvConfig {
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
dbg!(&self.terminator);
Ok(self.builder().build(reader)?)
}

Expand All @@ -527,7 +527,6 @@ impl CsvConfig {
.with_batch_size(self.batch_size)
.with_header(self.has_header)
.with_quote(self.quote);

if let Some(terminator) = self.terminator {
builder = builder.with_terminator(terminator);
}
Expand Down Expand Up @@ -557,6 +556,7 @@ impl CsvOpener {
config: Arc<CsvConfig>,
file_compression_type: FileCompressionType,
) -> Self {
dbg!(&config);
Self {
config,
file_compression_type,
Expand Down Expand Up @@ -1249,15 +1249,15 @@ mod tests {
let session_ctx = SessionContext::new();
let store = object_store::memory::InMemory::new();

let data = bytes::Bytes::from("a,b 1,2 3,4");
let data = bytes::Bytes::from("a,b\r1,2\r3,4");
let path = object_store::path::Path::from("a.csv");
store.put(&path, data.into()).await.unwrap();

let url = Url::parse("memory://").unwrap();
session_ctx.register_object_store(&url, Arc::new(store));

let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b' ')))
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\r')))
.await
.unwrap();

Expand All @@ -1273,6 +1273,13 @@ mod tests {
];

crate::assert_batches_eq!(expected, &result);

match session_ctx
.read_csv("memory:///", CsvReadOptions::new().terminator(Some(b'\n')))
.await.unwrap().collect().await {
Ok(_) => panic!("Expected error"),
Err(e) => assert_eq!(e.strip_backtrace(), "Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got more than 2"),
}
}

#[tokio::test]
Expand Down

0 comments on commit b98980b

Please sign in to comment.