From 52c0773d9b7e21053310116c3e625f52ec92baf1 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Wed, 8 Jan 2025 15:18:10 +0800 Subject: [PATCH 1/3] [FLINK-37042] Rename maxcompute pipeline connector options with kebab-case --- .../pipeline-connectors/maxcompute.md | 24 +++++----- .../pipeline-connectors/maxcompute.md | 24 +++++----- .../maxcompute/MaxComputeDataSinkFactory.java | 20 ++++---- .../maxcompute/MaxComputeDataSinkOptions.java | 48 ++++++++++--------- .../maxcompute/options/CompressAlgorithm.java | 32 +++++++++++++ .../options/MaxComputeWriteOptions.java | 6 +-- 6 files changed, 94 insertions(+), 60 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md b/docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md index dd68721a10b..63e2f4700ff 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/maxcompute.md @@ -94,7 +94,7 @@ pipeline: Sink 的名称. 
- accessId + access-id required (none) String @@ -102,7 +102,7 @@ pipeline: AccessKey管理页面 获取AccessKey ID。 - accessKey + access-key required (none) String @@ -126,14 +126,14 @@ pipeline: MaxCompute控制台,在 工作区 > 项目管理 页面获取MaxCompute项目名称。 - tunnelEndpoint + tunnel.endpoint optional (none) String MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的project所在的region进行自动路由。仅在使用代理等特殊网络环境下使用该配置。 - quotaName + quota.name optional (none) String @@ -141,14 +141,14 @@ pipeline: 使用 Maxcompute 独享资源组 - stsToken + sts-token optional (none) String 当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。 - bucketsNum + buckets-num optional 16 Integer @@ -156,35 +156,35 @@ pipeline: Delta Table 概述 - compressAlgorithm + compress.algorithm optional zlib String - 写入MaxCompute时使用的数据压缩算法,当前支持raw(不进行压缩),zlibsnappy。 + 写入MaxCompute时使用的数据压缩算法,当前支持raw(不进行压缩),zlib, lz4snappy。 - totalBatchSize + total.buffer-size optional 64MB String 内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。 - bucketBatchSize + bucket.buffer-size optional 4MB String 内存中缓冲的数据量大小,单位为桶级,仅写入 Delta 表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。 - numCommitThreads + commit.thread-num optional 16 Integer checkpoint阶段,能够同时处理的分区(表)数量。 - numFlushConcurrent + flush.concurrent-num optional 4 Integer diff --git a/docs/content/docs/connectors/pipeline-connectors/maxcompute.md b/docs/content/docs/connectors/pipeline-connectors/maxcompute.md index d1d39a6457a..f05cca380ec 100644 --- a/docs/content/docs/connectors/pipeline-connectors/maxcompute.md +++ b/docs/content/docs/connectors/pipeline-connectors/maxcompute.md @@ -94,7 +94,7 @@ pipeline: The name of the sink. - accessId + access-id required (none) String @@ -102,7 +102,7 @@ pipeline: AccessKey management page Obtain AccessKey ID. - accessKey + access-key required (none) String @@ -124,63 +124,63 @@ pipeline: The name of the MaxCompute project. You can log in to the MaxCompute console and obtain the MaxCompute project name on the Workspace > Project Management page. 
- tunnelEndpoint + tunnel.endpoint optional (none) String The connection address for the MaxCompute Tunnel service. Typically, this configuration can be auto-routed based on the region where the specified project is located. It is used only in special network environments such as when using a proxy. - quotaName + quota.name optional (none) String The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to Using exclusive resource groups for Maxcompute - stsToken + sts-token optional (none) String When using a temporary access token (STS Token) issued by a RAM role for authentication, this parameter must be specified. - bucketsNum + buckets-num optional 16 Integer The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to Delta Table Overview - compressAlgorithm + compress.algorithm optional zlib String - The data compression algorithm used when writing to MaxCompute. Currently supports raw (no compression), zlib, and snappy. + The data compression algorithm used when writing to MaxCompute. Currently supports raw (no compression), zlib, lz4, and snappy. - totalBatchSize + total.buffer-size optional 64MB String The size of the data buffer in memory, by partition level (for non-partitioned tables, by table level). Buffers for different partitions (tables) are independent, and data is written to MaxCompute when the threshold is reached. - bucketBatchSize + bucket.buffer-size optional 4MB String The size of the data buffer in memory, by bucket level. This is effective only when writing to Delta tables. Buffers for different data buckets are independent, and the bucket data is written to MaxCompute when the threshold is reached. - numCommitThreads + commit.thread-num optional 16 Integer The number of partitions (tables) that can be processed simultaneously during the checkpoint stage. 
- numFlushConcurrent + flush.concurrent-num optional 4 Integer diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java index 6f83853785a..87d87bea5b8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm; import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions; import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions; import org.apache.flink.configuration.MemorySize; @@ -69,21 +70,20 @@ private MaxComputeOptions extractMaxComputeOptions( private MaxComputeWriteOptions extractMaxComputeWriteOptions( Configuration factoryConfiguration) { - int numCommitThread = - factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS); - String compressAlgorithm = + int numCommitThread = factoryConfiguration.get(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM); + CompressAlgorithm compressAlgorithm = factoryConfiguration.get(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM); int flushConcurrent = - factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT); + factoryConfiguration.get(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM); long 
maxBufferSize = MemorySize.parse( factoryConfiguration.get( - MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE)) + MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE)) .getBytes(); long maxSlotSize = MemorySize.parse( factoryConfiguration.get( - MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE)) + MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE)) .getBytes(); return MaxComputeWriteOptions.builder() @@ -119,11 +119,11 @@ public Set> optionalOptions() { optionalOptions.add(MaxComputeDataSinkOptions.STS_TOKEN); optionalOptions.add(MaxComputeDataSinkOptions.BUCKETS_NUM); // write options - optionalOptions.add(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS); + optionalOptions.add(MaxComputeDataSinkOptions.COMMIT_THREAD_NUM); optionalOptions.add(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM); - optionalOptions.add(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT); - optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE); - optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE); + optionalOptions.add(MaxComputeDataSinkOptions.FLUSH_CONCURRENT_NUM); + optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BUFFER_SIZE); + optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BUFFER_SIZE); return optionalOptions; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java index e28272b9108..e85c949410f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java @@ -20,18 +20,19 
@@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.connectors.maxcompute.options.CompressAlgorithm; /** Options for MaxCompute Data Sink. */ public class MaxComputeDataSinkOptions { // basic options. public static final ConfigOption ACCESS_ID = - ConfigOptions.key("accessId") + ConfigOptions.key("access-id") .stringType() .noDefaultValue() .withDescription("MaxCompute user access id."); public static final ConfigOption ACCESS_KEY = - ConfigOptions.key("accessKey") + ConfigOptions.key("access-key") .stringType() .noDefaultValue() .withDescription("MaxCompute user access key."); @@ -49,59 +50,60 @@ public class MaxComputeDataSinkOptions { .withDescription("MaxCompute project."); public static final ConfigOption TUNNEL_ENDPOINT = - ConfigOptions.key("tunnelEndpoint") + ConfigOptions.key("tunnel.endpoint") .stringType() .noDefaultValue() .withDescription("MaxCompute tunnel end point."); + public static final ConfigOption QUOTA_NAME = - ConfigOptions.key("quotaName") + ConfigOptions.key("quota.name") .stringType() .noDefaultValue() .withDescription( "MaxCompute tunnel quota name, note that not quota nick-name."); public static final ConfigOption STS_TOKEN = - ConfigOptions.key("stsToken") + ConfigOptions.key("sts-token") .stringType() .noDefaultValue() .withDescription("MaxCompute sts token."); public static final ConfigOption BUCKETS_NUM = - ConfigOptions.key("bucketsNum") + ConfigOptions.key("buckets-num") .intType() .defaultValue(16) .withDescription( "The batch size of MaxCompute table when automatically create table."); // write options. 
- public static final ConfigOption COMPRESS_ALGORITHM = - ConfigOptions.key("compressAlgorithm") - .stringType() - .defaultValue("zlib") + public static final ConfigOption COMPRESS_ALGORITHM = + ConfigOptions.key("compress.algorithm") + .enumType(CompressAlgorithm.class) + .defaultValue(CompressAlgorithm.ZLIB) .withDescription( - "The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'raw'."); - - public static final ConfigOption TOTAL_BATCH_SIZE = - ConfigOptions.key("totalBatchSize") - .stringType() - .defaultValue("64MB") - .withDescription("The max batch size of data upload to MaxCompute."); + "The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'lz4', 'raw'."); - public static final ConfigOption BUCKET_BATCH_SIZE = - ConfigOptions.key("bucketBatchSize") + public static final ConfigOption BUCKET_BUFFER_SIZE = + ConfigOptions.key("bucket.buffer-size") .stringType() .defaultValue("4MB") .withDescription( "The max batch size of data per bucket when upload to MaxCompute"); - public static final ConfigOption NUM_COMMIT_THREADS = - ConfigOptions.key("numCommitThreads") + public static final ConfigOption TOTAL_BUFFER_SIZE = + ConfigOptions.key("total.buffer-size") + .stringType() + .defaultValue("64MB") + .withDescription("The max batch size of data upload to MaxCompute."); + + public static final ConfigOption COMMIT_THREAD_NUM = + ConfigOptions.key("commit.thread-num") .intType() .defaultValue(16) .withDescription("The number of threads used to commit data to MaxCompute."); - public static final ConfigOption NUM_FLUSH_CONCURRENT = - ConfigOptions.key("numFlushConcurrent") + public static final ConfigOption FLUSH_CONCURRENT_NUM = + ConfigOptions.key("flush.concurrent-num") .intType() .defaultValue(4) .withDescription("The number of concurrent with flush bucket data."); diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java new file mode 100644 index 00000000000..75780bee622 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java @@ -0,0 +1,32 @@ +package org.apache.flink.cdc.connectors.maxcompute.options; + +/** Compress algorithm for MaxCompute table. */ +public enum CompressAlgorithm { + /** No compress. */ + RAW("raw"), + + /** Zlib compress. */ + ZLIB("zlib"), + + /** LZ4 compress. */ + LZ4("lz4"), + + /** Snappy compress. */ + @Deprecated + SNAPPY("snappy"); + + private final String value; + + CompressAlgorithm(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java index c746063e6b6..3aa933c7a94 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java @@ -34,7 +34,7 @@ private 
MaxComputeWriteOptions(Builder builder) { this.maxBufferSize = builder.maxBufferSize; this.slotBufferSize = builder.slotBufferSize; this.numCommitThread = builder.numCommitThread; - this.compressAlgorithm = builder.compressAlgorithm; + this.compressAlgorithm = builder.compressAlgorithm.getValue(); } public static Builder builder() { @@ -67,7 +67,7 @@ public static class Builder { private long maxBufferSize = 64 * 1024 * 1024L; private long slotBufferSize = 1024 * 1024L; private int numCommitThread = 16; - private String compressAlgorithm = "zlib"; + private CompressAlgorithm compressAlgorithm = CompressAlgorithm.ZLIB; public Builder withFlushConcurrent(int flushConcurrent) { this.flushConcurrent = flushConcurrent; @@ -89,7 +89,7 @@ public Builder withNumCommitThread(int numCommitThread) { return this; } - public Builder withCompressAlgorithm(String compressAlgorithm) { + public Builder withCompressAlgorithm(CompressAlgorithm compressAlgorithm) { this.compressAlgorithm = compressAlgorithm; return this; } From 4c30e5f79cbf74987dc85778c8286101261b8852 Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Wed, 8 Jan 2025 15:43:31 +0800 Subject: [PATCH 2/3] Add license --- .../maxcompute/options/CompressAlgorithm.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java index 75780bee622..57df8c224e2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/CompressAlgorithm.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.cdc.connectors.maxcompute.options; /** Compress algorithm for MaxCompute table. 
*/ From 4a04439f97bae6d0279c24b4b1cc49cf0a97884b Mon Sep 17 00:00:00 2001 From: "zhangchaoming.zcm" Date: Mon, 13 Jan 2025 12:15:01 +0800 Subject: [PATCH 3/3] update e2e tests --- .../flink/cdc/pipeline/tests/MaxComputeE2eITCase.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java index 16748550d23..9aa78cadafe 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MaxComputeE2eITCase.java @@ -108,17 +108,17 @@ private void startTest(String testSet) throws Exception { + "sink:\n" + " type: maxcompute\n" + " name: MaxComputeSink\n" - + " accessId: ak\n" - + " accessKey: sk\n" + + " access-id: ak\n" + + " access-key: sk\n" + " endpoint: " + getEndpoint() + "\n" - + " tunnelEndpoint: " + + " tunnel.endpoint: " + getEndpoint() + "\n" + " project: mocked_mc\n" - + " bucketsNum: 8\n" - + " compressAlgorithm: raw\n" + + " buckets-num: 8\n" + + " compress.algorithm: raw\n" + "\n" + "pipeline:\n" + " parallelism: 4";