ProtocolChanged error when performing append write #1585

Closed
finlaydotb opened this issue Aug 7, 2023 · 2 comments
Labels
bug Something isn't working

Comments

@finlaydotb

Environment

Delta-rs version: 0.13.0

Binding:

Environment:

  • Cloud provider:
  • OS: MacOS
  • Other:

Bug

I have this Python code that executes 3 insert transactions into a delta table

import pandas as pd
from deltalake.writer import write_deltalake
from deltalake import DeltaTable

if __name__ == '__main__':
    # First transaction
    id_list = []
    name_list = []

    for i in range(1, 11):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Second transaction
    id_list = []
    name_list = []

    for i in range(11, 16):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Third transaction
    id_list = []
    name_list = []

    for i in range(16, 31):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

This does what is intended. When I run this code, I can confirm by checking the _delta_log that 3 transactions were indeed executed.

Now I am trying to do the same in Rust, with the following code

fn get_data(start: u8, end: u8) -> RecordBatch {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let ids: Vec<i32> = (start..=end).map(i32::from).collect();
    let names: Vec<String> = ids.iter().map(|x| format!("item {x}")).collect();

    let id_values = Int32Array::from(ids);
    let name_values = StringArray::from(names);

    RecordBatch::try_new(schema, vec![Arc::new(id_values), Arc::new(name_values)]).unwrap()
}

#[tokio::main]
async fn main() {
    // First transaction
    let data = get_data(1, 10);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();

    // Second transaction
    let data = get_data(11, 15);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();

    // Third transaction
    let data = get_data(16, 30);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();
}

But when I run this, it fails with the following error

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Transaction { source: CommitConflict(ProtocolChanged) }', src/main.rs:87:33
stack backtrace:
   0: rust_begin_unwind

I noticed that the first transaction succeeds and the error happens on the second write to the delta table.

@finlaydotb finlaydotb added the bug Something isn't working label Aug 7, 2023
@finlaydotb finlaydotb changed the title ProtocolChanged Error when perfoming Append write ProtocolChanged error when perfoming append write Aug 7, 2023
@roeap
Collaborator

roeap commented Aug 18, 2023

The reason you are seeing this is probably poor documentation on our end :( ...

The build() method just sets up the table, but does not yet load the table state. As a result, each individual transaction tries to create a new table (v0), which includes the protocol metadata action. On trying to commit, it sees that the version already exists, and conflict resolution kicks in. Writing new data to a table when the protocol was changed by a concurrent transaction (which is what the conflict checker thinks happened) is an illegal operation.

So for the first write build() is what you want; in subsequent calls you need to load the table state via .load().await rather than build().

The write operation also returns the updated table instance, so you do not have to reload the entire state every time you do multiple operations in a row.

Something like:

#[tokio::main]
async fn main() {
    // build() only sets up the table; no state is loaded at this point
    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();
    println!("version: {}", table.version());

    // First transaction
    let data = get_data(1, 10);
    let table = DeltaOps(table).write(vec![data]).await.unwrap();

    println!("version: {}", table.version());

    // Second transaction
    let data = get_data(11, 15);
    let table = DeltaOps(table).write(vec![data]).await.unwrap();

    println!("version: {}", table.version());

    // Third transaction
    let data = get_data(16, 30);
    let table = DeltaOps(table).write(vec![data]).await.unwrap();

    println!("version: {}", table.version());
}
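
To make the mechanism described above concrete, here is a minimal toy model of it. This is not delta-rs code: `Log`, `Handle`, `Action`, and `CommitError` are hypothetical names invented for illustration, and the conflict check is a deliberate simplification of what the real conflict checker does. It only assumes what the explanation states: a handle with no loaded state tries to create v0 including a protocol action, and a commit that collides with a version carrying a protocol action is rejected.

```rust
use std::collections::BTreeMap;

/// Actions a commit can carry in this toy model (hypothetical, not the delta-rs types).
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Protocol, // protocol metadata, written when a table is created
    Add,      // a data file added by a write
}

#[derive(Debug, PartialEq)]
enum CommitError {
    ProtocolChanged,
}

/// Toy stand-in for the `_delta_log` directory: version -> actions.
#[derive(Default)]
struct Log {
    commits: BTreeMap<i64, Vec<Action>>,
}

/// Toy table handle. A version of -1 means "no state loaded".
#[derive(Debug)]
struct Handle {
    version: i64,
}

impl Handle {
    /// Like `build()` in the explanation: sets up the handle, loads nothing.
    fn build() -> Self {
        Handle { version: -1 }
    }

    /// Like `load()`: reads the latest committed version from the log.
    fn load(log: &Log) -> Self {
        let version = log.commits.keys().next_back().copied().unwrap_or(-1);
        Handle { version }
    }

    /// Appends data and returns the updated handle on success.
    fn write(self, log: &mut Log) -> Result<Handle, CommitError> {
        let mut actions = Vec::new();
        if self.version < 0 {
            // No state loaded: the write believes it is creating the table.
            actions.push(Action::Protocol);
        }
        actions.push(Action::Add);

        let mut target = self.version + 1;
        // Simplified conflict resolution: if the target version already exists,
        // treat that commit as a concurrent winner and inspect it.
        while log.commits.contains_key(&target) {
            if log.commits[&target].contains(&Action::Protocol) {
                return Err(CommitError::ProtocolChanged);
            }
            target += 1; // harmless conflict, retry on the next version
        }
        log.commits.insert(target, actions);
        Ok(Handle { version: target })
    }
}

/// Second write against an existing table, once without and once with loaded state.
fn demo() -> (Result<i64, CommitError>, Result<i64, CommitError>) {
    let mut log = Log::default();
    Handle::build().write(&mut log).unwrap(); // first write creates v0

    let unloaded = Handle::build().write(&mut log).map(|h| h.version);
    let loaded = Handle::load(&log).write(&mut log).map(|h| h.version);
    (unloaded, loaded)
}
```

In this model, calling `demo()` shows the second write failing with `ProtocolChanged` when the handle comes from `build()`, and committing as version 1 when the handle comes from `load()`. Reusing the handle returned by `write` has the same effect as loading, which mirrors the suggestion above.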

@roeap
Collaborator

roeap commented Sep 23, 2023

@finlaydotb, closing this for now, as the above code runs as expected on my machine.

Feel free to re-open if the issue persists for you.

@roeap roeap closed this as completed Sep 23, 2023