Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][API] Improve savemode api #4767

Merged
merged 1 commit into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public static class SingleChoiceOptionBuilder<T> {
* @param value The default value for the config option
* @return The config option with the default value.
*/
public Option<T> defaultValue(T value) {
public SingleChoiceOption<T> defaultValue(T value) {
return new SingleChoiceOption<T>(key, typeReference, optionValues, value);
}

Expand All @@ -253,7 +253,7 @@ public Option<T> defaultValue(T value) {
*
* @return The config option without a default value.
*/
public Option<T> noDefaultValue() {
public SingleChoiceOption<T> noDefaultValue() {
return new SingleChoiceOption<T>(key, typeReference, optionValues, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.util.List;

public class SingleChoiceOption<T> extends Option {
public class SingleChoiceOption<T> extends Option<T> {

@Getter private final List<T> optionValues;

Expand All @@ -32,4 +32,10 @@ public SingleChoiceOption(
super(key, typeReference, defaultValue);
this.optionValues = optionValues;
}

@Override
public SingleChoiceOption<T> withDescription(String description) {
this.description = description;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public enum DataSaveMode {
// path and files in the path, create new files in the path.
KEEP_SCHEMA_AND_DATA,

// The connector provides custom processing methods, such as running user provided SQL or shell
// scripts, etc
CUSTOM_PROCESSING,

// Throw error when table is exists for MySQL. Throw error when path is exists.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only for mysql?

ERROR_WHEN_EXISTS
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,16 @@

package org.apache.seatunnel.api.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import java.util.List;
import java.util.Locale;

/** The Sink Connectors which support data SaveMode should implement this interface */
public interface SupportDataSaveMode {

/**
* We hope every sink connector use the same option name to config SaveMode, So I add
* checkOptions method to this interface. checkOptions method have a default implement to check
* whether `save_mode` parameter is in config.
*
* @param config config of sink Connector
*/
default void checkOptions(Config config) {
if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
DataSaveMode dataSaveMode =
DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
throw new SeaTunnelRuntimeException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
"This connector don't support save mode: " + dataSaveMode);
}
}
}

String SAVE_MODE_KEY = "savemode";
/**
* Get the {@link DataSaveMode} that the user configured
* Return the value of DataSaveMode configured by user in the job config file.
*
* @return DataSaveMode
* @return
*/
DataSaveMode getDataSaveMode();
DataSaveMode getUserConfigSaveMode();

/**
* Return the {@link DataSaveMode} list supported by this connector
*
* @return the list of supported data save modes
*/
List<DataSaveMode> supportedDataSaveModeValues();

/**
* The implementation of specific logic according to different {@link DataSaveMode}
*
* @param saveMode data save mode
*/
void handleSaveMode(DataSaveMode saveMode);
/** The implementation of specific logic according to different {@link DataSaveMode} */
void handleSaveMode(DataSaveMode userConfigSaveMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

Expand Down Expand Up @@ -289,28 +283,6 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
if (sinkOptionRule == null) {
throw new FactoryException("sinkOptionRule can not be null");
}

try {
TableSink sink = factory.createSink(null);
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
Option<DataSaveMode> saveMode =
Options.key(SinkCommonOptions.DATA_SAVE_MODE)
.singleChoice(
DataSaveMode.class,
supportDataSaveModeSink.supportedDataSaveModeValues())
.noDefaultValue()
.withDescription("data save mode");
OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build();
sinkOptionRule
.getOptionalOptions()
.addAll(sinkCommonOptionRule.getOptionalOptions());
}
} catch (Exception e) {
LOG.warn(
"Add save mode option need sink connector support create sink by TableSinkFactory");
}

return sinkOptionRule;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -176,15 +175,10 @@ public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
}

@Override
public DataSaveMode getDataSaveMode() {
public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}

@Override
public List<DataSaveMode> supportedDataSaveModeValues() {
return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;

import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -58,6 +59,8 @@ public enum StreamLoadFormat {

private String saveModeCreateTemplate;

private DataSaveMode dataSaveMode;

@Getter private final Map<String, Object> streamLoadProps = new HashMap<>();

public static SinkConfig of(ReadonlyConfig config) {
Expand Down Expand Up @@ -89,6 +92,7 @@ public static SinkConfig of(ReadonlyConfig config) {
config.getOptional(StarRocksSinkOptions.COLUMN_SEPARATOR)
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
return sinkConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -133,4 +137,12 @@ public interface StarRocksSinkOptions {
.enumType(StreamLoadFormat.class)
.defaultValue(StreamLoadFormat.JSON)
.withDescription("");

SingleChoiceOption<DataSaveMode> SAVE_MODE =
Options.key(SupportDataSaveMode.SAVE_MODE_KEY)
.singleChoice(
DataSaveMode.class, Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA))
.defaultValue(DataSaveMode.KEEP_SCHEMA_AND_DATA)
.withDescription(
"Table structure and data processing methods that already exist on the target end");
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;

import java.util.Collections;
import java.util.List;

@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
Expand All @@ -56,12 +53,11 @@ public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>

private CatalogTable catalogTable;

public StarRocksSink(
DataSaveMode dataSaveMode, SinkConfig sinkConfig, CatalogTable catalogTable) {
this.dataSaveMode = dataSaveMode;
public StarRocksSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
this.catalogTable = catalogTable;
this.dataSaveMode = sinkConfig.getDataSaveMode();
}

@Override
Expand All @@ -77,7 +73,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (StringUtils.isEmpty(sinkConfig.getTable()) && catalogTable != null) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
dataSaveMode = sinkConfig.getDataSaveMode();
}

private void autoCreateTable(String template) {
Expand Down Expand Up @@ -117,15 +113,10 @@ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context co
}

@Override
public DataSaveMode getDataSaveMode() {
public DataSaveMode getUserConfigSaveMode() {
return dataSaveMode;
}

@Override
public List<DataSaveMode> supportedDataSaveModeValues() {
return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -56,6 +55,7 @@ public OptionRule optionRule() {
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SAVE_MODE,
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
.build();
}
Expand All @@ -67,6 +67,6 @@ public TableSink createSink(TableFactoryContext context) {
if (StringUtils.isBlank(sinkConfig.getTable())) {
sinkConfig.setTable(catalogTable.getTableId().getTableName());
}
return () -> new StarRocksSink(DataSaveMode.KEEP_SCHEMA_AND_DATA, sinkConfig, catalogTable);
return () -> new StarRocksSink(sinkConfig, catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ protected SinkExecuteProcessor(
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
if (SupportDataSaveMode.class.isAssignableFrom(
seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink =
(SupportDataSaveMode) seaTunnelSink;
saveModeSink.checkOptions(sinkConfig);
}
return seaTunnelSink;
})
.distinct()
Expand All @@ -111,7 +105,7 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ protected SinkExecuteProcessor(
pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
if (SupportDataSaveMode.class.isAssignableFrom(
seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink =
(SupportDataSaveMode) seaTunnelSink;
saveModeSink.checkOptions(sinkConfig);
}
return seaTunnelSink;
})
.distinct()
Expand All @@ -112,7 +106,7 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
(SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
if (SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode();
DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink =
Expand Down
Loading