Can't write data from parquet file to delta table (Rust) #1470

Closed
cj-zhukov opened this issue Jun 16, 2023 · 2 comments · Fixed by #2274
Labels
bug Something isn't working

Comments

@cj-zhukov

Environment

Delta-rs version: 0.12

Binding: Rust

Environment:

  • OS: macOS Monterey 12.6.2
  • Other: polars = "0.30", tokio = "1", tokio-stream = "0.1"

Bug

What happened:
Cannot write data from a parquet file to a delta table.

Error: Generic("Arrow RecordBatch schema does not match: RecordBatch schema: Field { name: "id", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }")

What you expected to happen:
The data from the parquet file is written to the delta table with no issues.

How to reproduce it:

use std::collections::HashMap;
use std::error::Error;

use deltalake::writer::{RecordBatchWriter, DeltaWriter};
use deltalake::{SchemaField, SchemaDataType, DeltaOps};
use deltalake::parquet::arrow::ParquetRecordBatchStreamBuilder;
use polars::prelude::*;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;

#[derive(Debug)]
struct Test;
impl Test {
    fn columns() -> Vec<SchemaField> {
        vec![
            SchemaField::new(
                "id".to_string(),
                SchemaDataType::primitive("string".to_string()), 
                true,
                HashMap::new(),
            ),
        ]
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // create parquet file with polars
    let path = "/path/to/parquet/df.parquet";
    create_bad_df(path)?;

    // read parquet file
    let data = read_file(path).await?;
    let cursor = std::io::Cursor::new(data);
    let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
    let mut stream = builder.build()?;
    let mut records = Vec::new();
    while let Some(record) = stream.next().await {
        let record = record?;
        records.push(record);
    }

    // create delta table 
    let delta_path = "/path/to/delta/";
    let ops = DeltaOps::try_from_uri(delta_path).await?;
    let mut table = ops
        .create()
        .with_columns(Test::columns())
        .await?;

    // writing to delta
    let mut writer = RecordBatchWriter::for_table(&table)?;
    for record in records {
        writer.write(record).await?;
        let adds = writer
            .flush_and_commit(&mut table)
            .await
            .expect("Failed to flush write");

        println!("written={}", adds);
    }

    Ok(())
}

fn create_bad_df(path: &str) -> Result<(), Box<dyn Error>> {
    let mut df = df!(
        "id" => &["some_string_data"],
    ).unwrap();

    let mut file = std::fs::File::create(path)?;
    ParquetWriter::new(&mut file).finish(&mut df)?;
    Ok(())
}

async fn read_file(file_name: &str) -> Result<Vec<u8>, Box<dyn Error>> {
    let mut f = File::open(file_name).await?;
    let mut buffer = Vec::new();
    f.read_to_end(&mut buffer).await?;
    Ok(buffer)
}

More details:
I think this issue is somehow related to data_type: LargeUtf8. If I change the data type when creating the parquet file with polars, for example to integer, I face no issue (see the example below). How can I write data from parquet to delta? Please provide an example. Maybe it is possible to parse the data from parquet into a user struct and then create a RecordBatch from that struct for writing to delta, like here: https://github.com/delta-io/delta-rs/blob/main/rust/examples/recordbatch-writer.rs? (A sketch of that approach follows the integer example below.)

#[derive(Debug)]
struct Test;
impl Test {
    fn columns() -> Vec<SchemaField> {
        vec![
            SchemaField::new(
                "id".to_string(),
                SchemaDataType::primitive("integer".to_string()), 
                true,
                HashMap::new(),
            ),
        ]
    }
}

fn create_good_df(path: &str) -> Result<(), Box<dyn Error>> {
    let mut df = df!(
        "id" => 0..5,
    ).unwrap();

    let mut file = std::fs::File::create(path)?;
    ParquetWriter::new(&mut file).finish(&mut df)?;
    Ok(())
}
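
Regarding the question above about parsing parquet data into a user struct and building a RecordBatch from it: below is a minimal sketch in the spirit of the linked recordbatch-writer example. It builds a batch whose "id" column is explicitly Utf8, which matches what delta-rs expects for the Delta "string" type. The Row struct, the rows_to_batch helper, and the plain arrow imports are illustrative assumptions, not delta-rs API.

use std::sync::Arc;

use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

#[derive(Debug)]
struct Row {
    id: String,
}

// Build a RecordBatch whose "id" column is Utf8 (not LargeUtf8),
// matching the Delta "string" type declared in Test::columns().
fn rows_to_batch(rows: &[Row]) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
    let ids: StringArray = rows.iter().map(|r| Some(r.id.as_str())).collect();
    RecordBatch::try_new(schema, vec![Arc::new(ids)])
}

The resulting batch can then be passed to RecordBatchWriter::write exactly as in the reproduction above.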
@cj-zhukov cj-zhukov added the bug Something isn't working label Jun 16, 2023
@wjones127
Collaborator

I think the issue here is that we have a 1-to-1 mapping between Delta Lake types and Arrow types, and we don't do any automatic casting. Right now we map the Delta Lake "string" type to the Arrow "Utf8" type. However, Polars doesn't support the "Utf8" type; it always uses the large variant, "LargeUtf8". So we'll need to alter the mapping to handle this complexity.
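
Until that mapping changes, one caller-side workaround is to cast any LargeUtf8 columns down to Utf8 before handing the batch to RecordBatchWriter. A minimal sketch, assuming the plain arrow crate (or deltalake's re-export of it); the downcast_large_utf8 helper is hypothetical, not delta-rs API:

use std::sync::Arc;

use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Cast every LargeUtf8 column to Utf8 so the batch schema matches the
// Utf8-based schema that delta-rs derives from the Delta "string" type.
fn downcast_large_utf8(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let schema = batch.schema();
    let mut fields = Vec::with_capacity(batch.num_columns());
    let mut columns = Vec::with_capacity(batch.num_columns());

    for (i, field) in schema.fields().iter().enumerate() {
        if field.data_type() == &DataType::LargeUtf8 {
            // cast() errors only if the column's data does not fit 32-bit offsets
            columns.push(cast(batch.column(i), &DataType::Utf8)?);
            fields.push(Field::new(field.name(), DataType::Utf8, field.is_nullable()));
        } else {
            columns.push(batch.column(i).clone());
            fields.push(Field::new(
                field.name(),
                field.data_type().clone(),
                field.is_nullable(),
            ));
        }
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}

The cast can only fail if a column's data does not fit 32-bit offsets, which is rare in practice.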

@bsarden

bsarden commented Jul 2, 2023

+1, I also hit this error today experimenting with the polars_delta_io_manager in Dagster. The lack of the LargeUtf8 mapping on the deltalake side results in the following error on compaction.

https://github.com/danielgafni/dagster-polars

from deltalake import DeltaTable
dt = DeltaTable("/path/to/asset.delta")

dt.pyarrow_schema()
# date: string
# counter_name: string
# counter_value: double

dt.optimize.compact()

Results in the following error:

_internal.DeltaError: Data does not match the schema or partitions of the table: Unexpected Arrow schema: got: Field { name: "counter_name", data_type: LargeUtf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "counter_value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, expected: Field { name: "counter_name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "counter_value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }

The LargeUtf8 vs Utf8 type discrepancy on the counter_name column seems to be the issue here. How much effort is it to handle the LargeUtf8 type mapping on the deltalake side?

ion-elgreco added a commit that referenced this issue Mar 15, 2024
…2274)

# Description
Such a small change, but it fixes many issues where parquet files were written with arrow and the source data was in a large dtype format.

By default the parquet::ParquetReader decodes the arrow metadata, which in turn may give you large dtypes. This would cause issues during a DataFusion parquet scan with a filter, since the filter wouldn't coerce to the large dtypes. Simply disabling the arrow metadata decoding gives us the parquet schema converted to an arrow schema without large types 👯‍♂️

# Related issue(s)
- closes #1470
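
To illustrate the approach described in the fix, here is a minimal sketch of reading the parquet file while skipping the embedded arrow schema metadata, so the arrow schema is derived from the parquet types (Utf8 rather than LargeUtf8). This is an illustration, not the actual delta-rs change: the read_without_arrow_metadata helper is hypothetical, and it assumes the ArrowReaderOptions::with_skip_arrow_metadata and new_with_options APIs of the re-exported parquet crate.

use std::error::Error;

use deltalake::parquet::arrow::arrow_reader::ArrowReaderOptions;
use deltalake::parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;
use tokio_stream::StreamExt;

// Open the parquet file but skip the embedded arrow schema metadata, so the
// arrow schema is derived from the parquet types (Utf8 rather than LargeUtf8).
async fn read_without_arrow_metadata(path: &str) -> Result<(), Box<dyn Error>> {
    let file = File::open(path).await?;
    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;
    let mut stream = builder.build()?;
    while let Some(batch) = stream.next().await {
        println!("schema = {:?}", batch?.schema());
    }
    Ok(())
}

Batches produced this way should carry Utf8 string columns and therefore match the table schema created from Test::columns() in the reproduction above.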