RestCatalog append table is slow (2+s) #1806

Open
HungYangChang opened this issue Mar 18, 2025 · 3 comments

@HungYangChang

Question

Hello PyIceberg devs,

I successfully set up Lakekeeper as the catalog and connected it to ADLS Gen2 storage.
I found that table.append takes a long time to finish (2+ seconds).

Here are my questions:

  1. Is there a faster way to append to a table through the RestCatalog?
  2. If not, is there any way to speed up the write? I started logging the detailed timing of append.
    (PS: I already dug into the source code and saw there are append and fast_append)


Here is my code:

import time

from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name=CATALOG_NAME,
    uri=CATALOG_URL,
    warehouse=CATALOG_WAREHOUSE_PATH,
    token=CATALOG_TOKEN,
    properties={
        "adlfs.account-name": AZURE_STORAGE_ACCOUNT_NAME,
        "adlfs.container": CONTAINER_NAME,
        "adlfs.client-id": AZURE_STORAGE_CLIENT_ID,
        "adlfs.tenant-id": AZURE_STORAGE_TENANT_ID,
        "adlfs.client-secret": AZURE_STORAGE_CLIENT_SECRET,
        "client_secret": AZURE_STORAGE_CLIENT_SECRET,
        "client_id": AZURE_STORAGE_CLIENT_ID,
        "tenant_id": AZURE_STORAGE_TENANT_ID,
        "io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    },
)

# My table is ready 

load_start = time.time()
iceberg_table = catalog.load_table(iceberg_table_identifier)
load_end = time.time()

# Perform the append with optimized options
append_start = time.time()
# solution 1 (seems slow):
iceberg_table.append(table)

# # solution 2: use a bulk transaction instead of a direct append
# # fails with an error...
# with iceberg_table.transaction() as txn:
#     txn.append(table)
#     txn.commit_transaction()
append_end = time.time()
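
A note on solution 2: from skimming the PyIceberg source, I suspect the with-block on Table.transaction() already commits when it exits, so the explicit txn.commit_transaction() inside the block would commit twice, which may be what triggers the error. Purely as an unverified sketch, the transaction form would then look like:

# Unverified assumption: Transaction.__exit__ commits when the with-block exits,
# so no explicit commit_transaction() call should be needed inside it.
with iceberg_table.transaction() as txn:
    txn.append(table)  # commit happens when the block exits (assumption)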

Thanks for your help in advance :)

@HungYangChang (Author)

I added some quick-and-dirty timing logs to append in pyiceberg.table:

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
    """
    Shorthand API for appending a PyArrow table to a table transaction.

    Args:
        df: The Arrow dataframe that will be appended to overwrite the table
        snapshot_properties: Custom properties to be added to the snapshot summary
    """
    start_append_time = time.time()
    try:
        import pyarrow as pa
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

    if not isinstance(df, pa.Table):
        raise ValueError(f"Expected PyArrow table, got: {df}")

    if unsupported_partitions := [
        field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
    ]:
        raise ValueError(
            f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
        )
    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )
    manifest_merge_enabled = property_as_bool(
        self.table_metadata.properties,
        TableProperties.MANIFEST_MERGE_ENABLED,
        TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
    )
    update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
    append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

    logging.info(append_method)
    end_time = time.time()
    logging.info(f"set up {end_time - start_append_time:.3f} seconds")

    with append_method() as append_files:
        # skip writing data files if the dataframe is empty
        if df.shape[0] > 0:
            start_time = time.time()
            data_files = _dataframe_to_data_files(
                table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
            )
            end_time = time.time()
            logging.info(f"_dataframe_to_data_files {end_time - start_time:.3f} seconds")
            start_time = time.time()
            for data_file in data_files:
                append_files.append_data_file(data_file)
            end_time = time.time()
            logging.info(f"append_data_file {end_time - start_time:.3f} seconds")
    end_append_time = time.time()
    # note: this last message reuses the "append_data_file" label but measures the whole append
    logging.info(f"append_data_file {end_append_time - start_append_time:.3f} seconds")

Here are the results I got:

[2025-03-18T18:35:19.587Z] set up 0.018 seconds
[2025-03-18T18:35:19.605Z] _dataframe_to_data_files 0.000 seconds
[2025-03-18T18:35:20.342Z] append_data_file 0.838 seconds
[2025-03-18T18:35:21.799Z] append_data_file 2.333 seconds
[2025-03-18T18:35:22.413Z] Table append operation took 2.950 seconds
[2025-03-18T18:35:22.483Z] Successfully appended data to table: inboundrequesteventv2 in 3.393 seconds
[2025-03-18T18:35:22.505Z] Wrote to Iceberg in 3.395 seconds
[2025-03-18T18:35:22.516Z] Total processing time: 3.398 seconds
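
From what I can tell, the append_method branch above is controlled by the commit.manifest-merge.enabled table property, which appears to default to false, so my append should already be taking the fast_append path. A quick check, assuming Table.properties exposes the table properties as a plain dict:

# If "commit.manifest-merge.enabled" is unset or "false", append() uses fast_append;
# otherwise it goes through merge_append.
merge_enabled = iceberg_table.properties.get("commit.manifest-merge.enabled", "false")
print(f"manifest merge enabled: {merge_enabled}")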

@sungwy (Collaborator) commented Mar 19, 2025

Hi @HungYangChang - thanks for posting the logs!

A couple of things to unpack here: _dataframe_to_data_files produces an Iterator, which means that the task of actually writing the parquet files isn't done when you log the 0.000 seconds output.

def _dataframe_to_data_files(
    table_metadata: TableMetadata,
    df: pa.Table,
    io: FileIO,
    write_uuid: Optional[uuid.UUID] = None,
    counter: Optional[itertools.count[int]] = None,
) -> Iterable[DataFile]:

Instead, it writes the parquet files when the iterator's elements are appended to append_files, which in your logs is at this point in time:

[2025-03-18T18:35:20.342Z] append_data_file 0.838 seconds
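
As a small illustration (nothing PyIceberg-specific, just how generators behave), timing the call that creates an iterator measures almost nothing; the work only shows up when the iterator is consumed:

import time

def slow_items():
    # the sleeps only run when the generator is consumed
    for i in range(3):
        time.sleep(0.5)
        yield i

start = time.time()
items = slow_items()  # nothing has executed yet
print(f"created in {time.time() - start:.3f}s")   # ~0.000s

start = time.time()
consumed = list(items)  # the sleeps happen here
print(f"consumed in {time.time() - start:.3f}s")  # ~1.5s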

From my observation of the logs, it is the commit itself that seems to be taking the time:

[2025-03-18T18:35:22.413Z] Table append operation took 2.950 seconds

Do you have access to the Lakekeeper logs that give you information on how long it takes the Rest Catalog to process the commit request? Once it accepts the commit request, the Rest Catalog must write the metadata on its end and then return an HTTP response back to PyIceberg. It would be good to compare this number against the request->response wall time Lakekeeper is reporting for your specific commit request.
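
If the Lakekeeper logs aren't easy to get at, one rough client-side way to see each REST round trip is a requests response hook. This leans on the catalog's underlying requests.Session being reachable via a private _session attribute, which is an implementation detail and may change, so treat it as a sketch:

# Sketch: print the round-trip time of every REST call the catalog makes.
# response.elapsed is measured by requests (request sent -> response headers received).
def log_elapsed(response, *args, **kwargs):
    print(
        f"{response.request.method} {response.request.path_url} "
        f"-> {response.status_code} in {response.elapsed.total_seconds():.3f}s"
    )

catalog._session.hooks["response"].append(log_elapsed)  # _session is internal (assumption)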

@corleyma

I wonder if @c-thiel has any thoughts about the best way to profile this from the Lakekeeper side? My guess would be to enable tracing logs?
