# Error reading table after appending pyarrow table #1798
Comments
Can you provide the code you use to do this?
After investigation I found out it happens after I append a pyarrow table that omits a list field which the schema declares as optional. Example table with schema:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, ListType

catalog = load_catalog(**dict(type="in-memory"))
schema = Schema(
    NestedField(field_id=1, name="name", field_type=StringType(), required=False),
    NestedField(
        field_id=3,
        name="my_list",
        field_type=ListType(
            element_id=45, element=StringType(), element_required=False
        ),
        required=False,
    ),
)
catalog.create_namespace_if_not_exists("test")
catalog.create_table_if_not_exists("test.table", schema)
```

I append a dataset with `my_list`:

```python
import pyarrow as pa

df_1 = pa.Table.from_pylist([
    {"name": "one", "my_list": ["test"]},
    {"name": "another", "my_list": ["test"]},
])
catalog.load_table("test.table").append(df_1)
```

Reading works:

```python
catalog.load_table("test.table").scan().to_arrow()
```

Then I append a dataset without `my_list`:

```python
df_2 = pa.Table.from_pylist([
    {"name": "one"},
    {"name": "another"},
])
catalog.load_table("test.table").append(df_2)
```

This time the read throws:

```python
catalog.load_table("test.table").scan().to_arrow()
```
Interesting, thanks for the code! I can reproduce the issue. Here's a working version; note how the schema used for the table creation and both appends is aligned:

```python
# working
from pyiceberg.catalog import load_catalog
import pyarrow as pa

catalog = load_catalog(**dict(type="in-memory"))
df_1 = pa.Table.from_pylist([
    {"name": "one", "my_list": ["test"]},
    {"name": "another", "my_list": ["test"]},
])
pyarrow_schema = df_1.schema

# create table
catalog.create_namespace_if_not_exists("test")
catalog.create_table_if_not_exists("test.table", pyarrow_schema)

# append data
catalog.load_table("test.table").append(df_1)
catalog.load_table("test.table").scan().to_arrow()

# append more data
df_2 = pa.Table.from_pylist([
    {"name": "one"},
    {"name": "another"},
], schema=pyarrow_schema)
catalog.load_table("test.table").append(df_2)
catalog.load_table("test.table").scan().to_arrow()
```
I suspect the issue is with the schema definition, or with how we handle the schema conversion internally between the Iceberg schema and the pyarrow schema. For example, using the example Iceberg schema provided, I get a schema mismatch.
There's a bug somewhere in the schema translation between the pyarrow schema and the Iceberg schema. Note that the Iceberg table schema ends up with an extra field in the output. Reproduce:

```python
# schema difference
from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, ListType
from pyiceberg.io.pyarrow import schema_to_pyarrow

catalog = load_catalog(**dict(type="in-memory"))
schema = Schema(
    NestedField(field_id=1, name="name", field_type=StringType(), required=False),
    NestedField(
        field_id=3,
        name="my_list",
        field_type=ListType(
            element_id=45, element=StringType(), element_required=False
        ),
        required=False,
    ),
)
pyarrow_schema = schema_to_pyarrow(schema)

# create table
catalog.create_namespace_if_not_exists("test")
catalog.create_table_if_not_exists("test.table", pyarrow_schema)

# iceberg schema
catalog.load_table("test.table").schema()

# pyarrow to iceberg schema
from pyiceberg.io.pyarrow import pyarrow_to_schema
pyarrow_to_schema(pyarrow_schema, name_mapping=schema.name_mapping)
```
OK, here's a working version, which supplies a pyarrow schema when creating the pyarrow table. The difference is the parquet:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, ListType
import pyarrow as pa

catalog = load_catalog(**dict(type="in-memory"))
schema = Schema(
    NestedField(field_id=1, name="name", field_type=StringType(), required=False),
    NestedField(
        field_id=3,
        name="my_list",
        field_type=ListType(
            element_id=45, element=StringType(), element_required=False
        ),
        required=False,
    ),
)
catalog.create_namespace_if_not_exists("test")
tbl = catalog.create_table_if_not_exists("test.table", schema)

df_1 = pa.Table.from_pylist([
    {"name": "one", "my_list": ["test"]},
    {"name": "another", "my_list": ["test"]},
], schema=tbl.schema().as_arrow())
catalog.load_table("test.table").append(df_1)
catalog.load_table("test.table").scan().to_arrow()

df_2 = pa.Table.from_pylist([
    {"name": "one"},
    {"name": "another"},
], schema=tbl.schema().as_arrow())
catalog.load_table("test.table").append(df_2)
catalog.load_table("test.table").scan().to_arrow()
```
Alternatively, this can be resolved by setting the table's name-mapping. This is interesting because it is not set by default here (the check below prints `None`), but Spark ensures that it is set:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, ListType
from pyiceberg.table import TableProperties
import pyarrow as pa

catalog = load_catalog(**dict(type="in-memory"))
schema = Schema(
    NestedField(field_id=1, name="name", field_type=StringType(), required=False),
    NestedField(
        field_id=3,
        name="my_list",
        field_type=ListType(
            element_id=45, element=StringType(), element_required=False
        ),
        required=False,
    ),
)
catalog.create_namespace_if_not_exists("test")
tbl = catalog.create_table_if_not_exists("test.table", schema)

print(f"name-mapping: {tbl.metadata.name_mapping()}")
if tbl.metadata.name_mapping() is None:
    with tbl.transaction() as txn:
        txn.set_properties(
            **{TableProperties.DEFAULT_NAME_MAPPING: tbl.metadata.schema().name_mapping.model_dump_json()}
        )
print(f"name-mapping: {tbl.metadata.name_mapping()}")

df_1 = pa.Table.from_pylist([
    {"name": "one", "my_list": ["test"]},
    {"name": "another", "my_list": ["test"]},
])
catalog.load_table("test.table").append(df_1)
catalog.load_table("test.table").scan().to_arrow()

df_2 = pa.Table.from_pylist([
    {"name": "one"},
    {"name": "another"},
])
catalog.load_table("test.table").append(df_2)
catalog.load_table("test.table").scan().to_arrow()
```
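For reference, the name mapping being set above is JSON that pairs field names with Iceberg field IDs, so readers can resolve parquet columns that lack field-ID metadata. Below is a hand-written sketch of what it would contain for this schema; the real serialization comes from pyiceberg's `model_dump_json()`, and this is an illustrative approximation of the spec's shape, not its verbatim output:

```python
import json

# illustrative name-mapping structure; field IDs mirror the Schema above
name_mapping = [
    {"field-id": 1, "names": ["name"]},
    {
        "field-id": 3,
        "names": ["my_list"],
        "fields": [{"field-id": 45, "names": ["element"]}],
    },
]
print(json.dumps(name_mapping, indent=2))
```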
Fixes apache#1798

# Rationale for this change

# Are these changes tested?

# Are there any user-facing changes?
Apache Iceberg version

None

Please describe the bug 🐞

Hi,

I have an Iceberg table created by pyiceberg, and I appended some data in pyarrow table format. When I try to read the table now I get an error:

I have to convert my pyarrow table to a pandas dataframe to make it work.
Willingness to contribute