[Feature][SaveMode] STIP 1: Improve SaveMode design #4771

Closed
2 of 6 tasks
EricJoy2048 opened this issue May 17, 2023 · 3 comments
EricJoy2048 commented May 17, 2023

Search before asking

  • I have searched the existing feature requests and found no similar feature requirement.

Description

Current state:

  • Discuss
  • Accepted
  • Rejected

Motivation

It is a common requirement to handle data (files or other content) that is already present in the target data source when a data synchronization job starts running. So we need to design a universal interface that is implemented by Sink connectors to handle data that already exists in the target data source.
We will name this function the SaveMode function.

Preconditions

The Sink connector must implement the Catalog interface.

Public Interfaces

org.apache.seatunnel.api.sink.DataSaveMode

DataSaveMode is an enumeration that defines the complete set of SaveMode options SeaTunnel can support. Each Sink connector may implement one or more of them.

package org.apache.seatunnel.api.sink;
 
/**
 * The SaveMode for the Sink connectors that use table or other table structures to organize data
 */
public enum DataSaveMode {
    // Will drop table in MySQL, Will drop path for File Connector.
    DROP_SCHEMA,
 
    // Only drop the data in MySQL, Only drop the files in the path for File Connector.
    KEEP_SCHEMA_DROP_DATA,
 
    // Keep the table and data and continue to write data to the existing table for MySQL. Keep the
    // path and files in the path, create new files in the path.
    KEEP_SCHEMA_AND_DATA,
 
    // The connector provides custom processing methods, such as running user provided SQL or shell scripts, etc
    CUSTOM_PROCESSING,
 
    // Throw an error if the table already exists (MySQL) or the path already exists (File connector).
    ERROR_WHEN_EXISTS
}

org.apache.seatunnel.api.sink.SupportDataSaveMode

SupportDataSaveMode is the interface that a Sink must implement in order to support the SaveMode function.

package org.apache.seatunnel.api.sink;
 
/**
 * The Sink Connectors which support data SaveMode should implement this interface
 */
public interface SupportDataSaveMode {
    /**
     * Returns the DataSaveMode configured by the user in the job config file.
     *
     * @return the user-configured DataSaveMode
     */
    DataSaveMode getUserConfigSaveMode();
 
    /**
     * Applies the connector-specific logic for the given {@link DataSaveMode}.
     */
    void handleSaveMode(DataSaveMode userConfigSaveMode);
}
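
To illustrate how a connector might implement this interface, here is a minimal sketch. It is not taken from any SeaTunnel connector: `InMemorySink` and its `tables` map are hypothetical stand-ins for a real target data source, and the enum and interface are redeclared locally only so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SaveModeSketch {

    // Local copies of the proposed types so the sketch is self-contained.
    enum DataSaveMode {
        DROP_SCHEMA,
        KEEP_SCHEMA_DROP_DATA,
        KEEP_SCHEMA_AND_DATA,
        CUSTOM_PROCESSING,
        ERROR_WHEN_EXISTS
    }

    interface SupportDataSaveMode {
        DataSaveMode getUserConfigSaveMode();

        void handleSaveMode(DataSaveMode userConfigSaveMode);
    }

    // Hypothetical sink: "tables" maps a table name to its rows.
    static class InMemorySink implements SupportDataSaveMode {
        private final Map<String, List<String>> tables;
        private final String table;
        private final DataSaveMode configured;

        InMemorySink(Map<String, List<String>> tables, String table, DataSaveMode configured) {
            this.tables = tables;
            this.table = table;
            this.configured = configured;
        }

        @Override
        public DataSaveMode getUserConfigSaveMode() {
            return configured;
        }

        @Override
        public void handleSaveMode(DataSaveMode mode) {
            boolean exists = tables.containsKey(table);
            switch (mode) {
                case DROP_SCHEMA:
                    // Remove the table itself; a real sink would recreate it afterwards.
                    tables.remove(table);
                    break;
                case KEEP_SCHEMA_DROP_DATA:
                    // Keep the table but discard its rows.
                    if (exists) {
                        tables.get(table).clear();
                    }
                    break;
                case KEEP_SCHEMA_AND_DATA:
                    // Nothing to do: new rows are appended to the existing table.
                    break;
                case ERROR_WHEN_EXISTS:
                    if (exists) {
                        throw new IllegalStateException("Table already exists: " + table);
                    }
                    break;
                default:
                    // This sketch does not implement CUSTOM_PROCESSING.
                    throw new UnsupportedOperationException("Unsupported save mode: " + mode);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> tables = new HashMap<>();
        tables.put("orders", new ArrayList<>(Arrays.asList("row-1", "row-2")));
        InMemorySink sink =
                new InMemorySink(tables, "orders", DataSaveMode.KEEP_SCHEMA_DROP_DATA);
        sink.handleSaveMode(sink.getUserConfigSaveMode());
        System.out.println(tables); // the table is kept, its rows are gone
    }
}
```

A connector that supports only a subset of the modes would reject the others in the same way this sketch rejects CUSTOM_PROCESSING.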

SaveMode must be handled before the job starts. In the SeaTunnel Zeta engine, the connector's handleSaveMode method is therefore called right after the Sink is built.
This code is in the MultipleTableJobConfigParser class:

private SinkAction<?, ?, ?, ?> createSinkAction(
            CatalogTable catalogTable,
            Map<TablePath, CatalogTable> sinkTableMap,
            Set<Action> inputActions,
            ReadonlyConfig readonlyConfig,
            ClassLoader classLoader,
            Set<URL> factoryUrls,
            String factoryId,
            int parallelism,
            int configIndex) {
        Optional<CatalogTable> insteadTable;
        if (sinkTableMap.size() == 1) {
            insteadTable = sinkTableMap.values().stream().findFirst();
        } else {
            // TODO: another table full name map
            insteadTable =
                    Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
        }
        if (insteadTable.isPresent()) {
            catalogTable = insteadTable.get();
        }
        SeaTunnelSink<?, ?, ?, ?> sink =
                FactoryUtil.createAndPrepareSink(
                        catalogTable, readonlyConfig, classLoader, factoryId);
        sink.setJobContext(jobConfig.getJobContext());
        SinkConfig actionConfig =
                new SinkConfig(catalogTable.getTableId().toTablePath().toString());
        long id = idGenerator.getNextId();
        String actionName =
                JobConfigParser.createSinkActionName(
                        configIndex, factoryId, actionConfig.getMultipleRowTableId());
        SinkAction<?, ?, ?, ?> sinkAction =
                new SinkAction<>(
                        id,
                        actionName,
                        new ArrayList<>(inputActions),
                        sink,
                        factoryUrls,
                        actionConfig);
        handleSaveMode(sink);
        sinkAction.setParallelism(parallelism);
        return sinkAction;
    }
 
    public static void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
        if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
            SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;
            DataSaveMode dataSaveMode = saveModeSink.getUserConfigSaveMode();
            saveModeSink.handleSaveMode(dataSaveMode);
        }
    }

Processing remains unchanged in the Spark and Flink engines. The new interface removes the checkOption method from SupportDataSaveMode, so the corresponding method calls must be deleted as well.

Proposed Changes

Delete the original logical code

public class SinkCommonOptions {
 
    public static final String DATA_SAVE_MODE = "save_mode";
}

Original code modification

org.apache.seatunnel.api.table.factory.FactoryUtil

Because the getDataSaveModeOption method is defined in SupportDataSaveModeTableSinkFactory, every connector that implements DataSaveMode will, in principle, define its own savemode parameter in its factory. FactoryUtil therefore no longer needs any special handling of the savemode parameter.

/**
 * This method is called by SeaTunnel Web to get the full option rule of a sink.
 *
 * @return Option rule
 */
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
    OptionRule sinkOptionRule = factory.optionRule();
    if (sinkOptionRule == null) {
        throw new FactoryException("sinkOptionRule can not be null");
    }
 
    try {
        TableSink sink = factory.createSink(null);
        if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
            SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
            Option<DataSaveMode> saveMode =
                    Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                            .singleChoice(
                                    DataSaveMode.class,
                                    supportDataSaveModeSink.supportedDataSaveModeValues())
                            .noDefaultValue()
                            .withDescription("data save mode");
            OptionRule sinkCommonOptionRule = OptionRule.builder().required(saveMode).build();
            sinkOptionRule
                    .getOptionalOptions()
                    .addAll(sinkCommonOptionRule.getOptionalOptions());
        }
    } catch (Exception e) {
        LOG.warn(
                "Add save mode option need sink connector support create sink by TableSinkFactory");
    }
 
    return sinkOptionRule;
}

Change to:

public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
    OptionRule sinkOptionRule = factory.optionRule();
    if (sinkOptionRule == null) {
        throw new FactoryException("sinkOptionRule can not be null");
    }
    return sinkOptionRule;
}
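
With the special handling removed from FactoryUtil, validating the user's choice becomes the connector's job. The following is a rough sketch of what that could look like, not SeaTunnel's Option API: `parseSaveMode`, the plain `Map` config, and the `save_mode` key are all assumptions made for illustration, and the enum is redeclared so the example stands alone.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public class SaveModeOptionSketch {

    // Local copy of the proposed enum so the sketch is self-contained.
    enum DataSaveMode {
        DROP_SCHEMA,
        KEEP_SCHEMA_DROP_DATA,
        KEEP_SCHEMA_AND_DATA,
        CUSTOM_PROCESSING,
        ERROR_WHEN_EXISTS
    }

    // Hypothetical helper: reads the option and checks it against the
    // subset of modes that this particular connector supports.
    static DataSaveMode parseSaveMode(
            Map<String, String> config, String key, Set<DataSaveMode> supported) {
        String raw = config.get(key);
        if (raw == null) {
            throw new IllegalArgumentException("Missing required option: " + key);
        }
        DataSaveMode mode;
        try {
            mode = DataSaveMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown save mode: " + raw, e);
        }
        if (!supported.contains(mode)) {
            throw new IllegalArgumentException(
                    "Save mode " + mode + " is not supported; supported modes: " + supported);
        }
        return mode;
    }

    public static void main(String[] args) {
        // Assumed: this connector supports only two of the five modes.
        Set<DataSaveMode> supported =
                EnumSet.of(DataSaveMode.KEEP_SCHEMA_AND_DATA, DataSaveMode.ERROR_WHEN_EXISTS);
        Map<String, String> config = new HashMap<>();
        config.put("save_mode", "keep_schema_and_data");
        System.out.println(parseSaveMode(config, "save_mode", supported));
    }
}
```

Keeping the supported set next to the connector means an unsupported mode is rejected when the job config is parsed rather than when the sink starts writing.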

Compatibility

The change is not source-compatible: existing connectors that support SaveMode need to be modified. The feature was not previously available to users, so there are no user-facing compatibility issues.
The following Sink connectors need to be modified:

  • seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java

  • seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java

Test Plan

Test coverage scenarios

  • The automatic table creation function of the above connectors can operate normally.

Rejected Alternatives

None

Risk

None

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!


@TyrantLucifer

How's it going? Any progress?

@github-actions

github-actions bot commented Jul 9, 2023

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Jul 9, 2023
@EricJoy2048 EricJoy2048 moved this from Todo to Doing in SeaTunnel RoadMap Jul 13, 2023
@github-actions

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

@davidzollo davidzollo moved this from Doing to Done in SeaTunnel RoadMap Aug 8, 2023
Status: Done