[Improve][Paimon] upgrade version and add fs config #8002

Closed · wants to merge 2 commits
3 changes: 2 additions & 1 deletion docs/en/connector-v2/sink/Paimon.md
@@ -31,7 +31,7 @@ libfb303-xxx.jar

**Contributor:** Paimon version 0.7 is incompatible with version 0.9. You are advised to provide a mapping between the SeaTunnel Paimon connector and the Paimon version.

## Options

| name | type | required | default value | Description |
|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse | String | Yes | - | Paimon warehouse path |
| catalog_type | String | No | filesystem | Catalog type of Paimon, support filesystem and hive |
@@ -46,6 +46,7 @@ libfb303-xxx.jar
| paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions). |
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
| paimon.fs.conf | Map | No | - | Properties when the catalog type is filesystem |
**Contributor:** Please add `paimon.fs.conf` examples.
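A sketch of the kind of example being requested, assuming an OSS warehouse; the `fs.oss.*` keys follow Paimon's OSS filesystem options and are illustrative, not part of this PR:

```hocon
sink {
  Paimon {
    warehouse = "oss://paimon-bucket/warehouse"
    database = "seatunnel_namespace"
    table = "st_test"
    # passed through to the filesystem catalog
    paimon.fs.conf = {
      "fs.oss.endpoint" = "oss-cn-hangzhou.aliyuncs.com"
      "fs.oss.accessKeyId" = "your_access_key_id"
      "fs.oss.accessKeySecret" = "your_access_key_secret"
    }
  }
}
```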


## Changelog
You must configure the `changelog-producer=input` option to enable the changelog producer mode of the Paimon table. If you use the auto-create table function of the Paimon sink, you can configure this property in `paimon.table.write-props`.
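For instance, a minimal sketch of setting this option through `paimon.table.write-props`:

```hocon
sink {
  Paimon {
    # enable the changelog producer mode on the auto-created table
    paimon.table.write-props = {
      changelog-producer = "input"
    }
  }
}
```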
3 changes: 2 additions & 1 deletion docs/en/connector-v2/source/Paimon.md
@@ -17,7 +17,7 @@ Read data from Apache Paimon.

## Options

| name | type | required | default value |
|-------------------------|--------|----------|---------------|
| warehouse | String | Yes | - |
| catalog_type | String | No | filesystem |
@@ -28,6 +28,7 @@ Read data from Apache Paimon.
| query | String | No | - |
| paimon.hadoop.conf | Map | No | - |
| paimon.hadoop.conf-path | String | No | - |
| paimon.fs.conf | Map | No | - |
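
As with the sink, a hedged sketch of a source reading from a filesystem catalog, with illustrative `fs.oss.*` keys:

```hocon
source {
  Paimon {
    warehouse = "oss://paimon-bucket/warehouse"
    database = "seatunnel_namespace"
    table = "st_test"
    paimon.fs.conf = {
      "fs.oss.endpoint" = "oss-cn-hangzhou.aliyuncs.com"
      "fs.oss.accessKeyId" = "your_access_key_id"
      "fs.oss.accessKeySecret" = "your_access_key_secret"
    }
  }
}
```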

### warehouse [string]

31 changes: 16 additions & 15 deletions docs/zh/connector-v2/sink/Paimon.md
@@ -30,21 +30,22 @@ libfb303-xxx.jar

## Connector Options

| name | type | required | default value | Description |
|-----------------------------|--------|----------|------------------------------|-------------|
| warehouse | String | Yes | - | Paimon warehouse path |
| catalog_type | String | No | filesystem | Catalog type of Paimon, currently supports filesystem and hive |
| catalog_uri | String | No | - | URI of the Paimon catalog, only required when catalog_type is hive |
| database | String | Yes | - | Database name |
| table | String | Yes | - | Table name |
| hdfs_site_path | String | No | - | Path to the hdfs-site.xml file |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode |
| data_save_mode | Enum | No | APPEND_DATA | Data save mode |
| paimon.table.primary-keys | String | No | - | Primary key field list, comma-separated for a composite key (note: partition fields must be included in the primary key fields) |
| paimon.table.partition-keys | String | No | - | Partition field list, comma-separated for multiple fields |
| paimon.table.write-props | Map | No | - | Properties passed through to Paimon table initialization, [reference](https://paimon.apache.org/docs/master/maintenance/configurations/#coreoptions) |
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The directory for loading the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' configuration files |
| paimon.fs.conf | Map | No | - | Configuration properties passed in when the catalog type is filesystem |

## Changelog
You must configure `changelog-producer=input` to enable the changelog producer mode of the Paimon table. If you use the auto-create table function of the Paimon sink, you can configure this property in `paimon.table.write-props`.
2 changes: 1 addition & 1 deletion seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : Paimon</name>

<properties>
-        <paimon.version>0.7.0-incubating</paimon.version>
+        <paimon.version>0.9.0</paimon.version>
<hive.version>2.3.9</hive.version>
</properties>

@@ -55,11 +55,14 @@ public class PaimonCatalogLoader implements Serializable {

private PaimonHadoopConfiguration paimonHadoopConfiguration;

private Map<String, String> paimonFilesystemConfiguration;

public PaimonCatalogLoader(PaimonConfig paimonConfig) {
this.warehouse = paimonConfig.getWarehouse();
this.catalogType = paimonConfig.getCatalogType();
this.catalogUri = paimonConfig.getCatalogUri();
this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonConfig);
this.paimonFilesystemConfiguration = paimonConfig.getFsConfProps();
}

public Catalog loadCatalog() {
@@ -77,6 +80,8 @@ public Catalog loadCatalog() {
paimonHadoopConfiguration
.getPropsWithPrefix(StringUtils.EMPTY)
.forEach((k, v) -> optionsMap.put(k, v));
} else if (PaimonCatalogEnum.FILESYSTEM.getType().equals(catalogType.getType())) {
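// Filesystem catalogs receive the user-supplied paimon.fs.conf entries directly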
optionsMap.putAll(paimonFilesystemConfiguration);
}
final Options options = Options.fromMap(optionsMap);
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
@@ -106,6 +106,12 @@ public class PaimonConfig implements Serializable {
.withDescription(
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");

public static final Option<Map<String, String>> FS_CONF =
        Options.key("paimon.fs.conf")
                .mapType()
                .defaultValue(new HashMap<>())
                .withDescription("Properties in conf when catalog type is 'filesystem'");

**Contributor:** What is the purpose of the `paimon.fs.conf` configuration? Is it meant to integrate oss/s3 configuration?

**Author:** Yes, the current connector doesn't write to s3/oss. I will add some examples.

**Contributor:** Thank you @yoogoc. The other filesystems are available as plugins in Paimon, so Paimon-bundle has no s3 or oss dependencies. Beyond that, I don't think we should have a separate fs.conf for all of this; we can just do it through the original hadoop.conf. I have already added support for writing to s3, and it will be committed today; if you still need oss, you can continue to develop based on it.

**Author:** Of course.

protected String catalogName;
protected PaimonCatalogEnum catalogType;
protected String catalogUri;
@@ -114,6 +120,7 @@ public class PaimonConfig implements Serializable {
protected String table;
protected String hdfsSitePath;
protected Map<String, String> hadoopConfProps;
protected Map<String, String> fsConfProps;
protected String hadoopConfPath;

public PaimonConfig(ReadonlyConfig readonlyConfig) {
@@ -125,6 +132,7 @@ public PaimonConfig(ReadonlyConfig readonlyConfig) {
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
this.fsConfProps = readonlyConfig.get(FS_CONF);
this.catalogType = readonlyConfig.get(CATALOG_TYPE);
if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
this.catalogUri =
@@ -120,9 +120,9 @@ public PaimonSinkWriter(
this.tableSchema = this.table.schema();
BucketMode bucketMode = this.table.bucketMode();
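// Paimon 0.9 renamed the bucket modes used below: DYNAMIC -> HASH_DYNAMIC, GLOBAL_DYNAMIC -> CROSS_PARTITION, UNAWARE -> BUCKET_UNAWARE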
this.dynamicBucket =
-        BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC == bucketMode;
+        BucketMode.HASH_DYNAMIC == bucketMode || BucketMode.CROSS_PARTITION == bucketMode;
int bucket = ((FileStoreTable) table).coreOptions().bucket();
-        if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
+        if (bucket == -1 && BucketMode.BUCKET_UNAWARE == bucketMode) {
log.warn("Append only table currently do not support dynamic bucket");
}
if (dynamicBucket) {