Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSV inference reads in the whole file to memory, regardless of row limit #3658

Closed
kmitchener opened this issue Sep 29, 2022 · 3 comments · Fixed by #4661
Closed

CSV inference reads in the whole file to memory, regardless of row limit #3658

kmitchener opened this issue Sep 29, 2022 · 3 comments · Fixed by #4661
Labels
bug Something isn't working

Comments

@kmitchener
Copy link
Contributor

Describe the bug
When inferring the schema, the complete CSV will be read into memory even if you leave it at the default 1000 rows to infer from.

To Reproduce

let df = ctx.read_csv("./test/", CsvReadOptions::new()).await?;

Happens here:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/datasource/file_format/csv.rs#L109

Expected behavior
It should read in only as much data as it needs for the given row count to infer data from.

Additional context

@kmitchener kmitchener added the bug Something isn't working label Sep 29, 2022
@tustvold
Copy link
Contributor

This should just be a case of hooking up #2936. I think the inference code dates from a time before that

@kmitchener
Copy link
Contributor Author

@tustvold thanks for the comment and pointer. I'm having a hard time figuring out how to hook up a Reader (used by the infer_reader_schema function) to a Future's stream. Do you have any hints for me? I was able to pull from the stream (regardless of file vs. networked object store) into a buffer, and turn that into a reader. But hooking up a reader directly to the stream is confounding me.

@tustvold
Copy link
Contributor

tustvold commented Oct 6, 2022

You will want to do something vaguely like (not properly tested)

async fn infer_schema(
        &self,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas = vec![];

        let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

        for object in objects {
            match store.get(&object.location).await? {
                GetResult::File(f, _) => {
                    let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
                        f,
                        self.delimiter,
                        Some(records_to_read),
                        self.has_header,
                    )?;
                    schemas.push(schema);
                    records_to_read -= records_read;
                }
                GetResult::Stream(s) => {
                    let mut has_header = self.has_header;
                    let s = newline_delimited_stream(s.map_err(Into::into));
                    pin_mut!(s);
                    while let Some(bytes) = s.next().await.transpose()? {
                        let (schema, records_read) =
                            arrow::csv::reader::infer_reader_schema(
                                bytes.as_ref(),
                                self.delimiter,
                                Some(records_to_read),
                                has_header,
                            )?;
                        has_header = false;

                        schemas.push(schema);
                        records_to_read -= records_read;
                        if records_to_read == 0 {
                            break;
                        }
                    }
                }
            }
        }

        let merged_schema = Schema::try_merge(schemas)?;
        Ok(Arc::new(merged_schema))
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants