Skip to content

Assertion failed with many concurrent writers on GCS #1356

@wjones127

Description

@wjones127

Error and backtrace:

thread '<unnamed>' panicked at 'assertion failed: buf.len() >= msg_len + 4', /home/runner/work/lance/lance/rust/lance/src/io/object_reader.rs:121:9
stack backtrace:
   0: rust_begin_unwind
             at /rustc/d5c2e9c342b358556da91d61ed4133f6f50fc0c3/library/std/src/panicking.rs:593:5
   1: core::panicking::panic_fmt
             at /rustc/d5c2e9c342b358556da91d61ed4133f6f50fc0c3/library/core/src/panicking.rs:67:14
   2: core::panicking::panic
             at /rustc/d5c2e9c342b358556da91d61ed4133f6f50fc0c3/library/core/src/panicking.rs:117:5
   3: lance::io::object_reader::read_struct::{{closure}}
   4: lance::dataset::Dataset::checkout_manifest::{{closure}}
   5: lance::dataset::Dataset::open_with_params::{{closure}}
   6: lance::dataset::Dataset::write_impl::{{closure}}::{{closure}}
   7: lance::dataset::Dataset::write_impl::{{closure}}
   8: lance::dataset::Dataset::write::{{closure}}
   9: lance::dataset::write_dataset
  10: lance::dataset::_::__pyfunction_write_dataset
  11: pyo3::impl_::trampoline::trampoline
  12: lance::dataset::_::<impl lance::dataset::write_dataset::MakeDef>::DEF::trampoline
Reproduction

# python test.py gs://lance-performance-testing/concurrent_write
import lance
import pyarrow as pa
from concurrent.futures import ThreadPoolExecutor
import fire
import os

# Number of concurrent writers. NOTE: despite the name, main() runs these
# as threads in a ThreadPoolExecutor, not as separate OS processes.
N_PROCESSES = 50
# Number of append attempts made by each concurrent writer.
N_ITERS = 10

def do_write(uri: str):
    """Append a tiny table (one column ``i`` holding 0..9) to the dataset at *uri*.

    Returns whatever ``lance.write_dataset`` returns for the append.
    """
    column_values = pa.array(range(10))
    table = pa.table({"i": column_values})
    return lance.write_dataset(table, uri, mode="append")

def write_in_loop(uri: str):
    """Make ``N_ITERS`` append attempts against the dataset at *uri*.

    Concurrent writers racing for the same version are expected to lose some
    commits. Those failures are recognised by the ``'Commit conflict for
    version'`` substring in the error message and skipped. Any other
    exception propagates unchanged, with its original traceback.
    """
    for _ in range(N_ITERS):
        try:
            do_write(uri)
        except Exception as e:
            # Only commit conflicts are tolerated; a bare ``raise`` re-raises
            # the active exception without adding a new traceback entry.
            if 'Commit conflict for version' not in str(e):
                raise

def main(uri: str):
    """Stress-test concurrent appends to the dataset at *uri*.

    A single initial append establishes the table. Then ``N_PROCESSES``
    worker threads each run ``write_in_loop`` against the same URI, and the
    collected per-worker results are printed.
    """
    # One initial write to establish the table.
    do_write(uri)

    # The context manager guarantees the executor's threads are shut down
    # (and joined) as soon as all submitted work has completed.
    with ThreadPoolExecutor(max_workers=N_PROCESSES) as pool:
        results = list(pool.map(write_in_loop, [uri] * N_PROCESSES))
    print(results)

if __name__ == "__main__":
    # Expose `main` as a command-line entry point, e.g. `python test.py <uri>`.
    fire.Fire(component=main)

Metadata

Assignees

Labels

bug — Something isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions