From d7f5b2e3c9c39dff9f7778601f230e45c0163014 Mon Sep 17 00:00:00 2001 From: yoogo Date: Fri, 8 Nov 2024 22:12:20 +0800 Subject: [PATCH 1/2] [Improve][Connector-V2][Paimon] upgrade paimon version --- seatunnel-connectors-v2/connector-paimon/pom.xml | 2 +- .../connectors/seatunnel/paimon/sink/PaimonSinkWriter.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml index 80934e68a2b..87a480e5f4b 100644 --- a/seatunnel-connectors-v2/connector-paimon/pom.xml +++ b/seatunnel-connectors-v2/connector-paimon/pom.xml @@ -30,7 +30,7 @@ SeaTunnel : Connectors V2 : Paimon - 0.7.0-incubating + 0.9.0 2.3.9 diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java index e57e62c9814..213096f8a9b 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java @@ -120,9 +120,9 @@ public PaimonSinkWriter( this.tableSchema = this.table.schema(); BucketMode bucketMode = this.table.bucketMode(); this.dynamicBucket = - BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode; + BucketMode.HASH_DYNAMIC == bucketMode || BucketMode.CROSS_PARTITION == bucketMode; int bucket = ((FileStoreTable) table).coreOptions().bucket(); - if (bucket == -1 && BucketMode.UNAWARE == bucketMode) { + if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) { log.warn("Append only table currently do not support dynamic bucket"); } if (dynamicBucket) { From 97a2ef8f76961a9dde973a930fd1ab66de891f97 Mon Sep 17 00:00:00 2001 From: yoogo Date: 
Fri, 8 Nov 2024 22:34:26 +0800 Subject: [PATCH 2/2] [Improve][Connector-V2][Paimon] add filesystem config to PaimonConfig --- docs/en/connector-v2/sink/Paimon.md | 3 +- docs/en/connector-v2/source/Paimon.md | 3 +- docs/zh/connector-v2/sink/Paimon.md | 31 ++++++++++--------- .../paimon/catalog/PaimonCatalogLoader.java | 5 +++ .../seatunnel/paimon/config/PaimonConfig.java | 8 +++++ 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index c9e4b3a9b61..ed7d39ee16a 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -31,7 +31,7 @@ libfb303-xxx.jar ## Options -| name | type | required | default value | Description | +| name | type | required | default value | Description | |-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------| | warehouse | String | Yes | - | Paimon warehouse path | | catalog_type | String | No | filesystem | Catalog type of Paimon, support filesystem and hive | @@ -46,6 +46,7 @@ libfb303-xxx.jar | paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions). | | paimon.hadoop.conf | Map | No | - | Properties in hadoop conf | | paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files | +| paimon.fs.conf | Map | No | - | Properties passed through to the Paimon catalog options; only applied when catalog_type is filesystem | ## Changelog You must configure the `changelog-producer=input` option to enable the changelog producer mode of the paimon table. If you use the auto-create table function of paimon sink, you can configure this property in `paimon.table.write-props`. 
diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index e586a4fd9d8..f421c1180ff 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -17,7 +17,7 @@ Read data from Apache Paimon. ## Options -| name | type | required | default value | +| name | type | required | default value | |-------------------------|--------|----------|---------------| | warehouse | String | Yes | - | | catalog_type | String | No | filesystem | @@ -28,6 +28,7 @@ Read data from Apache Paimon. | query | String | No | - | | paimon.hadoop.conf | Map | No | - | | paimon.hadoop.conf-path | String | No | - | +| paimon.fs.conf | Map | No | - | ### warehouse [string] diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md index 375c8c90caf..61b7b661494 100644 --- a/docs/zh/connector-v2/sink/Paimon.md +++ b/docs/zh/connector-v2/sink/Paimon.md @@ -30,21 +30,22 @@ libfb303-xxx.jar ## 连接器选项 -| 名称 | 类型 | 是否必须 | 默认值 | 描述 | -|-----------------------------|------|------|------------------------------|-------------------------------------------------------------------------------------------------------| -| warehouse | 字符串 | 是 | - | Paimon warehouse路径 | -| catalog_type | 字符串 | 否 | filesystem | Paimon的catalog类型,目前支持filesystem和hive | -| catalog_uri | 字符串 | 否 | - | Paimon catalog的uri,仅当catalog_type为hive时需要配置 | -| database | 字符串 | 是 | - | 数据库名称 | -| table | 字符串 | 是 | - | 表名 | -| hdfs_site_path | 字符串 | 否 | - | hdfs-site.xml文件路径 | -| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 | -| data_save_mode | 枚举 | 否 | APPEND_DATA | 数据保存模式 | -| paimon.table.primary-keys | 字符串 | 否 | - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) | -| paimon.table.partition-keys | 字符串 | 否 | - | 分区字段列表,多字段使用逗号分隔 | -| paimon.table.write-props | Map | 否 | - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions) | -| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 | -| 
paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|-----------------------------|-----|------|------------------------------|------------------------------------------------------------------------------------------------------| +| warehouse | 字符串 | 是 | - | Paimon warehouse路径 | +| catalog_type | 字符串 | 否 | filesystem | Paimon的catalog类型,目前支持filesystem和hive | +| catalog_uri | 字符串 | 否 | - | Paimon catalog的uri,仅当catalog_type为hive时需要配置 | +| database | 字符串 | 是 | - | 数据库名称 | +| table | 字符串 | 是 | - | 表名 | +| hdfs_site_path | 字符串 | 否 | - | hdfs-site.xml文件路径 | +| schema_save_mode | 枚举 | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 | +| data_save_mode | 枚举 | 否 | APPEND_DATA | 数据保存模式 | +| paimon.table.primary-keys | 字符串 | 否 | - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) | +| paimon.table.partition-keys | 字符串 | 否 | - | 分区字段列表,多字段使用逗号分隔 | +| paimon.table.write-props | Map | 否 | - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions) | +| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 | +| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 | +| paimon.fs.conf | Map | 否 | - | 当catalog类型为filesystem时,传入的配置属性信息 | ## 更新日志 你必须配置`changelog-producer=input`来启用paimon表的changelog产生模式。如果你使用了paimon sink的自动建表功能,你可以在`paimon.table.write-props`中指定这个属性。 diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java index 774576c408f..a74dcdd86b2 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java +++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java @@ -55,11 +55,14 @@ public class PaimonCatalogLoader implements Serializable { private PaimonHadoopConfiguration paimonHadoopConfiguration; + private Map paimonFilesystemConfiguration; + public PaimonCatalogLoader(PaimonConfig paimonConfig) { this.warehouse = paimonConfig.getWarehouse(); this.catalogType = paimonConfig.getCatalogType(); this.catalogUri = paimonConfig.getCatalogUri(); this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonConfig); + this.paimonFilesystemConfiguration = paimonConfig.getFsConfProps(); } public Catalog loadCatalog() { @@ -77,6 +80,8 @@ public Catalog loadCatalog() { paimonHadoopConfiguration .getPropsWithPrefix(StringUtils.EMPTY) .forEach((k, v) -> optionsMap.put(k, v)); + } else if (PaimonCatalogEnum.FILESYSTEM.getType().equals(catalogType.getType())) { + optionsMap.putAll(paimonFilesystemConfiguration); } final Options options = Options.fromMap(optionsMap); PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java index d394455d2f0..7097a7d2f58 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java @@ -106,6 +106,12 @@ public class PaimonConfig implements Serializable { .withDescription( "The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files"); + public static final Option> FS_CONF = + Options.key("paimon.fs.conf") + .mapType() + 
.defaultValue(new HashMap<>()) + .withDescription("Properties in conf when catalog type is 'filesystem'"); + protected String catalogName; protected PaimonCatalogEnum catalogType; protected String catalogUri; @@ -114,6 +120,7 @@ public class PaimonConfig implements Serializable { protected String table; protected String hdfsSitePath; protected Map hadoopConfProps; + protected Map fsConfProps; protected String hadoopConfPath; public PaimonConfig(ReadonlyConfig readonlyConfig) { @@ -125,6 +132,7 @@ public PaimonConfig(ReadonlyConfig readonlyConfig) { this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH); this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF); this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH); + this.fsConfProps = readonlyConfig.get(FS_CONF); this.catalogType = readonlyConfig.get(CATALOG_TYPE); if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) { this.catalogUri =