diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java index 02aa50c008a8..60033fac28a7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/Options.java @@ -244,7 +244,7 @@ public static class SingleChoiceOptionBuilder { * @param value The default value for the config option * @return The config option with the default value. */ - public Option defaultValue(T value) { + public SingleChoiceOption defaultValue(T value) { return new SingleChoiceOption(key, typeReference, optionValues, value); } @@ -253,7 +253,7 @@ public Option defaultValue(T value) { * * @return The config option without a default value. */ - public Option noDefaultValue() { + public SingleChoiceOption noDefaultValue() { return new SingleChoiceOption(key, typeReference, optionValues, null); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java index fd3697f681f4..b3a6574e9ed7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/SingleChoiceOption.java @@ -23,7 +23,7 @@ import java.util.List; -public class SingleChoiceOption extends Option { +public class SingleChoiceOption extends Option { @Getter private final List optionValues; @@ -32,4 +32,10 @@ public SingleChoiceOption( super(key, typeReference, defaultValue); this.optionValues = optionValues; } + + @Override + public SingleChoiceOption withDescription(String description) { + this.description = description; + return this; + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java index f269c9f2cb78..cdf69e1bbec9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DataSaveMode.java @@ -1,20 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.seatunnel.api.sink; /** @@ -31,6 +14,10 @@ public enum DataSaveMode { // path and files in the path, create new files in the path. KEEP_SCHEMA_AND_DATA, + // The connector provides custom processing methods, such as running user provided SQL or shell + // scripts, etc + CUSTOM_PROCESSING, + // Throw error when table is exists for MySQL. Throw error when path is exists. ERROR_WHEN_EXISTS } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java deleted file mode 100644 index 4bf320b49cbd..000000000000 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.api.sink; - -public class SinkCommonOptions { - - public static final String DATA_SAVE_MODE = "save_mode"; -} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java index 7d0c2838befb..6feb5898cbd2 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportDataSaveMode.java @@ -1,71 +1,15 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.seatunnel.api.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; - -import java.util.List; -import java.util.Locale; - /** The Sink Connectors which support data SaveMode should implement this interface */ public interface SupportDataSaveMode { - - /** - * We hope every sink connector use the same option name to config SaveMode, So I add - * checkOptions method to this interface. checkOptions method have a default implement to check - * whether `save_mode` parameter is in config. - * - * @param config config of sink Connector - */ - default void checkOptions(Config config) { - if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) { - String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE); - DataSaveMode dataSaveMode = - DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT)); - if (!supportedDataSaveModeValues().contains(dataSaveMode)) { - throw new SeaTunnelRuntimeException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - "This connector don't support save mode: " + dataSaveMode); - } - } - } - - /** - * Get the {@link DataSaveMode} that the user configured - * - * @return DataSaveMode - */ - DataSaveMode getDataSaveMode(); - + String SAVE_MODE_KEY = "savemode"; /** - * Return the {@link DataSaveMode} list supported by this connector + * Return the value of DataSaveMode configured by user in the job config file. * - * @return the list of supported data save modes + * @return */ - List supportedDataSaveModeValues(); + DataSaveMode getUserConfigSaveMode(); - /** - * The implementation of specific logic according to different {@link DataSaveMode} - * - * @param saveMode data save mode - */ - void handleSaveMode(DataSaveMode saveMode); + /** The implementation of specific logic according to different {@link DataSaveMode} */ + void handleSaveMode(DataSaveMode userConfigSaveMode); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 6ac939149c71..f30900269912 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -18,21 +18,15 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.common.CommonOptions; -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.api.sink.SinkCommonOptions; -import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -289,28 +283,6 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) { if (sinkOptionRule == null) { throw new FactoryException("sinkOptionRule can not be null"); } - - try { - TableSink sink = factory.createSink(null); - if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) { - SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink; - Option saveMode = - Options.key(SinkCommonOptions.DATA_SAVE_MODE) - .singleChoice( - DataSaveMode.class, - supportDataSaveModeSink.supportedDataSaveModeValues()) - .noDefaultValue() - .withDescription("data save mode"); - OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build(); - sinkOptionRule - .getOptionalOptions() - .addAll(sinkCommonOptionRule.getOptionalOptions()); - } - } catch (Exception e) { - LOG.warn( - "Add save mode option need sink connector support create sink by TableSinkFactory"); - } - return sinkOptionRule; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 2cce860d2eb4..b9975a3106fb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -51,7 +51,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -176,15 +175,10 @@ public Optional> getCommitInfoSerializer() { } @Override - public DataSaveMode getDataSaveMode() { + public DataSaveMode getUserConfigSaveMode() { return dataSaveMode; } - @Override - public List supportedDataSaveModeValues() { - return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA); - } - @Override public void handleSaveMode(DataSaveMode saveMode) { if (catalogTable != null) { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java index 91bfb9358b17..f5a2d0dc88c1 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.DataSaveMode; import lombok.Getter; import lombok.Setter; @@ -58,6 +59,8 @@ public enum StreamLoadFormat { private String saveModeCreateTemplate; + private DataSaveMode dataSaveMode; + @Getter private final Map streamLoadProps = new HashMap<>(); public static SinkConfig of(ReadonlyConfig config) { @@ -89,6 +92,7 @@ public static SinkConfig of(ReadonlyConfig config) { config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR) .ifPresent(sinkConfig::setColumnSeparator); sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT)); + sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE)); return sinkConfig; } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java index 4f87b690f187..02918f0f96d7 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java @@ -19,8 +19,12 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.SingleChoiceOption; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SupportDataSaveMode; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -133,4 +137,12 @@ public interface StarRocksSinkOptions { .enumType(StreamLoadFormat.class) .defaultValue(StreamLoadFormat.JSON) .withDescription(""); + + SingleChoiceOption SAVE_MODE = + Options.key(SupportDataSaveMode.SAVE_MODE_KEY) + .singleChoice( + DataSaveMode.class, Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA)) + .defaultValue(DataSaveMode.KEEP_SCHEMA_AND_DATA) + .withDescription( + "Table structure and data processing methods that already exist on the target end"); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index ae808a36eef7..54163bd6f1de 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -42,9 +42,6 @@ import com.google.auto.service.AutoService; import lombok.NoArgsConstructor; -import java.util.Collections; -import java.util.List; - @NoArgsConstructor @AutoService(SeaTunnelSink.class) public class StarRocksSink extends AbstractSimpleSink @@ -56,12 +53,11 @@ public class StarRocksSink extends AbstractSimpleSink private CatalogTable catalogTable; - public StarRocksSink( - DataSaveMode dataSaveMode, SinkConfig sinkConfig, CatalogTable catalogTable) { - this.dataSaveMode = dataSaveMode; + public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) { this.sinkConfig = sinkConfig; this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); this.catalogTable = catalogTable; + this.dataSaveMode = sinkConfig.getDataSaveMode(); } @Override @@ -77,7 +73,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable != null) { sinkConfig.setTable(catalogTable.getTableId().getTableName()); } - dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; + dataSaveMode = sinkConfig.getDataSaveMode(); } private void autoCreateTable(String template) { @@ -117,15 +113,10 @@ public AbstractSinkWriter createWriter(SinkWriter.Context co } @Override - public DataSaveMode getDataSaveMode() { + public DataSaveMode getUserConfigSaveMode() { return dataSaveMode; } - @Override - public List supportedDataSaveModeValues() { - return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA); - } - @Override public void handleSaveMode(DataSaveMode saveMode) { if (catalogTable != null) { diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index 44a84c54898b..471be7001b68 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -56,6 +55,7 @@ public OptionRule optionRule() { StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS, StarRocksSinkOptions.STARROCKS_CONFIG, StarRocksSinkOptions.ENABLE_UPSERT_DELETE, + StarRocksSinkOptions.SAVE_MODE, StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE) .build(); } @@ -67,6 +67,6 @@ public TableSink createSink(TableFactoryContext context) { if (StringUtils.isBlank(sinkConfig.getTable())) { sinkConfig.setTable(catalogTable.getTableId().getTableName()); } - return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA, sinkConfig, catalogTable); + return () -> new StarRocksSink(sinkConfig, catalogTable); } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 8fd9d90bd9f2..92a1fab842cd 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -615,7 +615,7 @@ private static T findLast(LinkedHashMap map) { public static void handleSaveMode(SeaTunnelSink sink) { if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) { SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink; - DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); + DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode(); saveModeSink.handleSaveMode(dataSaveMode); } }