-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] Handle nullable fields in schema across blocks for parquet files #48478
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High level approach LGTM
@@ -75,10 +75,12 @@ def write( | |||
|
|||
def write_blocks_to_path(): | |||
with self.open_output_stream(write_path) as file: | |||
schema = BlockAccessor.for_block(blocks[0]).to_arrow().schema | |||
tables = [BlockAccessor.for_block(block).to_arrow() for block in blocks] | |||
schema = self._try_merge_nullable_fields(tables) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than introducing a new method, we could extend the existing unify_schemas
function:
def unify_schemas( |
if not table.schema.equals(schema): | ||
table = table.cast(schema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if we don't explicitly cast the tables?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The table would still have a mismatch schema. i.e.
table.schema.equals(schema)
in this case would still be false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha. Wasn't sure if PyArrow would implicitly cast tables to match the specified schema under-the-hood
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it doesn't do the casting because there's check on the schema equality here:
https://github.com/apache/arrow/blob/main/python/pyarrow/parquet/core.py#L1110-L1114
Updates:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
||
if OperatorFusionRule in _PHYSICAL_RULES: | ||
_PHYSICAL_RULES.remove(OperatorFusionRule) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a note why we're removing operator fusion here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, why are we removing fusion?
We'd not need to do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we don't do this - I think there will be only 1 block somehow. So the repro here didn't work, I guess we would need some other examples/repros if we don't remove the rule?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, i see what you're saying.
Surely we can disable operator fusion, but that should be done t/h configuration not "physically" removing the rule from the list (just add config to DataContext
disabling it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just add config to DataContext disabling it
@alexeykudinkin do you envision us adding a config for each optimization rule, or special-case operator fusion?
In any case, adding an interface for disabling optimization rules seems orthogonal to the goal of this PR, and can probably be handled as a follow-up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rickyyx no need to block this PR on this, let's just reshape your test a bit:
- Instead of using
ray.data.range
as source, create 2 parquet files -- 1 without nulls, another with nulls - Read both of these and then write out as single one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexeykudinkin I might be missing something here, but I am not sure how I can force writing the 2 files with a single block w/o disabling the operator fusing.
Something like below still only writes to the file with a single block (so there's technically no schema unification needed)
# Write each row to a separate file.
for i, row in enumerate(row_data):
ray.data.from_pandas(pd.DataFrame([row])).write_parquet(
os.path.join(tmp_path, f"file_{i}.parquet")
)
# Read files and merge into a single file shouldn't error.
ray.data.read_parquet(tmp_path).write_parquet(tmp_path, num_rows_per_file=2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using ray.data.range as source, create 2 parquet files -- 1 without nulls, another with nulls
Read both of these and then write out as single one
I don't think this'd reproduce the error. IIRC Ray Data will read both files in a single task, and then BlockOutputBuffer
will combine the read data into a single block before passing it to the datasink
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that might require some fidgeting to make it work.
Alternative path is to specify num_cpus
which should make them diverge and hence avoid fusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was able to force with by changing the target_max_block_size
if that's a better approach.
[ | ||
[{"a": 1, "b": None}, {"a": 1, "b": 2}], | ||
[{"a": None, "b": None}, {"a": 1, "b": 2}], | ||
[{"a": 1, "b": 2}, {"a": 1, "b": "hi"}], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does the type get promoted to for "b" in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh - this shouldn't pass actually. it was somehow passing without remove the fusion.
|
||
if OperatorFusionRule in _PHYSICAL_RULES: | ||
_PHYSICAL_RULES.remove(OperatorFusionRule) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, why are we removing fusion?
We'd not need to do that
Updates
|
], | ||
ids=["row1_b_null", "row1_a_null", "row_each_null"], | ||
) | ||
def test_write_auto_infer_nullable_fields(tmp_path, ray_start_regular_shared, row_data): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the restore_data_context
fixture so that changes aren't persisted across tests
def test_write_auto_infer_nullable_fields(tmp_path, ray_start_regular_shared, row_data): | |
def test_write_auto_infer_nullable_fields(tmp_path, ray_start_regular_shared, row_data, restore_data_context): |
ctx = DataContext.get_current() | ||
# So that we force multiple blocks on mapping. | ||
ctx.target_max_block_size = 1 | ||
ds = ray.data.range(len(row_data)).map(lambda i: row_data[i["id"]]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The name i
makes me think that i
is an int
(index).
ds = ray.data.range(len(row_data)).map(lambda i: row_data[i["id"]]) | |
ds = ray.data.range(len(row_data)).map(lambda row: row_data[row["id"]]) |
(Feel free to keep it as-is, too)
…es (ray-project#48478) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? When writing blocks to parquet, there might be blocks with fields that differ ONLY in nullability - by default, this would be rejected since some blocks might have a different schema than the ParquetWriter. However, we could potentially allow it to happen by tweaking the schema. This PR goes through all blocks before writing them to parquet, and merge schemas that differ only in nullability of the fields. It also casts the table to the newly merged schema so that the write could happen. <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Closes ray-project#48102 --------- Signed-off-by: rickyx <rickyx@anyscale.com>
…es (ray-project#48478) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? When writing blocks to parquet, there might be blocks with fields that differ ONLY in nullability - by default, this would be rejected since some blocks might have a different schema than the ParquetWriter. However, we could potentially allow it to happen by tweaking the schema. This PR goes through all blocks before writing them to parquet, and merge schemas that differ only in nullability of the fields. It also casts the table to the newly merged schema so that the write could happen. <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Closes ray-project#48102 --------- Signed-off-by: rickyx <rickyx@anyscale.com> Signed-off-by: mohitjain2504 <mohit.jain@dream11.com>
…es (ray-project#48478) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? When writing blocks to parquet, there might be blocks with fields that differ ONLY in nullability - by default, this would be rejected since some blocks might have a different schema than the ParquetWriter. However, we could potentially allow it to happen by tweaking the schema. This PR goes through all blocks before writing them to parquet, and merge schemas that differ only in nullability of the fields. It also casts the table to the newly merged schema so that the write could happen. <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Closes ray-project#48102 --------- Signed-off-by: rickyx <rickyx@anyscale.com> Signed-off-by: hjiang <dentinyhao@gmail.com>
Why are these changes needed?
When writing blocks to parquet, there might be blocks with fields that differ ONLY in nullability - by default, this would be rejected since some blocks might have a different schema than the ParquetWriter. However, we could potentially allow it to happen by tweaking the schema.
This PR goes through all blocks before writing them to parquet, and merge schemas that differ only in nullability of the fields.
It also casts the table to the newly merged schema so that the write could happen.
Related issue number
Closes #48102
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.