[Data] [1/n] - Iceberg Schema Evolution #59210
Conversation
Code Review
This pull request introduces a significant improvement to the Iceberg datasink by adding support for schema evolution. The changes are well-structured, and the refactoring of IcebergDatasink makes it more robust and easier to understand. The addition of comprehensive tests for schema evolution is also a great contribution.
I have a couple of suggestions for minor optimizations in the on_write_complete method to improve efficiency by reducing redundant operations. Overall, this is an excellent pull request.
| """ | ||
| Update the table schema to accommodate incoming data using union-by-name semantics. | ||
| property_as_bool = PropertyUtil.property_as_bool | ||
| This is called from the driver after reconciling all schemas. |
Let's make it clear that this can only be called from the driver.
(Also think about how we can assert that it's only called from the driver)
`is_driver = ray.get_runtime_context().worker.mode != WORKER_MODE` should work
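A minimal sketch of such a guard, assuming Ray's internal `WORKER_MODE` constant (the import path is internal and may vary across Ray versions):

```python
import ray
from ray._private.worker import WORKER_MODE  # internal constant; import path is an assumption


def _assert_on_driver() -> None:
    # Task and actor workers run in WORKER_MODE; the driver runs in
    # SCRIPT_MODE (or LOCAL_MODE), so this assertion trips only off-driver.
    mode = ray.get_runtime_context().worker.mode
    assert mode != WORKER_MODE, "_update_schema must only be called from the driver"
```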
Bug: Callback skipped for small datasets in all_inputs_done
The _on_first_input_callback is invoked only in _add_input_inner(), not in all_inputs_done(). When a dataset is small enough that no bundle reaches the min_rows_per_bundle threshold during normal processing, the leftover bundles are deferred to all_inputs_done(), which calls _add_bundled_input() directly without invoking the callback. For IcebergDatasink, this means on_write_start() (which handles schema evolution) is never called for small datasets, potentially causing write failures when the incoming data has new columns.
python/ray/data/_internal/execution/operators/map_operator.py, lines 566–576 at 318f40c:
```python
def all_inputs_done(self):
    self._block_ref_bundler.done_adding_bundles()
    if self._block_ref_bundler.has_bundle():
        # Handle any leftover bundles in the bundler.
        (
            _,
            bundled_input,
        ) = self._block_ref_bundler.get_next_bundle()
        self._add_bundled_input(bundled_input)
    super().all_inputs_done()
```
python/ray/data/_internal/planner/plan_write_op.py, lines 136–145 at 318f40c:
```python
if not isinstance(datasink, _FileDatasink):
    if isinstance(datasink, IcebergDatasink):
        # Iceberg needs the schema for schema evolution, use deferred callback
        def on_first_input(bundle: RefBundle):
            schema: Optional["pa.Schema"] = _get_pyarrow_schema_from_bundle(
                bundle
            )
            datasink.on_write_start(schema)

        map_op.set_on_first_input_callback(on_first_input)
```
alexeykudinkin left a comment:
LGTM, minor comments
/gemini summary
Summary of Changes

This pull request significantly enhances Ray Data's integration with Apache Iceberg by upgrading the underlying PyIceberg library and introducing robust schema evolution capabilities. This allows users to append data to Iceberg tables with varying schemas, as the system will automatically adapt the table's schema to accommodate new columns and promote types as needed, simplifying data ingestion workflows.

Highlights
Changelog
Activity
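To make the new behavior concrete, here is a hedged sketch of the user-facing flow this PR enables; the table identifier and catalog configuration are hypothetical:

```python
import ray

# First append establishes the table schema (id, name).
ds1 = ray.data.from_items([{"id": 1, "name": "a"}])
ds1.write_iceberg(table_identifier="db.events")  # hypothetical table

# Second append carries an extra column; with schema evolution, the table
# schema is widened via union-by-name instead of the write failing.
ds2 = ray.data.from_items([{"id": 2, "name": "b", "score": 0.5}])
ds2.write_iceberg(table_identifier="db.events")
```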
The diff under review (split-diff columns reconstructed: the old inline setup code was replaced by the new _update_schema method).

Removed:

```python
property_as_bool = PropertyUtil.property_as_bool
catalog = self._get_catalog()
table = catalog.load_table(self.table_identifier)
self._txn = table.transaction()
self._io = self._txn._table.io
self._table_metadata = self._txn.table_metadata
self._uuid = uuid.uuid4()

if unsupported_partitions := [
    field
    for field in self._table_metadata.spec().fields
    if not field.transform.supports_pyarrow_transform
]:
    raise ValueError(
        f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
    )

self._manifest_merge_enabled = property_as_bool(
    self._table_metadata.properties,
    TableProperties.MANIFEST_MERGE_ENABLED,
    TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
```

Added:

```python
def _update_schema(self, incoming_schema: "pa.Schema") -> None:
    """
    Update the table schema to accommodate incoming data using union-by-name semantics.

    .. warning::
        This method must only be called from the driver process.
        It performs schema evolution which requires exclusive table access.

    Args:
        incoming_schema: The PyArrow schema to merge with the table schema
    """
    with self._table.update_schema() as update:
        update.union_by_name(incoming_schema)
    # Succeeded, reload to get latest table version and exit.
    self._reload_table()
```
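For reference, the `union_by_name` call above is standard PyIceberg API; a standalone sketch (catalog and table names are hypothetical):

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # hypothetical catalog name
table = catalog.load_table("db.events")  # hypothetical table

incoming_schema = pa.schema([
    ("id", pa.int64()),
    ("new_col", pa.string()),  # column not yet present in the table
])

# union_by_name adds missing columns and applies safe type promotions
# (e.g. int -> long) rather than raising on a schema mismatch.
with table.update_schema() as update:
    update.union_by_name(incoming_schema)
```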
This is used in one place; let's inline it.
```python
# Reload table to get latest metadata
self._reload_table()
```
Why do we need to reload?
Can remove this.
```python
)
assert rows_same(result_df, expected)


def test_multiple_schema_evolutions(self, clean_table):
```
Let's add a test for type promotion (as a separate test).
Added test_schema_evolution_type_promotion
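A minimal sketch of what such a test might look like (illustrative only; the actual test in the PR may differ, and the `clean_table` fixture is assumed to yield the table identifier):

```python
import pyarrow as pa
import ray


def test_schema_evolution_type_promotion(self, clean_table):
    # First write: "value" is int32.
    t1 = pa.table({"id": [1, 2], "value": pa.array([10, 20], pa.int32())})
    ray.data.from_arrow(t1).write_iceberg(table_identifier=clean_table)

    # Second write: "value" arrives as int64; union-by-name schema
    # evolution should promote the column type instead of failing.
    t2 = pa.table({"id": [3], "value": pa.array([30], pa.int64())})
    ray.data.from_arrow(t2).write_iceberg(table_identifier=clean_table)
```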