Skip to content

Commit

Permalink
improve savemode api
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed May 17, 2023
1 parent c73b933 commit 10d7a46
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static class SingleChoiceOptionBuilder<T> {
* @param value The default value for the config option
* @return The config option with the default value.
*/
public Option<T> defaultValue(T value) {
public SingleChoiceOption<T> defaultValue(T value) {
return new SingleChoiceOption<T>(key, typeReference, optionValues, value);
}

Expand All @@ -253,7 +253,7 @@ public Option<T> defaultValue(T value) {
*
* @return The config option without a default value.
*/
public Option<T> noDefaultValue() {
public SingleChoiceOption<T> noDefaultValue() {
return new SingleChoiceOption<T>(key, typeReference, optionValues, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.util.List;

public class SingleChoiceOption<T> extends Option {
public class SingleChoiceOption<T> extends Option<T> {

@Getter private final List<T> optionValues;

Expand All @@ -32,4 +32,10 @@ public SingleChoiceOption(
super(key, typeReference, defaultValue);
this.optionValues = optionValues;
}

@Override
public SingleChoiceOption<T> withDescription(String description) {
this.description = description;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

/**
Expand All @@ -31,6 +14,10 @@ public enum DataSaveMode {
// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,

// The connector provides custom processing methods, such as running user provided SQL or shell
// scripts, etc
CUSTOM_PROCESSING,

// Throw error when table is exists for MySQL. Throw error when path is exists.
ERROR_WHEN_EXISTS
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,71 +1,15 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import java.util.List;
import java.util.Locale;

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {

/**
* We hope every sink connector use the same option name to config SaveMode, So I add
* checkOptions method to this interface. checkOptions method have a default implement to check
* whether `save_mode` parameter is in config.
*
* @param config config of sink Connector
*/
default void checkOptions(Config config) {
if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
DataSaveMode dataSaveMode =
DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
throw new SeaTunnelRuntimeException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
"This connector don't support save mode: " + dataSaveMode);
}
}
}

/**
* Get the {@link DataSaveMode} that the user configured
*
* @return DataSaveMode
*/
DataSaveMode getDataSaveMode();

String SAVE_MODE_KEY = "savemode";
/**
* Return the {@link DataSaveMode} list supported by this connector
* Return the value of DataSaveMode configured by user in the job config file.
*
* @return the list of supported data save modes
* @return
*/
List<DataSaveMode> supportedDataSaveModeValues();
DataSaveMode getUserConfigSaveMode();

/**
* The implementation of specific logic according to different {@link DataSaveMode}
*
* @param saveMode data save mode
*/
void handleSaveMode(DataSaveMode saveMode);
/** The implementation of specific logic according to different {@link DataSaveMode} */
void handleSaveMode(DataSaveMode userConfigSaveMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

Expand Down Expand Up @@ -289,28 +283,6 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
if (sinkOptionRule == null) {
throw new FactoryException("sinkOptionRule can not be null");
}

try {
TableSink sink = factory.createSink(null);
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
Option<DataSaveMode> saveMode =
Options.key(SinkCommonOptions.DATA_SAVE_MODE)
.singleChoice(
DataSaveMode.class,
supportDataSaveModeSink.supportedDataSaveModeValues())
.noDefaultValue()
.withDescription("data save mode");
OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build();
sinkOptionRule
.getOptionalOptions()
.addAll(sinkCommonOptionRule.getOptionalOptions());
}
} catch (Exception e) {
LOG.warn(
"Add save mode option need sink connector support create sink by TableSinkFactory");
}

return sinkOptionRule;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -176,15 +175,10 @@ public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
}

@Override
public DataSaveMode getDataSaveMode() {
public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}

@Override
public List<DataSaveMode> supportedDataSaveModeValues() {
return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;

import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -58,6 +59,8 @@ public enum StreamLoadFormat {

private String saveModeCreateTemplate;

private DataSaveMode dataSaveMode;

@Getter private final Map<String, Object> streamLoadProps = new HashMap<>();

public static SinkConfig of(ReadonlyConfig config) {
Expand Down Expand Up @@ -89,6 +92,7 @@ public static SinkConfig of(ReadonlyConfig config) {
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
return sinkConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -133,4 +137,12 @@ public interface StarRocksSinkOptions {
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");

SingleChoiceOption<DataSaveMode> SAVE_MODE =
Options.key(SupportDataSaveMode.SAVE_MODE_KEY)
.singleChoice(
DataSaveMode.class, Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA))
.defaultValue(DataSaveMode.KEEP_SCHEMA_AND_DATA)
.withDescription(
"Table structure and data processing methods that already exist on the target end");
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.util.Collections;
import java.util.List;

@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
Expand All @@ -56,12 +53,11 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>

private CatalogTable catalogTable;

public StarRocksSink(
DataSaveMode dataSaveMode, SinkConfig sinkConfig, CatalogTable catalogTable) {
this.dataSaveMode = dataSaveMode;
public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
this.catalogTable = catalogTable;
this.dataSaveMode = sinkConfig.getDataSaveMode();
}

@Override
Expand All @@ -77,7 +73,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable != null) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
dataSaveMode = sinkConfig.getDataSaveMode();
}

private void autoCreateTable(String template) {
Expand Down Expand Up @@ -117,15 +113,10 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
}

@Override
public DataSaveMode getDataSaveMode() {
public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}

@Override
public List<DataSaveMode> supportedDataSaveModeValues() {
return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -56,6 +55,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SAVE_MODE,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
.build();
}
Expand All @@ -67,6 +67,6 @@ public TableSink createSink(TableFactoryContext context) {
if (StringUtils.isBlank(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA, sinkConfig, catalogTable);
return () -> new StarRocksSink(sinkConfig, catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ private static <T> T findLast(LinkedHashMap<?, T> map) {
public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
}
Expand Down

0 comments on commit 10d7a46

Please sign in to comment.