diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md b/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
index f7a166bf5b..35fe2d4b2f 100644
--- a/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
+++ b/website/docs/streaming-lakehouse/integrate-data-lakes/paimon.md
@@ -21,67 +21,102 @@ sidebar_position: 1
 
 # Paimon
 
-[Apache Paimon](https://paimon.apache.org/) innovatively combines lake format and LSM structure, bringing efficient updates into the lake architecture.
-To integrate Fluss with Paimon, you must enable lakehouse storage and configure Paimon as lakehouse storage. See more detail about [Enable Lakehouse Storage](maintenance/tiered-storage/lakehouse-storage.md#enable-lakehouse-storage).
+[Apache Paimon](https://paimon.apache.org/) innovatively combines a lake format with an LSM (Log-Structured Merge-tree) structure, bringing efficient updates into the lake architecture.
+To integrate Fluss with Paimon, you must enable lakehouse storage and configure Paimon as the lakehouse storage. For more details, see [Enable Lakehouse Storage](maintenance/tiered-storage/lakehouse-storage.md#enable-lakehouse-storage).
 
 ## Introduction
 
-When a table with option `'table.datalake.enabled' = 'true'` is created or altered in Fluss, Fluss will create a corresponding Paimon table with same table path as well.
-The schema of the Paimon table is as same as the schema of the Fluss table, except for there are two extra columns `__offset` and `__timestamp` appended to the last.
-These two columns are used to help Fluss client to consume the data in Paimon in streaming way like seek by offset/timestamp, etc.
-Then datalake tiering service compacts the data from Fluss to Paimon continuously. For primary key table, it will also generate change log in Paimon format which
-enables you streaming consume it in Paimon way.
+When a table with the option `'table.datalake.enabled' = 'true'` is created or altered in Fluss, Fluss will automatically create a corresponding Paimon table with the same table path.
+The schema of the Paimon table matches that of the Fluss table, except for the addition of three system columns at the end: `__bucket`, `__offset`, and `__timestamp`.
+These system columns help Fluss clients consume data from Paimon in a streaming fashion, such as seeking by offset or timestamp within a specific bucket.
 
-## Read tables
+```sql title="Flink SQL"
+USE CATALOG fluss_catalog;
+
+CREATE TABLE fluss_order_with_lake (
+  `order_key` BIGINT,
+  `cust_key` INT NOT NULL,
+  `total_price` DECIMAL(15, 2),
+  `order_date` DATE,
+  `order_priority` STRING,
+  `clerk` STRING,
+  `ptime` AS PROCTIME(),
+  PRIMARY KEY (`order_key`) NOT ENFORCED
+) WITH (
+  'table.datalake.enabled' = 'true',
+  'table.datalake.freshness' = '30s');
+```
+
+Then, the datalake tiering service continuously tiers data from Fluss to Paimon. The parameter `table.datalake.freshness` controls how soon data written to Fluss should be tiered to Paimon; by default, the delay is 3 minutes.
+For primary key tables, change logs are also generated in Paimon format, enabling stream-based consumption via Paimon APIs.
+
+Since Fluss version 0.7, you can also specify Paimon table properties when creating a datalake-enabled Fluss table by using the `paimon.` prefix within the Fluss table properties clause.
+
+```sql title="Flink SQL"
+CREATE TABLE fluss_order_with_lake (
+  `order_key` BIGINT,
+  `cust_key` INT NOT NULL,
+  `total_price` DECIMAL(15, 2),
+  `order_date` DATE,
+  `order_priority` STRING,
+  `clerk` STRING,
+  `ptime` AS PROCTIME(),
+  PRIMARY KEY (`order_key`) NOT ENFORCED
+) WITH (
+  'table.datalake.enabled' = 'true',
+  'table.datalake.freshness' = '30s',
+  'paimon.file.format' = 'orc',
+  'paimon.deletion-vectors.enabled' = 'true');
+```
+
+For example, you can specify the Paimon property `file.format` to change the file format of the Paimon table, or set `deletion-vectors.enabled` to enable or disable deletion vectors for the Paimon table.
+
+## Read Tables
 
 ### Read by Flink
 
-For the table with option `'table.datalake.enabled' = 'true'`, there are two part of data: the data remains in Fluss and the data already in Paimon.
-Now, you have two view of the table: one view is the Paimon data which has minute-level latency, one view is the full data union Fluss and Paimon data
-which is the latest within second-level latency.
+For a table with the option `'table.datalake.enabled' = 'true'`, its data exists in two layers: one remains in Fluss, and the other has already been tiered to Paimon.
+You can choose between two views of the table:
+- A **Paimon-only view**, which offers minute-level freshness but better analytics performance.
+- A **combined view** of both Fluss and Paimon data, which provides second-level freshness but may result in slightly degraded query performance.
 
-Flink empowers you to decide to choose which view:
-- Only Paimon means a better analytics performance but with worse data freshness
-- Combing Fluss and Paimon means a better data freshness but with analytics performance degrading
+#### Read Data Only in Paimon
 
-#### Read data only in Paimon
-To point to read data in Paimon, you must specify the table with `$lake` suffix, the following
-SQL shows how to do that:
+To read only data stored in Paimon, use the `$lake` suffix in the table name. The following example demonstrates this:
 
 ```sql title="Flink SQL"
--- assume we have a table named `orders`
+-- Assume we have a table named `orders`
 
--- read from paimon
+-- Read from Paimon
 SELECT COUNT(*) FROM orders$lake;
 ```
 
 ```sql title="Flink SQL"
--- we can also query the system tables
+-- We can also query the system tables
 SELECT * FROM orders$lake$snapshots;
 ```
 
-When specify the table with `$lake` suffix in query, it just acts like a normal Paimon table, so it inherits all ability of Paimon table.
-You can enjoy all the features that Flink's query supports/optimization on Paimon, like query system tables, time travel, etc. See more
-about Paimon's [sql-query](https://paimon.apache.org/docs/0.9/flink/sql-query/#sql-query).
+When you specify the `$lake` suffix in a query, the table behaves like a standard Paimon table and inherits all its capabilities.
+This allows you to take full advantage of Flink's query support and optimizations on Paimon, such as querying system tables, time travel, and more.
+For further information, refer to Paimon's [SQL Query documentation](https://paimon.apache.org/docs/0.9/flink/sql-query/#sql-query).
 
+#### Union Read of Data in Fluss and Paimon
 
-#### Union read data in Fluss and Paimon
-To point to read the full data that union Fluss and Paimon, you just query it as a normal table without any suffix or others, the following
-SQL shows how to do that:
+To read the full dataset, which includes both Fluss and Paimon data, simply query the table without any suffix. The following example illustrates this:
 
 ```sql title="Flink SQL"
--- query will union data of Fluss and Paimon
-SELECT SUM(order_count) as total_orders FROM ads_nation_purchase_power;
+-- Query will union data from Fluss and Paimon
+SELECT SUM(order_count) AS total_orders FROM ads_nation_purchase_power;
 ```
 
-The query may look slower than only querying data in Paimon, but it queries the full data which means better data freshness. You can
-run the query multi-times, you should get different results in every one run as the data is written to the table continuously.
 
-### Read by other engines
+This query may run slower than reading only from Paimon, but it returns the most up-to-date data. If you execute the query multiple times, you may observe different results due to continuous data ingestion.
+
+### Read by Other Engines
+
+Since the data tiered to Paimon from Fluss is stored as a standard Paimon table, you can use any engine that supports Paimon to read it. Below is an example using [StarRocks](https://paimon.apache.org/docs/master/engines/starrocks/):
 
-As the tiered data in Paimon compacted from Fluss is also a standard Paimon table, you can use
-[any engines](https://paimon.apache.org/docs/0.9/engines/overview/) that support Paimon to read the data. Here, we take [StarRocks](https://paimon.apache.org/docs/master/engines/starrocks/) as the engine to read the data:
+First, create a Paimon catalog in StarRocks:
 
-First, create a Paimon catalog for StarRocks:
 ```sql title="StarRocks SQL"
 CREATE EXTERNAL CATALOG paimon_catalog
 PROPERTIES
@@ -92,23 +127,24 @@ PROPERTIES
 );
 ```
 
-**NOTE**: The configuration value `paimon.catalog.type` and `paimon.catalog.warehouse` should be same as how you configure the Paimon as lakehouse storage for Fluss in `server.yaml`.
+> **NOTE**: The configuration values for `paimon.catalog.type` and `paimon.catalog.warehouse` must match those used when configuring Paimon as the lakehouse storage for Fluss in `server.yaml`.
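+
+For reference, the Fluss side of this pairing is configured under the `datalake.*` options in `server.yaml`. A minimal sketch is shown below; the warehouse path is illustrative, so use the values from your own deployment as described in [Enable Lakehouse Storage](maintenance/tiered-storage/lakehouse-storage.md#enable-lakehouse-storage):
+
+```yaml title="server.yaml"
+# Use Paimon as the lakehouse storage format
+datalake.format: paimon
+
+# The catalog type and warehouse that the StarRocks catalog properties must match
+datalake.paimon.metastore: filesystem
+datalake.paimon.warehouse: /tmp/paimon_data_warehouse
+```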
+
+Then, you can query the `orders` table using StarRocks:
 
-Then, you can query the `orders` table by StarRocks:
 ```sql title="StarRocks SQL"
--- the table is in database `fluss`
+-- The table is in the database `fluss`
 SELECT COUNT(*) FROM paimon_catalog.fluss.orders;
 ```
 
 ```sql title="StarRocks SQL"
--- query the system tables, to know the snapshots of the table
+-- Query the system tables to view snapshots of the table
 SELECT * FROM paimon_catalog.fluss.enriched_orders$snapshots;
 ```
 
-
 ## Data Type Mapping
-When integrate with Paimon, Fluss automatically converts between Fluss data type and Paimon data type.
-The following content shows the mapping between [Fluss data type](table-design/data-types.md) and Paimon data type:
+
+When integrating with Paimon, Fluss automatically converts between Fluss data types and Paimon data types.
+The following table shows the mapping between [Fluss data types](table-design/data-types.md) and Paimon data types:
 
 | Fluss Data Type               | Paimon Data Type              |
 |-------------------------------|-------------------------------|
@@ -127,4 +163,4 @@ The following content shows the mapping between [Fluss data type](table-design/d
 | TIMESTAMP                     | TIMESTAMP                     |
 | TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP WITH LOCAL TIMEZONE |
 | BINARY                        | BINARY                        |
-| BYTES                         | BYTES                         |
+| BYTES                         | BYTES                         |
\ No newline at end of file