Apply Name mapping #219

Merged: 71 commits into apache:main on Jan 19, 2024
Conversation

@sungwy (Collaborator) commented Dec 17, 2023

Closes: #202

Based on the following two working branches from @Fokko:

  1. Name-mapping plumbing: Add name-mapping #212
  2. Allow missing field-ids from schema: Arrow: Allow missing field-ids from Schema #183

This PR adds an _ApplyNameMapping SchemaVisitor that traverses the pyarrow schema and applies the provided name_mapping.
The preference order in the pyarrow_to_schema function is:

  1. Use field_ids in file_schema
  2. Use name_mapping (if one exists)
  3. Fall back to file column order if neither of the above works

The above order is motivated by the current logic in the Spark Iceberg Parquet read conf.

TODO:

  • Read and use the table property 'schema.name-mapping.default'
  • A lot more unit test cases to cover edge cases, like field_id=-1
  • Get more context on identifier_field_ids: should they be ignored when field_ids aren't set in the file_schema?

EDIT: I've added a new utility function new_schema_for_table that can be used by PyIceberg users who want to generate a new PyIceberg Schema from an Arrow Schema to help create a new Iceberg table.
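A hypothetical usage sketch of that utility (the exact signature was still being discussed, and the function was later split out into its own issue; 'catalog' and the schema-as-input choice are assumptions for illustration):

import pyarrow as pa

# Build a plain Arrow schema with no field IDs attached.
arrow_schema = pa.schema([
    pa.field("name", pa.string()),
    pa.field("age", pa.int32()),
])
iceberg_schema = new_schema_for_table(arrow_schema)  # fresh field IDs get assigned
catalog.create_table("db.tbl", schema=iceberg_schema)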

Fokko and others added 11 commits December 5, 2023 13:43
All the things to (de)serialize the name-mapping, and
all the necessary visitors and such
Bumps [mypy-boto3-glue](https://github.com/youtype/mypy_boto3_builder) from 1.33.5 to 1.34.0.
- [Release notes](https://github.com/youtype/mypy_boto3_builder/releases)
- [Commits](https://github.com/youtype/mypy_boto3_builder/commits)

---
updated-dependencies:
- dependency-name: mypy-boto3-glue
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@rdblue (Contributor) commented Dec 17, 2023

@syun64, the "fall back to file column order if neither of the above works" behavior is unsafe and cannot be added to the Python implementation. There is some code in the Java implementation that does this, but it is specifically for Netflix and dates from when the code was first donated to the ASF.

ID inference by position does not work with nested fields and can easily cause jobs to fail or produce incorrect data. We are in the process of removing it in Java so we don't want to add it here. It was also only ever supported for Parquet, so this would actually expand the problem.
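To make the hazard concrete (an illustration for this discussion, not taken from the PR): suppose a table was created as

1: name (string)
2: age (int)

and name was later dropped while email was added, leaving

2: age (int)
3: email (string)

An older Parquet file written without field IDs still has columns [name, age]. Inferring IDs by position would map name to 2 (age) and age to 3 (email), silently projecting the wrong columns instead of failing; with nested structs the same mismatch repeats at every level.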

@sungwy (Collaborator Author) commented Dec 18, 2023

Thank you for the context @rdblue. I will remove the fallback logic from pyarrow_to_schema and skip setting the identifier_field_ids property in _ApplyNameMapping.

sungwy and others added 11 commits December 18, 2023 22:50
Bumps [pyarrow](https://github.com/apache/arrow) from 14.0.1 to 14.0.2.
- [Commits](apache/arrow@go/v14.0.1...apache-arrow-14.0.2)

---
updated-dependencies:
- dependency-name: pyarrow
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [moto](https://github.com/getmoto/moto) from 4.2.11 to 4.2.12.
- [Release notes](https://github.com/getmoto/moto/releases)
- [Changelog](https://github.com/getmoto/moto/blob/master/CHANGELOG.md)
- [Commits](getmoto/moto@4.2.11...4.2.12)

---
updated-dependencies:
- dependency-name: moto
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@sungwy sungwy marked this pull request as ready for review December 19, 2023 03:20
@sungwy (Collaborator Author) commented Dec 19, 2023

Hi folks, I've rebased the branch on main to reflect the changes in Add name-mapping, and added some negative test cases.

I have some questions:

  1. The current proposed approach treats the pyarrow schema as having missing field IDs if any of its fields lacks one, and hence requires the table's name-mapping property to be set. Conversely, the version of the ConvertToIceberg visitor on main simply ignores any fields that do not have field_ids in their metadata, and hence is able to read files with partially missing field_ids. Does the proposed behavior sound correct?
  2. In order to achieve the above, I am creating NestedFields with field_id=-1 (since NestedField constrains field_id to an int). Although this feels arbitrary, we could argue it is reasonable, since -1 is also Parquet's arbitrary int of choice for representing missing field_ids. Example:
<pyarrow._parquet.ParquetSchema object at 0x7f72b317fec0>
required group field_id=-1 schema {
  optional double field_id=-1 one;
  optional binary field_id=-1 two (String);
  optional boolean field_id=-1 three;
  optional binary field_id=-1 __index_level_0__ (String);
}

Does this sound reasonable?
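For reference, field IDs travel as Arrow field metadata; a minimal sketch (not code from this PR) of what the visitors inspect:

import pyarrow as pa

# 'PARQUET:field_id' is the metadata key Parquet round-trips; a field written
# without an ID simply has no such key.
with_id = pa.field("one", pa.float64(), metadata={b"PARQUET:field_id": b"1"})
without_id = pa.field("two", pa.string())

print(with_id.metadata)     # {b'PARQUET:field_id': b'1'}
print(without_id.metadata)  # None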

@Fokko (Contributor) commented Dec 19, 2023

@syun64 thanks again for working on this. When there are no IDs and there is no field-mapping, I think we should fall back to assigning IDs, similar to what we do in Java: https://github.com/apache/iceberg/blob/838787e296b502740470ce70f68bb27af4210121/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L225

I see that in Java it is separated out a bit more, with a hasIds visitor, which is nice.

This is also what I proposed in #183. Thoughts, @rdblue?

@sungwy (Collaborator Author) commented Dec 19, 2023

I've discussed with @Fokko offline how we'd like to handle the edge cases. Here's a summary of the logic implemented in the current version following that discussion (a rough sketch follows the list):

  1. HasIds first checks whether ANY of the field IDs are missing in the file.
  2. If HasIds passes, ConvertToIceberg runs without name mapping. If any of the fields doesn't have a Type, we throw a ValueError.
  3. Else, if a NameMapping is specified in the table properties, ConvertToIceberg runs with name mapping.
  4. If any field ID present in the file is missing from the NameMapping, a ValueError is thrown.
  5. If the file does not have IDs (per 1) and name_mapping is also not provided, we throw a ValueError with a specific exception message: https://github.com/apache/iceberg-python/pull/219/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR634
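A minimal sketch of that dispatch (assuming visit_pyarrow as the visitor driver and the visitor names used in this thread; error message abbreviated, not the exact PR code):

from typing import Optional

def pyarrow_to_schema(file_schema: pa.Schema, name_mapping: Optional[NameMapping] = None) -> Schema:
    if visit_pyarrow(file_schema, _HasIds()):
        # 1./2. the file carries field IDs everywhere: use them directly
        return visit_pyarrow(file_schema, _ConvertToIceberg())
    if name_mapping is not None:
        # 3./4. IDs are missing somewhere: resolve them through the name mapping
        return visit_pyarrow(file_schema, _ConvertToIceberg(name_mapping=name_mapping))
    # 5. neither source is available: fail loudly rather than guess by position
    raise ValueError(
        "Parquet file does not have field-ids and the Iceberg table "
        "does not have 'schema.name-mapping.default' defined"
    )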

dependabot bot and others added 14 commits January 13, 2024 03:36
Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.2 to 3.1.3.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](pallets/jinja@3.1.2...3.1.3)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
'PARQUET:' prefix is specific to Parquet, with 'PARQUET:field_id' setting the 'field_id'. Removed the non-prefixed alternative for 'field_id'. Removed the prefixed alternative for 'doc'.
All the things to (de)serialize the name-mapping, and
all the necessary visitors and such
@sungwy sungwy changed the title from Apply Name mapping to Apply Name mapping, new_schema_for_table on Jan 14, 2024
self._field_names.pop()


class _ConvertToIcebergWithFreshIds(PreOrderPyArrowSchemaVisitor[Union[IcebergType, Schema]]):
Contributor

A visitor should do one thing, and do it well. WDYT of returning to the original idea of setting it as -1 and then assigning fresh IDs using the existing visitor? We're duplicating a lot of code here.

Collaborator Author

Makes sense @Fokko. Should we have a boolean attribute to drive this logic in _ConvertToIceberg? As @rdblue pointed out in this comment, falling back to assigning fresh IDs as a general fallback sounds like dangerous behavior that we would want to avoid. Having a boolean attribute will make sure that we only use this feature when creating a new table, and that we avoid falling back when there are no IDs in existing Iceberg tables.

Should we have a boolean flag _ignore_ids which, if True, makes _ConvertToIceberg skip the existing ID-reading process and instead assigns all IDs as -1?

Contributor

That's why I was surprised to see the option in the Java API to convert a Spark dataframe to an Iceberg schema with the IDs set based on position. Ideally, we want to have the ability to inject a name-mapping in the _ConvertToIceberg visitor, so we don't rely on positions but on names.

@sungwy (Collaborator Author) commented Jan 16, 2024

@Fokko right - but that sounds a bit like a chicken-and-egg problem... because in order to create a NameMapping, we need to have a PyIceberg Schema as well. Unless we want to create a new visitor NameMappingVisitorFromArrowSchema to create a NameMapping from an ArrowSchema, I think the current approach of assigning IDs directly through the ArrowToSchema visitor sounds more straightforward.

Collaborator Author

@Fokko @HonahX I don't think we reached a conclusion here, so I just wanted to confirm that we were on the same page regarding the final step:

Should we have a boolean flag _ignore_ids which, if True, makes _ConvertToIceberg skip the existing ID-reading process and instead assigns all IDs as -1?

Does this sound like the best way to support this function? We first assign the IDs as -1, using _ignore_ids=True on _ConvertToIceberg, and then we use _SetFreshIDs on the PyIceberg Schema to generate the field IDs using pre-order traversal?
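A sketch of that two-pass idea (the visitor and flag names are from this thread's proposal, not merged code; visit_pyarrow is assumed as the visitor driver):

# Pass 1: convert the Arrow schema, stubbing every field_id with -1.
schema_with_dummy_ids = visit_pyarrow(arrow_schema, _ConvertToIceberg(_ignore_ids=True))

# Pass 2: a pre-order traversal replaces the stubs with real, increasing IDs.
fresh_schema = assign_fresh_schema_ids(schema_with_dummy_ids)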

Contributor

Sorry for the late reply. @syun64 Overall I think this is an effective way to reduce code duplication and achieve the same utility. I want to add my findings from trying this approach:

  1. Assigning everything as -1 and then using assign_fresh_schema_ids does not work out of the box, because _SetFreshIds uses

    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
        # assign IDs for this struct's fields first
        self.reserved_ids.update({field.field_id: self._get_and_increment() for field in struct.fields})
        return StructType(*[field() for field in field_results])

    a dictionary to keep track of assigned IDs. If our current schema has -1 for everything, the resulting new schema will also have the same ID for all the fields at the same level (a quick demo follows this comment). To resolve this, we need to either update the _SetFreshIds visitor or assign distinct IDs (-1, -2, ...) when converting the pyarrow schema.

  2. I am wondering whether we can still have separate visitors for the normal-read and ignore-ids cases, by extracting common logic to a parent class. Based on my observation, schema, struct, primitive, and field are the same for both cases. If we refactor it to:

class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]], ABC):
  ...

class _ConvertToIcebergWithFieldIds(_ConvertToIceberg):
  ...

class _ConvertToIcebergWithoutIds(_ConvertToIceberg):

we only need to implement list and map for both visitors, along with their distinct ways of getting field IDs.

In this way, we can still separate the two use cases while keeping code duplication low. However, it does undermine readability, because we split the visitor logic into two places.

I am raising this primarily because a separate, properly named visitor might emphasize that this is a special case for special usage, so it will not confuse others who read this code for reference. @syun64 @Fokko What do you think of this approach, compared with the _ignore_ids boolean flag suggested by @syun64 (which has the least code duplication and is simpler to read)?

(I have a draft implementation for option 2 in my own repo: https://github.com/HonahX/iceberg-python/blob/de14f5cb8d91a26541356dd3c614bee8c4b8cb8c/pyiceberg/io/pyarrow.py#L805)
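A quick demonstration of the duplicate-key collapse described in point 1 (a standalone illustration, not code from the PR):

# With every field_id stubbed as -1, a dict comprehension keyed on field_id
# collapses to a single entry, so all fields at one level share one fresh ID.
field_ids = [-1, -1, -1]
reserved = {fid: new_id for new_id, fid in enumerate(field_ids, start=10)}
print(reserved)  # {-1: 12}: three fields, one surviving entry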

Contributor

By the way, shall we consider moving new_schema_for_table to a separate PR? I think the Apply Name mapping part already contains lots of important changes and is good to go.

Contributor

I share your concern. Currently, we have an API like:

tbl.write(df: pa.Table)

I would say at some point we get something like:

tbl.write(df: pa.Table, merge_schema=True)  # actual name TBD, could also be a property

Assuming that the Arrow dataframe's schema doesn't carry field IDs, we'll use the name mapping to assign them and convert it to an Iceberg schema, and that's all safe. So we need to have the ability to apply name-mapping on a PyArrow schema.

It gets dangerous when people start doing:

new_schema = new_schema_for_table(df: pa.Table)
with tbl.update_schema() as update:
    update.union_with_schema(new_schema)

Which seems reasonable to do if you're new to Iceberg. This is the Java equivalent of UnionByNameVisitor. If you have something like:

1: name (str)
2: age (int)

And you add a field:

1: name (str)
2: phonenumber (int)
3: age (int)

Then age will be renamed to phonenumber and a new field with age will be added. Therefore we want to hide this behind an API like we're doing when creating a new table.

I think that @HonahX made a good point about the _SetFreshIds visitor. Interestingly enough, the implementation on the Java side is also different: it does a lookup on the full column name of the baseSchema, and this baseSchema is null when creating a new table.

I think the problem here is that we don't have an API like in Spark where we can nicely hide things. I'm almost tempted to allow creating a table from a PyArrow table, create_table_from_table(df: pa.Table). That mixes PyArrow into the main API, but it also keeps us from exposing these internals to the user (which isn't super user-friendly in general). WDYT @syun64 @HonahX ?

Collaborator Author

I think the problem here is that we don't have an API like in Spark where we can nicely hide things. I'm almost tempted to allow creating a table from a PyArrow table, create_table_from_table(df: pa.Table). That mixes PyArrow into the main API, but it also keeps us from exposing these internals to the user (which isn't super user-friendly in general). WDYT @syun64 @HonahX ?

I'm in agreement with this idea. I see three ways a user would want to create an Iceberg table:

  1. Completely manual - by specifying the schema, field by field
  2. By inferring the schema from an existing strongly-typed file or pyarrow table
  3. By copying the schema of an existing iceberg table (migration)

Since we are only concerned with the schema, and not the data: what are your thoughts on using the pyarrow schema (instead of the pyarrow table) as the input for this function?

Assuming that the Arrow dataframe's schema doesn't carry field IDs, we'll use the name mapping to assign them and convert it to an Iceberg schema, and that's all safe. So we need to have the ability to apply name-mapping on a PyArrow schema.

Sounds good @Fokko. Since this PR already introduces the ability to apply name-mapping to a PyArrow Schema and create a pyiceberg Schema, if this is the approach we'd like to take, we would need the ability to generate a name-mapping from a PyArrow Schema with no IDs. This is different from the existing _CreateMapping, which creates a name mapping based on an existing pyiceberg Schema that already has IDs assigned.

class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]], ABC):
...
class _ConvertToIcebergWithFieldIds(_ConvertToIceberg):
...
class _ConvertToIcebergWithoutIds(_ConvertToIceberg):

One thing I wanted to note is that the task of assigning fresh IDs to a schema needs to be a pre-order visitor, instead of post-order like _ConvertToIceberg or _CreateMapping. This ensures that a field's ID is assigned before the IDs of its element, key, or value fields (see the toy illustration below). I think that would prevent us from having the two visitors inherit from the same parent class.
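A toy illustration of the pre-order constraint (standalone, not PR code): the parent field claims its ID from the counter before recursing, so IDs read top-down the way fresh-ID assignment expects.

from itertools import count

counter = count(1)

def assign(name: str, children: list) -> None:
    field_id = next(counter)  # the parent claims its ID first (pre-order)
    print(f"{field_id}: {name}")
    for child_name, grandchildren in children:
        assign(child_name, grandchildren)

assign("addresses", [("element", [("street", []), ("zip", [])])])
# 1: addresses
# 2: element
# 3: street
# 4: zip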

Collaborator Author

By the way, shall we consider moving new_schema_for_table to a separate PR? I think the Apply Name mapping part already contains lots of important changes and is good to go.

Great suggestion @HonahX - at first I thought it would be a small lift to add it to this PR, but it seems clear that there's a lot more to be discussed on the topic. I've opened this issue, and I'll reduce the scope of this PR to just Apply Name Mapping.

Let's continue the discussion on the dedicated issue for the topic.

Co-authored-by: Fokko Driesprong <fokko@apache.org>
@sungwy sungwy changed the title from Apply Name mapping, new_schema_for_table to Apply Name mapping on Jan 18, 2024
@sungwy sungwy requested review from Fokko and HonahX January 18, 2024 21:12
@HonahX (Contributor) commented Jan 18, 2024

Thanks for the great work!

@sungwy (Collaborator Author) commented Jan 18, 2024

Need help merging this in as well :)

@HonahX (Contributor) commented Jan 19, 2024

All reviews related to "Apply Name Mapping" are resolved. Let's get this in and continue our discussion in #278 😊. Thanks @syun64

@HonahX HonahX merged commit 70972d9 into apache:main Jan 19, 2024
6 checks passed
Successfully merging this pull request may close these issues:

Support 'schema.name-mapping.default' Column Projection property