diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index debf956ee9..ea3b92e715 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -137,6 +137,10 @@ def on_write_complete( fragment = pickle.loads(fragment_str) fragments.append(fragment) schema = pickle.loads(schema_str) + # Check whether the writer has fragments or not. + # Skip commit when there are no fragments. + if not schema: + return if self.mode in set(["create", "overwrite"]): op = lance.LanceOperation.Overwrite(schema, fragments) elif self.mode == "append": @@ -368,6 +372,10 @@ def write( """Passthrough the fragments to commit phase""" v = [] for block in blocks: + # If block is empty, skip reading its "fragment" and "schema" fields + if len(block) == 0: + continue + for fragment, schema in zip( block["fragment"].to_pylist(), block["schema"].to_pylist() ): diff --git a/python/python/tests/test_ray.py b/python/python/tests/test_ray.py index babf1a14dc..03f9253ae6 100644 --- a/python/python/tests/test_ray.py +++ b/python/python/tests/test_ray.py @@ -101,3 +101,18 @@ def test_ray_write_lance(tmp_path: Path): tbl = ds.to_table() assert sorted(tbl["id"].to_pylist()) == list(range(10)) assert set(tbl["str"].to_pylist()) == set([f"str-{i}" for i in range(10)]) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +def test_ray_empty_write_lance(tmp_path: Path): + schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())]) + + ( + ray.data.range(10) + .filter((lambda row: row["id"] > 10)) + .map(lambda x: {"id": x["id"], "str": f"str-{x['id']}"}) + .write_lance(tmp_path, schema=schema) + ) + # An empty write does not create a dataset. + with pytest.raises(ValueError): + lance.dataset(tmp_path)