From 34ecdd501170fac49202cb6d38c838747f129f4b Mon Sep 17 00:00:00 2001 From: liuli Date: Sun, 12 Jun 2022 22:18:34 +0800 Subject: [PATCH 1/3] Add SeaTunnel jdbc sink (#1946) --- .../pom.xml | 5 + .../seatunnel-connectors-seatunnel/pom.xml | 1 + .../pom.xml | 52 ++ .../jdbc/internal/JdbcOutputFormat.java | 257 ++++++++++ .../internal/connection/DataSourceUtils.java | 104 ++++ .../connection/JdbcConnectionProvider.java | 70 +++ .../SimpleJdbcConnectionProvider.java | 154 ++++++ .../executor/BiConsumerWithException.java | 61 +++ .../executor/JdbcBatchStatementExecutor.java | 36 ++ .../executor/JdbcStatementBuilder.java | 29 ++ .../SimpleBatchStatementExecutor.java | 79 +++ .../jdbc/internal/options/JdbcConfig.java | 55 ++ .../options/JdbcConnectorOptions.java | 253 +++++++++ .../internal/xa/GroupXaOperationResult.java | 80 +++ .../internal/xa/SemanticXidGenerator.java | 113 +++++ .../seatunnel/jdbc/internal/xa/XaFacade.java | 113 +++++ .../internal/xa/XaFacadeImplAutoLoad.java | 478 ++++++++++++++++++ .../jdbc/internal/xa/XaGroupOps.java | 43 ++ .../jdbc/internal/xa/XaGroupOpsImpl.java | 151 ++++++ .../jdbc/internal/xa/XidGenerator.java | 62 +++ .../seatunnel/jdbc/internal/xa/XidImpl.java | 136 +++++ .../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 189 +++++++ .../seatunnel/jdbc/sink/JdbcSink.java | 142 ++++++ .../sink/JdbcSinkAggregatedCommitter.java | 91 ++++ .../jdbc/sink/JdbcSinkCommitter.java | 70 +++ .../seatunnel/jdbc/sink/JdbcSinkWriter.java | 96 ++++ .../jdbc/state/JdbcAggregatedCommitInfo.java | 30 ++ .../seatunnel/jdbc/state/JdbcSinkState.java | 31 ++ .../seatunnel/jdbc/state/XidInfo.java | 37 ++ .../seatunnel/jdbc/utils/ExceptionUtils.java | 53 ++ .../seatunnel/jdbc/utils/JdbcUtils.java | 158 ++++++ .../jdbc/utils/ThrowingRunnable.java | 52 ++ 32 files changed, 3281 insertions(+) create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml create mode 100644 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java create mode 100644 
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java create mode 
100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java create mode 100644 seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml index e50ae874b21..15723b1b78b 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml @@ -50,6 +50,11 @@ seatunnel-connector-seatunnel-hive ${project.version} + + org.apache.seatunnel + seatunnel-connector-seatunnel-jdbc + ${project.version} + diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml index 9254ba5fd35..3170ed9ae5f 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml @@ -35,5 +35,6 @@ seatunnel-connector-seatunnel-console seatunnel-connector-seatunnel-fake seatunnel-connector-seatunnel-kafka + seatunnel-connector-seatunnel-jdbc \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml new file mode 100644 index 00000000000..d5c9ad2797b --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml @@ -0,0 +1,52 @@ + + + + + seatunnel-connectors-seatunnel + org.apache.seatunnel + ${revision} + + 4.0.0 + + seatunnel-connector-seatunnel-jdbc + + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + mysql + mysql-connector-java + + + + org.postgresql + postgresql + + + + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java new file mode 100644 index 00000000000..f7738140ced --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * A JDBC outputFormat + */ +public class JdbcOutputFormat> + implements Serializable { + + protected final JdbcConnectionProvider connectionProvider; + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class); + + private final JdbcConnectorOptions jdbcConnectorOptions; + private final StatementExecutorFactory statementExecutorFactory; + + private transient E jdbcStatementExecutor; + private transient int batchCount = 0; + private transient volatile boolean closed = false; + + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture scheduledFuture; + private transient volatile Exception flushException; + + public JdbcOutputFormat( + JdbcConnectionProvider connectionProvider, + JdbcConnectorOptions jdbcConnectorOptions, + StatementExecutorFactory statementExecutorFactory) { + this.connectionProvider = checkNotNull(connectionProvider); + 
this.jdbcConnectorOptions = checkNotNull(jdbcConnectorOptions); + this.statementExecutorFactory = checkNotNull(statementExecutorFactory); + } + + /** + * Connects to the target database and initializes the prepared statement. + */ + + public void open() + throws IOException { + try { + connectionProvider.getOrEstablishConnection(); + } + catch (Exception e) { + throw new IOException("unable to open JDBC writer", e); + } + jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory); + + if (jdbcConnectorOptions.getBatchIntervalMs() != 0 && jdbcConnectorOptions.getBatchSize() != 1) { + this.scheduler = + Executors.newScheduledThreadPool( + 1, runnable -> { + AtomicInteger cnt = new AtomicInteger(0); + Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName("jdbc-upsert-output-format" + "-" + cnt.incrementAndGet()); + return thread; + }); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (JdbcOutputFormat.this) { + if (!closed) { + try { + flush(); + } + catch (Exception e) { + flushException = e; + } + } + } + }, + jdbcConnectorOptions.getBatchIntervalMs(), + jdbcConnectorOptions.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); + } + } + + private E createAndOpenStatementExecutor( + StatementExecutorFactory statementExecutorFactory) + throws IOException { + E exec = statementExecutorFactory.get(); + try { + exec.prepareStatements(connectionProvider.getConnection()); + } + catch (SQLException e) { + throw new IOException("unable to open JDBC writer", e); + } + return exec; + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to JDBC failed.", flushException); + } + } + + public final synchronized void writeRecord(I record) + throws IOException { + checkFlushException(); + try { + addToBatch(record); + batchCount++; + if (jdbcConnectorOptions.getBatchSize() > 0 + && batchCount >= jdbcConnectorOptions.getBatchSize()) 
{ + flush(); + } + } + catch (Exception e) { + throw new IOException("Writing records to JDBC failed.", e); + } + } + + protected void addToBatch(I record) + throws SQLException { + jdbcStatementExecutor.addToBatch(record); + } + + public synchronized void flush() + throws IOException { + checkFlushException(); + final int sleepMs = 1000; + for (int i = 0; i <= jdbcConnectorOptions.getMaxRetries(); i++) { + try { + attemptFlush(); + batchCount = 0; + break; + } + catch (SQLException e) { + LOG.error("JDBC executeBatch error, retry times = {}", i, e); + if (i >= jdbcConnectorOptions.getMaxRetries()) { + ExceptionUtils.rethrowIOException(e); + } + try { + if (!connectionProvider.isConnectionValid()) { + updateExecutor(true); + } + } + catch (Exception exception) { + LOG.error( + "JDBC connection is not valid, and reestablish connection failed.", + exception); + throw new IOException("Reestablish JDBC connection failed", exception); + } + try { + Thread.sleep(sleepMs * i); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "unable to flush; interrupted while doing another attempt", e); + } + } + } + } + + protected void attemptFlush() + throws SQLException { + jdbcStatementExecutor.executeBatch(); + } + + /** + * Executes prepared statement and closes all resources of this instance. 
+ */ + public synchronized void close() { + if (!closed) { + closed = true; + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + if (batchCount > 0) { + try { + flush(); + } + catch (Exception e) { + LOG.warn("Writing records to JDBC failed.", e); + throw new RuntimeException("Writing records to JDBC failed.", e); + } + } + + try { + if (jdbcStatementExecutor != null) { + jdbcStatementExecutor.closeStatements(); + } + } + catch (SQLException e) { + LOG.warn("Close JDBC writer failed.", e); + } + } + connectionProvider.closeConnection(); + checkFlushException(); + } + + public void updateExecutor(boolean reconnect) + throws SQLException, ClassNotFoundException { + jdbcStatementExecutor.closeStatements(); + jdbcStatementExecutor.prepareStatements( + reconnect ? connectionProvider.reestablishConnection() : connectionProvider.getConnection()); + } + + @VisibleForTesting + public Connection getConnection() { + return connectionProvider.getConnection(); + } + + /** + * A factory for creating {@link JdbcBatchStatementExecutor} instance. + * + * @param The type of instance. 
+ */ + public interface StatementExecutorFactory> + extends Supplier, Serializable {} + + ; +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java new file mode 100644 index 00000000000..1e2e213a754 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; + +import com.google.common.base.CaseFormat; +import lombok.NonNull; + +import javax.sql.CommonDataSource; +import javax.sql.DataSource; + +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class DataSourceUtils + implements Serializable { + private static final String GETTER_PREFIX = "get"; + + private static final String SETTER_PREFIX = "set"; + + public static CommonDataSource buildCommonDataSource(@NonNull JdbcConnectorOptions jdbcConnectorOptions) + throws InvocationTargetException, IllegalAccessException { + CommonDataSource dataSource = (CommonDataSource) loadDataSource(jdbcConnectorOptions.getXaDataSourceClassName()); + setProperties(dataSource, buildDatabaseAccessConfig(jdbcConnectorOptions)); + return dataSource; + } + + private static Map buildDatabaseAccessConfig(JdbcConnectorOptions jdbcConnectorOptions) { + HashMap accessConfig = new HashMap<>(); + accessConfig.put("url", jdbcConnectorOptions.getUrl()); + if (jdbcConnectorOptions.getUsername().isPresent()) { + accessConfig.put("user", jdbcConnectorOptions.getUsername().get()); + } + if (jdbcConnectorOptions.getPassword().isPresent()) { + accessConfig.put("password", jdbcConnectorOptions.getPassword().get()); + } + + return accessConfig; + } + + private static void setProperties(final CommonDataSource commonDataSource, final Map databaseAccessConfig) + throws InvocationTargetException, IllegalAccessException { + for (Map.Entry entry : databaseAccessConfig.entrySet()) { + Optional method = findSetterMethod(commonDataSource.getClass().getMethods(), entry.getKey()); + if (method.isPresent()) { + method.get().invoke(commonDataSource, entry.getValue()); 
+ } + } + } + + private static Method findGetterMethod(final DataSource dataSource, final String propertyName) + throws NoSuchMethodException { + String getterMethodName = GETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, propertyName); + Method result = dataSource.getClass().getMethod(getterMethodName); + result.setAccessible(true); + return result; + } + + private static Optional findSetterMethod(final Method[] methods, final String property) { + String setterMethodName = SETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, property); + return Arrays.stream(methods) + .filter(each -> each.getName().equals(setterMethodName) && 1 == each.getParameterTypes().length) + .findFirst(); + } + + private static Object loadDataSource(final String xaDataSourceClassName) { + Class xaDataSourceClass; + try { + xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName); + } catch (final ClassNotFoundException ignored) { + try { + xaDataSourceClass = Class.forName(xaDataSourceClassName); + } catch (final ClassNotFoundException ex) { + throw new RuntimeException("Failed to load [" + xaDataSourceClassName + "]", ex); + } + } + try { + return xaDataSourceClass.getDeclaredConstructor().newInstance(); + } catch (final ReflectiveOperationException ex) { + throw new RuntimeException("Failed to instance [" + xaDataSourceClassName + "]", ex); + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java new file mode 100644 index 00000000000..94c10dc2106 --- /dev/null +++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * JDBC connection provider. + */ + +public interface JdbcConnectionProvider { + /** + * Get existing connection. + * + * @return existing connection + */ + + Connection getConnection(); + + /** + * Check whether possible existing connection is valid or not through {@link + * Connection#isValid(int)}. + * + * @return true if existing connection is valid + * @throws SQLException sql exception throw from {@link Connection#isValid(int)} + */ + boolean isConnectionValid() + throws SQLException; + + /** + * Get existing connection or establish an new one if there is none. 
+ * + * @return existing connection or newly established connection + * @throws SQLException sql exception + * @throws ClassNotFoundException driver class not found + */ + Connection getOrEstablishConnection() + throws SQLException, ClassNotFoundException; + + /** + * Close possible existing connection. + */ + void closeConnection(); + + /** + * Close possible existing connection and establish an new one. + * + * @return newly established connection + * @throws SQLException sql exception + * @throws ClassNotFoundException driver class not found + */ + Connection reestablishConnection() + throws SQLException, ClassNotFoundException; +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java new file mode 100644 index 00000000000..67c1702f31d --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; + +import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Enumeration; +import java.util.Properties; + +/** + * Simple JDBC connection provider. + */ +public class SimpleJdbcConnectionProvider + implements JdbcConnectionProvider, Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class); + + private static final long serialVersionUID = 1L; + + private final JdbcConnectorOptions jdbcOptions; + + private transient Driver loadedDriver; + private transient Connection connection; + + static { + // Load DriverManager first to avoid deadlock between DriverManager's + // static initialization block and specific driver class's static + // initialization block when two different driver classes are loading + // concurrently using Class.forName while DriverManager is uninitialized + // before. + // + // This could happen in JDK 8 but not above as driver loading has been + // moved out of DriverManager's static initialization block since JDK 9. 
+ DriverManager.getDrivers(); + } + + public SimpleJdbcConnectionProvider(@NonNull JdbcConnectorOptions jdbcOptions) { + this.jdbcOptions = jdbcOptions; + } + + @Override + public Connection getConnection() { + return connection; + } + + @Override + public boolean isConnectionValid() + throws SQLException { + return connection != null + && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds()); + } + + private static Driver loadDriver(String driverName) + throws SQLException, ClassNotFoundException { + checkNotNull(driverName); + Enumeration drivers = DriverManager.getDrivers(); + while (drivers.hasMoreElements()) { + Driver driver = drivers.nextElement(); + if (driver.getClass().getName().equals(driverName)) { + return driver; + } + } + + // We could reach here for reasons: + // * Class loader hell of DriverManager(see JDK-8146872). + // * driver is not installed as a service provider. + Class clazz = + Class.forName(driverName, true, Thread.currentThread().getContextClassLoader()); + try { + return (Driver) clazz.newInstance(); + } catch (Exception ex) { + throw new SQLException("Fail to create driver of class " + driverName, ex); + } + } + + private Driver getLoadedDriver() + throws SQLException, ClassNotFoundException { + if (loadedDriver == null) { + loadedDriver = loadDriver(jdbcOptions.getDriverName()); + } + return loadedDriver; + } + + @Override + public Connection getOrEstablishConnection() + throws SQLException, ClassNotFoundException { + if (connection != null) { + return connection; + } + Driver driver = getLoadedDriver(); + Properties info = new Properties(); + if (jdbcOptions.getUsername().isPresent()) { + info.setProperty("user", jdbcOptions.getUsername().get()); + } + if (jdbcOptions.getPassword().isPresent()) { + info.setProperty("password", jdbcOptions.getPassword().get()); + } + connection = driver.connect(jdbcOptions.getUrl(), info); + if (connection == null) { + // Throw same exception as DriverManager.getConnection when no driver 
found to match + // caller expectation. + throw new SQLException( + "No suitable driver found for " + jdbcOptions.getUrl(), "08001"); + } + + return connection; + } + + @Override + public void closeConnection() { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.warn("JDBC connection close failed.", e); + } finally { + connection = null; + } + } + } + + @Override + public Connection reestablishConnection() + throws SQLException, ClassNotFoundException { + closeConnection(); + return getOrEstablishConnection(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java new file mode 100644 index 00000000000..24d9d17d57a --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils; + +import java.util.function.BiConsumer; + +/** + * A checked extension of the {@link BiConsumer} interface. + * + * @param type of the first argument + * @param type of the second argument + * @param type of the thrown exception + */ +public interface BiConsumerWithException { + + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument + * @param u the second input argument + * @throws E in case of an error + */ + void accept(T t, U u) throws E; + + /** + * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}. + * + * @param biConsumerWithException BiConsumer with exception to convert into a {@link + * BiConsumer}. + * @param first input type + * @param second input type + * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked. 
+ */ + static BiConsumer unchecked( + BiConsumerWithException biConsumerWithException) { + return (A a, B b) -> { + try { + biConsumerWithException.accept(a, b); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + }; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java new file mode 100644 index 00000000000..ec0bde8dfff --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import java.sql.Connection; +import java.sql.SQLException; + +/** Executes the given JDBC statement in batch for the accumulated records. 
*/ +public interface JdbcBatchStatementExecutor { + + /** Create statements from connection. */ + void prepareStatements(Connection connection) throws SQLException; + + void addToBatch(T record) throws SQLException; + + /** Submits a batch of commands to the database for execution. */ + void executeBatch() throws SQLException; + + /** Close JDBC related statements. */ + void closeStatements() throws SQLException; +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java new file mode 100644 index 00000000000..f8b778c49f3 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of + * StreamRecord. + */ +public interface JdbcStatementBuilder + extends BiConsumerWithException, Serializable {} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java new file mode 100644 index 00000000000..bb4340b5fdb --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link JdbcBatchStatementExecutor} that executes supplied statement for given the records + * (without any pre-processing). + */ +public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class); + + private final String sql; + private final JdbcStatementBuilder parameterSetter; + private final List batch; + + private transient PreparedStatement st; + + public SimpleBatchStatementExecutor( + String sql, JdbcStatementBuilder statementBuilder) { + this.sql = sql; + this.parameterSetter = statementBuilder; + this.batch = new ArrayList(); + } + + @Override + public void prepareStatements(Connection connection) throws SQLException { + this.st = connection.prepareStatement(sql); + } + + @Override + public void addToBatch(T record) { + batch.add(record); + } + + @Override + public void executeBatch() throws SQLException { + if (!batch.isEmpty()) { + for (T r : batch) { + parameterSetter.accept(st, r); + st.addBatch(); + } + st.executeBatch(); + batch.clear(); + } + } + + @Override + public void closeStatements() throws SQLException { + if (st != null) { + st.close(); + st = null; + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java new file mode 100644 index 00000000000..01348508cc0 --- /dev/null +++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options; + +import java.io.Serializable; + +public class JdbcConfig implements Serializable { + + public static final String URL = "url"; + + public static final String DRIVER = "driver"; + + public static final String CONNECTION_CHECK_TIMEOUT_SEC = "connection_check_timeout_sec"; + + public static final String MAX_RETRIES = "max_retries"; + + public static final String USER = "user"; + + public static final String PASSWORD = "password"; + + public static final String QUERY = "query"; + + public static final String PARALLELISM = "parallelism"; + + + public static final String BATCH_SIZE = "batch_size"; + + public static final String BATCH_INTERVAL_MS = "batch_interval_ms"; + + + public static final String IS_EXACTLY_ONCE = "is_exactly_once"; + + public static final String XA_DATA_SOURCE_CLASS_NAME = "xa_data_source_class_name"; + + + public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts"; + + public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec"; + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java new file mode 100644 index 00000000000..7b1207f1781 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.NonNull; + +import java.io.Serializable; +import java.util.Optional; + +public class JdbcConnectorOptions + implements Serializable { + private static final long serialVersionUID = 1L; + + private static final int DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC = 30; + private static final int DEFAULT_MAX_RETRIES = 3; + private static final int DEFAULT_BATCH_SIZE = 300; + private static final int DEFAULT_BATCH_INTERVAL_MS = 1000; + private static final boolean DEFAULT_IS_EXACTLY_ONCE = false; + private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3; + private static final int DEFAULT_TRANSACTION_TIMEOUT_SEC = -1; + + private String url; + private String driverName; + private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC; + private int maxRetries = DEFAULT_MAX_RETRIES; + private String username; + private String password; + private String query; + + private int batchSize = DEFAULT_BATCH_SIZE; + private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; + + private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE; + private String xaDataSourceClassName; + + private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS; + + private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC; + + private 
JdbcConnectorOptions() { + } + + public JdbcConnectorOptions(@NonNull Config config) { + this.url = config.getString(JdbcConfig.URL); + this.driverName = config.getString(JdbcConfig.DRIVER); + if (config.hasPath(JdbcConfig.USER)) { + this.username = config.getString(JdbcConfig.USER); + } + if (config.hasPath(JdbcConfig.PASSWORD)) { + this.password = config.getString(JdbcConfig.PASSWORD); + } + this.query = config.getString(JdbcConfig.QUERY); + + if (config.hasPath(JdbcConfig.MAX_RETRIES)) { + this.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES); + } + if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) { + this.connectionCheckTimeoutSeconds = config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC); + } + if (config.hasPath(JdbcConfig.BATCH_SIZE)) { + this.batchSize = config.getInt(JdbcConfig.BATCH_SIZE); + } + if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) { + this.batchIntervalMs = config.getInt(JdbcConfig.BATCH_INTERVAL_MS); + } + + if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) { + this.isExactlyOnce = true; + this.xaDataSourceClassName = config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME); + if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) { + this.maxCommitAttempts = config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS); + } + if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) { + this.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC); + } + } + } + + public String getUrl() { + return url; + } + + public String getDriverName() { + return driverName; + } + + public int getConnectionCheckTimeoutSeconds() { + return connectionCheckTimeoutSeconds; + } + + public int getMaxRetries() { + return maxRetries; + } + + public Optional getUsername() { + return Optional.ofNullable(username); + } + + public Optional getPassword() { + return Optional.ofNullable(password); + } + + public String getQuery() { + return query; + } + + public int getBatchSize() { + return batchSize; + } + + public int getBatchIntervalMs() { + return 
batchIntervalMs; + } + + public boolean isExactlyOnce() { + return isExactlyOnce; + } + + public String getXaDataSourceClassName() { + return xaDataSourceClassName; + } + + public int getMaxCommitAttempts() { + return maxCommitAttempts; + } + + public Optional getTransactionTimeoutSec() { + return transactionTimeoutSec < 0 ? Optional.empty() : Optional.of(transactionTimeoutSec); + } + + public static JdbcConnectorOptionsBuilder builder() { + return new JdbcConnectorOptionsBuilder(); + } + + public static final class JdbcConnectorOptionsBuilder { + private String url; + private String driverName; + private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC; + private int maxRetries = DEFAULT_MAX_RETRIES; + private String username; + private String password; + private String query; + private int batchSize = DEFAULT_BATCH_SIZE; + private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS; + private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE; + private String xaDataSourceClassName; + private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS; + private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC; + + private JdbcConnectorOptionsBuilder() { + } + + public JdbcConnectorOptionsBuilder withUrl(String url) { + this.url = url; + return this; + } + + public JdbcConnectorOptionsBuilder withDriverName(String driverName) { + this.driverName = driverName; + return this; + } + + public JdbcConnectorOptionsBuilder withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) { + this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds; + return this; + } + + public JdbcConnectorOptionsBuilder withMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public JdbcConnectorOptionsBuilder withUsername(String username) { + this.username = username; + return this; + } + + public JdbcConnectorOptionsBuilder withPassword(String password) { + this.password = password; + return this; + } + + public 
JdbcConnectorOptionsBuilder withQuery(String query) { + this.query = query; + return this; + } + + public JdbcConnectorOptionsBuilder withBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public JdbcConnectorOptionsBuilder withBatchIntervalMs(int batchIntervalMs) { + this.batchIntervalMs = batchIntervalMs; + return this; + } + + public JdbcConnectorOptionsBuilder withIsExactlyOnce(boolean isExactlyOnce) { + this.isExactlyOnce = isExactlyOnce; + return this; + } + + public JdbcConnectorOptionsBuilder withXaDataSourceClassName(String xaDataSourceClassName) { + this.xaDataSourceClassName = xaDataSourceClassName; + return this; + } + + public JdbcConnectorOptionsBuilder withMaxCommitAttempts(int maxCommitAttempts) { + this.maxCommitAttempts = maxCommitAttempts; + return this; + } + + public JdbcConnectorOptionsBuilder withTransactionTimeoutSec(int transactionTimeoutSec) { + this.transactionTimeoutSec = transactionTimeoutSec; + return this; + } + + public JdbcConnectorOptions build() { + JdbcConnectorOptions jdbcConnectorOptions = new JdbcConnectorOptions(); + jdbcConnectorOptions.batchSize = this.batchSize; + jdbcConnectorOptions.batchIntervalMs = this.batchIntervalMs; + jdbcConnectorOptions.driverName = this.driverName; + jdbcConnectorOptions.maxRetries = this.maxRetries; + jdbcConnectorOptions.password = this.password; + jdbcConnectorOptions.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds; + jdbcConnectorOptions.query = this.query; + jdbcConnectorOptions.url = this.url; + jdbcConnectorOptions.username = this.username; + jdbcConnectorOptions.transactionTimeoutSec = this.transactionTimeoutSec; + jdbcConnectorOptions.maxCommitAttempts = this.maxCommitAttempts; + jdbcConnectorOptions.isExactlyOnce = this.isExactlyOnce; + jdbcConnectorOptions.xaDataSourceClassName = this.xaDataSourceClassName; + return jdbcConnectorOptions; + } + } +} diff --git 
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java new file mode 100644 index 00000000000..15a831be4a9 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class GroupXaOperationResult { + private final List succeeded = new ArrayList<>(); + private final List failed = new ArrayList<>(); + private final List toRetry = new ArrayList<>(); + private Optional failure = Optional.empty(); + private Optional transientFailure = Optional.empty(); + + void failedTransiently(T x, XaFacade.TransientXaException e) { + toRetry.add(x); + transientFailure = + getTransientFailure().isPresent() ? getTransientFailure() : Optional.of(e); + } + + void failed(T x, Exception e) { + failed.add(x); + failure = failure.isPresent() ? failure : Optional.of(e); + } + + void succeeded(T x) { + succeeded.add(x); + } + + private RuntimeException wrapFailure( + Exception error, String formatWithCounts, int errCount) { + return new RuntimeException( + String.format(formatWithCounts, errCount, total()), error); + } + + private int total() { + return succeeded.size() + failed.size() + toRetry.size(); + } + + public List getForRetry() { + return toRetry; + } + + Optional getTransientFailure() { + return transientFailure; + } + + boolean hasNoFailures() { + return !failure.isPresent() && !transientFailure.isPresent(); + } + + void throwIfAnyFailed(String action) { + failure.map( + f -> + wrapFailure( + f, + "failed to " + action + " %d transactions out of %d", + toRetry.size() + failed.size())) + .ifPresent( + f -> { + throw f; + }); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java new file mode 100644 index 00000000000..98825a57a59 --- /dev/null +++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.sink.SinkWriter; + +import javax.transaction.xa.Xid; + +import java.security.SecureRandom; +import java.util.Arrays; + +/** + * Generates {@link Xid} from: + * + *
<ol>
+ *   <li>To provide uniqueness over other jobs and apps, and other instances of this job, gtrid
+ *       consists of
+ *       <ul>
+ *         <li>job id (32 bytes)
+ *         <li>subtask index (4 bytes)
+ *         <li>checkpoint id (8 bytes)
+ *       </ul>
+ *   <li>bqual consists of 4 random bytes (generated using {@link SecureRandom})
+ * </ol>
+ *
+ * <p>
Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids will + * collide). + */ +class SemanticXidGenerator + implements XidGenerator { + private static final long serialVersionUID = 1L; + + private static final SecureRandom SECURE_RANDOM = new SecureRandom(); + + private static final int JOB_ID_BYTES = 32; + private static final int FORMAT_ID = 201; + + private transient byte[] gtridBuffer; + private transient byte[] bqualBuffer; + + @Override + public void open() { + // globalTransactionId = job id + task index + checkpoint id + gtridBuffer = new byte[JOB_ID_BYTES + Integer.BYTES + Long.BYTES]; + // branchQualifier = random bytes + bqualBuffer = getRandomBytes(Integer.BYTES); + } + + @Override + public Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId) { + byte[] jobIdBytes = context.getJobId().getBytes(); + checkArgument(jobIdBytes.length <= JOB_ID_BYTES); + System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, JOB_ID_BYTES); + + writeNumber(sinkContext.getIndexOfSubtask(), Integer.BYTES, gtridBuffer, JOB_ID_BYTES); + writeNumber(checkpointId, Long.BYTES, gtridBuffer, JOB_ID_BYTES + Integer.BYTES); + // relying on arrays copying inside XidImpl constructor + return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer); + } + + @Override + public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext) { + if (xid.getFormatId() != FORMAT_ID) { + return false; + } + int subtaskIndex = readNumber(xid.getGlobalTransactionId(), JOB_ID_BYTES, Integer.BYTES); + if (subtaskIndex != sinkContext.getIndexOfSubtask() + && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) { + return false; + } + byte[] jobIdBytes = new byte[JOB_ID_BYTES]; + System.arraycopy(xid.getGlobalTransactionId(), 0, jobIdBytes, 0, JOB_ID_BYTES); + return Arrays.equals(jobIdBytes, context.getJobId().getBytes()); + } + + private static int readNumber(byte[] bytes, int offset, int numBytes) { + 
final int number = 0xff; + int result = 0; + for (int i = 0; i < numBytes; i++) { + result |= (bytes[offset + i] & number) << Byte.SIZE * i; + } + return result; + } + + private static void writeNumber(long number, int numBytes, byte[] dst, int dstOffset) { + for (int i = dstOffset; i < dstOffset + numBytes; i++) { + dst[i] = (byte) number; + number >>>= Byte.SIZE; + } + } + + private byte[] getRandomBytes(int size) { + byte[] bytes = new byte[size]; + SECURE_RANDOM.nextBytes(bytes); + return bytes; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java new file mode 100644 index 00000000000..2f8b78bee75 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.Xid; + +import java.io.Serializable; +import java.util.Collection; + +/** + * + *

<p>Typical workflow:
+ *
+ * <ol>
+ *   <li>{@link #open}
+ *   <li>{@link #start} transaction
+ *   <li>{@link #getConnection}, write some data
+ *   <li>{@link #endAndPrepare} (or {@link #failAndRollback})
+ *   <li>{@link #commit} / {@link #rollback}
+ *   <li>{@link #close}
+ * </ol>
+ * + * {@link #recover} can be used to get abandoned prepared transactions for cleanup. + */ + +public interface XaFacade + extends JdbcConnectionProvider, Serializable, AutoCloseable { + + static XaFacade fromJdbcConnectionOptions( + JdbcConnectorOptions jdbcConnectorOptions) { + return new XaFacadeImplAutoLoad(jdbcConnectorOptions); + } + + void open() throws Exception; + + boolean isOpen(); + + /** Start a new transaction. */ + void start(Xid xid) throws Exception; + + /** End and then prepare the transaction. Transaction can't be resumed afterwards. */ + void endAndPrepare(Xid xid) throws Exception; + + /** + * Commit previously prepared transaction. + * + * @param ignoreUnknown whether to ignore {@link XAException#XAER_NOTA + * XAER_NOTA} error. + */ + void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException; + + /** Rollback previously prepared transaction. */ + void rollback(Xid xid) throws TransientXaException; + + /** + * End transaction as {@link javax.transaction.xa.XAResource#TMFAIL failed}; in case of error, + * try to roll it back. + */ + void failAndRollback(Xid xid) throws TransientXaException; + + /** + * Note: this can block on some non-MVCC databases if there are ended not prepared transactions. + */ + Collection recover() throws TransientXaException; + + /** + * Thrown by {@link XaFacade} when RM responds with {@link + * javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} indicating that the transaction doesn't + * include any changes. When such a transaction is committed RM may return an error (usually, + * {@link XAException#XAER_NOTA XAER_NOTA}). 
+ */ + class EmptyXaTransactionException extends RuntimeException { + private final Xid xid; + + EmptyXaTransactionException(Xid xid) { + super("end response XA_RDONLY, xid: " + xid); + this.xid = xid; + } + + public Xid getXid() { + return xid; + } + } + + /** + * Indicates a transient or unknown failure from the resource manager (see {@link + * XAException#XA_RBTRANSIENT XA_RBTRANSIENT}, {@link XAException#XAER_RMFAIL XAER_RMFAIL}). + */ + class TransientXaException extends RuntimeException { + TransientXaException(XAException cause) { + super(cause); + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java new file mode 100644 index 00000000000..d8c969ce76b --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static javax.transaction.xa.XAException.XAER_NOTA;
+import static javax.transaction.xa.XAException.XAER_RMFAIL;
+import static javax.transaction.xa.XAException.XA_HEURCOM;
+import static javax.transaction.xa.XAException.XA_HEURHAZ;
+import static javax.transaction.xa.XAException.XA_HEURMIX;
+import static javax.transaction.xa.XAException.XA_HEURRB;
+import static javax.transaction.xa.XAException.XA_RBBASE;
+import static javax.transaction.xa.XAException.XA_RBTIMEOUT;
+import static javax.transaction.xa.XAException.XA_RBTRANSIENT;
+import static javax.transaction.xa.XAResource.TMENDRSCAN;
+import static javax.transaction.xa.XAResource.TMNOFLAGS;
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Default {@link org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade} implementation.
+ *
+ * <p>Owns one {@link XAConnection} together with its {@link XAResource} and the JDBC
+ * {@link Connection} obtained from it. Every XA call goes through {@link #execute(Command)},
+ * which translates {@link XAException}s into unchecked exceptions and issues {@code forget} for
+ * heuristically completed transactions.
+ */
+public class XaFacadeImplAutoLoad
+    implements XaFacade {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImplAutoLoad.class);
+    /** Error codes reported to callers as {@link TransientXaException}. */
+    private static final Set<Integer> TRANSIENT_ERR_CODES =
+        new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL));
+    /** Heuristic outcomes; the transaction is forgotten after one of these is seen. */
+    private static final Set<Integer> HEUR_ERR_CODES =
+        new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX));
+    /** Upper bound on follow-up {@code recover(TMNOFLAGS)} calls within one scan. */
+    private static final int MAX_RECOVER_CALLS = 100;
+    private static final int MILLIS_PER_SECOND = 1000;
+
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private transient XAResource xaResource;
+    private transient Connection connection;
+    private transient XAConnection xaConnection;
+
+    XaFacadeImplAutoLoad(JdbcConnectorOptions jdbcConnectorOptions) {
+        checkState(jdbcConnectorOptions.isExactlyOnce(), "is_exactly_once config error");
+        this.jdbcConnectorOptions = jdbcConnectorOptions;
+    }
+
+    /**
+     * Opens the XA connection, applies the configured transaction timeout (if any) and switches
+     * the JDBC connection to read-write, non-auto-commit mode.
+     *
+     * @throws SQLException if the data source cannot be built or the connection cannot be set up
+     * @throws IllegalStateException if the facade is already open
+     */
+    @Override
+    public void open() throws SQLException {
+        checkState(!isOpen(), "already connected");
+        XADataSource ds;
+        try {
+            ds = (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectorOptions);
+        } catch (Exception e) {
+            throw new SQLException(e);
+        }
+        xaConnection = ds.getXAConnection();
+        xaResource = xaConnection.getXAResource();
+        if (jdbcConnectorOptions.getTransactionTimeoutSec().isPresent()) {
+            try {
+                xaResource.setTransactionTimeout(jdbcConnectorOptions.getTransactionTimeoutSec().get());
+            } catch (XAException e) {
+                throw new SQLException(e);
+            }
+        }
+        connection = xaConnection.getConnection();
+        connection.setReadOnly(false);
+        connection.setAutoCommit(false);
+        checkState(!connection.getAutoCommit());
+    }
+
+    /**
+     * Releases the JDBC connection and the XA connection. Safe to call when the facade was never
+     * opened or was already closed. The XA connection is released even if closing the JDBC
+     * connection fails.
+     *
+     * @throws SQLException if closing the JDBC connection fails
+     */
+    @Override
+    public void close() throws SQLException {
+        try {
+            if (connection != null) {
+                connection.close(); // close connection - likely a wrapper
+            }
+        } finally {
+            connection = null;
+            if (xaConnection != null) {
+                try {
+                    xaConnection.close(); // close likely a pooled AND the underlying connection
+                } catch (SQLException e) {
+                    // Some databases (e.g. MySQL) rollback changes on normal client disconnect which
+                    // causes an exception if an XA transaction was prepared. Note that resources are
+                    // still released in case of an error. Pinning MySQL connections doesn't help as
+                    // SuspendableXAConnection has the same close() logic.
+                    // Other DBs don't rollback, e.g. for PgSql the previous connection.close() call
+                    // disassociates the connection (and that call works because it has a check for XA)
+                    // and rollback() is not called.
+                    // In either case, not closing the XA connection here leads to the resource leak.
+                    LOG.warn("unable to close XA connection", e);
+                } finally {
+                    xaConnection = null;
+                }
+            }
+            xaResource = null;
+        }
+    }
+
+    @Override
+    public Connection getConnection() {
+        checkNotNull(connection);
+        return connection;
+    }
+
+    /**
+     * Checks whether the facade is open and its JDBC connection is still usable.
+     *
+     * <p>{@link Connection#getNetworkTimeout()} reports milliseconds while
+     * {@link Connection#isValid(int)} expects seconds, so the value is converted (rounded up, so
+     * that a non-zero timeout never turns into 0, which {@code isValid} treats as "no timeout").
+     */
+    @Override
+    public boolean isConnectionValid() throws SQLException {
+        if (!isOpen() || connection == null) {
+            return false;
+        }
+        int timeoutMs = connection.getNetworkTimeout();
+        int timeoutSec =
+            timeoutMs / MILLIS_PER_SECOND + (timeoutMs % MILLIS_PER_SECOND == 0 ? 0 : 1);
+        return connection.isValid(timeoutSec);
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException {
+        if (!isOpen()) {
+            open();
+        }
+        return connection;
+    }
+
+    /** Closes the facade, logging instead of propagating a failure. */
+    @Override
+    public void closeConnection() {
+        try {
+            close();
+        } catch (SQLException e) {
+            LOG.warn("Connection close failed.", e);
+        }
+    }
+
+    @Override
+    public Connection reestablishConnection() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void start(Xid xid) {
+        execute(Command.fromRunnable("start", xid, () -> xaResource.start(xid, TMNOFLAGS)));
+    }
+
+    /**
+     * Ends the transaction successfully and prepares it.
+     *
+     * @throws EmptyXaTransactionException if prepare answers {@code XA_RDONLY}
+     * @throws RuntimeException if prepare answers anything other than {@code XA_OK}
+     */
+    @Override
+    public void endAndPrepare(Xid xid) {
+        execute(Command.fromRunnable("end", xid, () -> xaResource.end(xid, XAResource.TMSUCCESS)));
+        int prepResult = execute(new Command<>("prepare", of(xid), () -> xaResource.prepare(xid)));
+        if (prepResult == XAResource.XA_RDONLY) {
+            throw new EmptyXaTransactionException(xid);
+        } else if (prepResult != XAResource.XA_OK) {
+            throw new RuntimeException(
+                formatErrorMessage("prepare", of(xid), empty(), "response: " + prepResult));
+        }
+    }
+
+    @Override
+    public void failAndRollback(Xid xid) {
+        execute(
+            Command.fromRunnable(
+                "end (fail)",
+                xid,
+                () -> {
+                    xaResource.end(xid, XAResource.TMFAIL);
+                    xaResource.rollback(xid);
+                },
+                err -> {
+                    // XA_RB* codes: the branch was marked rollback-only, finish the rollback
+                    if (err.errorCode >= XA_RBBASE) {
+                        rollback(xid);
+                    } else {
+                        LOG.warn(
+                            formatErrorMessage(
+                                "end (fail)", of(xid), of(err.errorCode)));
+                    }
+                }));
+    }
+
+    @Override
+    public void commit(Xid xid, boolean ignoreUnknown) {
+        execute(
+            Command.fromRunnableRecoverByWarn(
+                "commit",
+                xid,
+                () ->
+                    xaResource.commit(
+                        xid,
+                        false /* not onePhase because the transaction should be prepared already */),
+                e -> buildCommitErrorDesc(e, ignoreUnknown)));
+    }
+
+    @Override
+    public void rollback(Xid xid) {
+        execute(
+            Command.fromRunnableRecoverByWarn(
+                "rollback",
+                xid,
+                () -> xaResource.rollback(xid),
+                this::buildRollbackErrorDesc));
+    }
+
+    private void forget(Xid xid) {
+        execute(
+            Command.fromRunnableRecoverByWarn(
+                "forget",
+                xid,
+                () -> xaResource.forget(xid),
+                e -> of("manual cleanup may be required")));
+    }
+
+    /**
+     * Runs a full recovery scan ({@code TMSTARTRSCAN}, then {@code TMNOFLAGS} until nothing new is
+     * returned, then {@code TMENDRSCAN}).
+     *
+     * @return a mutable collection of the Xids reported by the resource manager
+     */
+    @Override
+    public Collection<Xid> recover() {
+        return execute(
+            new Command<>(
+                "recover",
+                empty(),
+                () -> {
+                    List<Xid> list = recover(TMSTARTRSCAN);
+                    try {
+                        for (int i = 0; list.addAll(recover(TMNOFLAGS)); i++) {
+                            // H2 sometimes returns same tx list here - should probably use
+                            // recover(TMSTARTRSCAN | TMENDRSCAN)
+                            checkState(
+                                i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
+                        }
+                    } finally {
+                        recover(TMENDRSCAN);
+                    }
+                    return list;
+                }));
+    }
+
+    @Override
+    public boolean isOpen() {
+        return xaResource != null;
+    }
+
+    /**
+     * Single {@code xa_recover} call. The result is copied into a growable list because the
+     * caller appends to it ({@link Arrays#asList} alone would be fixed-size), and a {@code null}
+     * array is treated as "nothing to recover".
+     */
+    private List<Xid> recover(int flags) throws XAException {
+        Xid[] xids = xaResource.recover(flags);
+        return xids == null ? new ArrayList<>() : new ArrayList<>(Arrays.asList(xids));
+    }
+
+    /**
+     * Runs the command against the open XA resource. On {@link XAException} the transaction is
+     * forgotten if the error is heuristic, then the command's recovery function decides whether
+     * the error is tolerated (a value is returned) or rethrown wrapped.
+     */
+    private <T> T execute(Command<T> cmd) throws RuntimeException {
+        checkState(isOpen(), "not connected");
+        LOG.debug("{}, xid={}", cmd.name, cmd.xid);
+        try {
+            T result = cmd.callable.call();
+            LOG.trace("{} succeeded , xid={}", cmd.name, cmd.xid);
+            return result;
+        } catch (XAException e) {
+            if (HEUR_ERR_CODES.contains(e.errorCode)) {
+                cmd.xid.ifPresent(this::forget);
+            }
+            return cmd.recover.apply(e).orElseThrow(() -> wrapException(cmd.name, cmd.xid, e));
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw wrapException(cmd.name, cmd.xid, e);
+        }
+    }
+
+    /**
+     * Converts a failure into the unchecked exception reported to callers:
+     * {@link TransientXaException} for transient XA codes, otherwise a {@link RuntimeException}
+     * that carries the formatted message and keeps the original exception as its cause.
+     */
+    private static RuntimeException wrapException(
+        String action, Optional<Xid> xid, Exception ex) {
+        if (ex instanceof XAException) {
+            XAException xa = (XAException) ex;
+            if (TRANSIENT_ERR_CODES.contains(xa.errorCode)) {
+                return new TransientXaException(xa);
+            }
+            return new RuntimeException(
+                formatErrorMessage(action, xid, of(xa.errorCode), xa.getMessage()), xa);
+        }
+        return new RuntimeException(
+            formatErrorMessage(action, xid, empty(), ex.getMessage()), ex);
+    }
+
+    private Optional<String> buildCommitErrorDesc(XAException err, boolean ignoreUnknown) {
+        if (err.errorCode == XA_HEURCOM) {
+            return Optional.of("transaction was heuristically committed earlier");
+        } else if (ignoreUnknown && err.errorCode == XAER_NOTA) {
+            return Optional.of("transaction is unknown to RM (ignoring)");
+        } else {
+            return empty();
+        }
+    }
+
+    private Optional<String> buildRollbackErrorDesc(XAException err) {
+        if (err.errorCode == XA_HEURRB) {
+            return Optional.of("transaction was already heuristically rolled back");
+        } else if (err.errorCode >= XA_RBBASE) {
+            return Optional.of("transaction was already marked for rollback");
+        } else {
+            return empty();
+        }
+    }
+
+    private static String formatErrorMessage(
+        String action, Optional<Xid> xid, Optional<Integer> errorCode, String... more) {
+        return String.format(
+            "unable to %s%s%s%s",
+            action,
+            xid.map(x -> " XA transaction, xid: " + x).orElse(""),
+            errorCode
+                .map(code -> String.format(", error %d: %s", code, descError(code)))
+                .orElse(""),
+            more == null || more.length == 0 ? "" : ". " + Arrays.toString(more));
+    }
+
+    /**
+     * @return error description taken from the {@link XAException} javadoc, to ease debugging.
+     */
+    private static String descError(int code) {
+        switch (code) {
+            case XA_HEURCOM:
+                return "heuristic commit decision was made";
+            case XAException.XA_HEURHAZ:
+                return "heuristic decision may have been made";
+            case XAException.XA_HEURMIX:
+                return "heuristic mixed decision was made";
+            case XA_HEURRB:
+                return "heuristic rollback decision was made";
+            case XAException.XA_NOMIGRATE:
+                return "the transaction resumption must happen where the suspension occurred";
+            case XAException.XA_RBCOMMFAIL:
+                return "rollback happened due to a communications failure";
+            case XAException.XA_RBDEADLOCK:
+                return "rollback happened because deadlock was detected";
+            case XAException.XA_RBINTEGRITY:
+                return "rollback happened because an internal integrity check failed";
+            case XAException.XA_RBOTHER:
+                return "rollback happened for some reason not fitting any of the other rollback error codes";
+            case XAException.XA_RBPROTO:
+                return "rollback happened due to a protocol error in the resource manager";
+            case XAException.XA_RBROLLBACK:
+                return "rollback happened for an unspecified reason";
+            case XA_RBTIMEOUT:
+                return "rollback happened because of a timeout";
+            case XA_RBTRANSIENT:
+                return "rollback happened due to a transient failure";
+            case XAException.XA_RDONLY:
+                return "the transaction branch was read-only, and has already been committed";
+            case XAException.XA_RETRY:
+                return "the method invoked returned without having any effect, and that it may be invoked again";
+            case XAException.XAER_ASYNC:
+                return "an asynchronous operation is outstanding";
+            case XAException.XAER_DUPID:
+                return "Xid given as an argument is already known to the resource manager";
+            case XAException.XAER_INVAL:
+                return "invalid arguments were passed";
+            case XAER_NOTA:
+                return "Xid is not valid";
+            case XAException.XAER_OUTSIDE:
+                return "the resource manager is doing work outside the global transaction";
+            case XAException.XAER_PROTO:
+                return "protocol error";
+            case XAException.XAER_RMERR:
+                return "resource manager error has occurred";
+            case XAER_RMFAIL:
+                return "the resource manager has failed and is not available";
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * A named XA operation plus the function that decides what to do with an
+     * {@link XAException}: an empty result means "rethrow", a present one is returned to the
+     * caller of {@link #execute(Command)}.
+     */
+    private static class Command<T> {
+        private final String name;
+        private final Optional<Xid> xid;
+        private final Callable<T> callable;
+        private final Function<XAException, Optional<T>> recover;
+
+        /** Command that rethrows every {@link XAException} wrapped. */
+        static Command<Object> fromRunnable(
+            String action, Xid xid, ThrowingRunnable<XAException> runnable) {
+            return fromRunnable(
+                action,
+                xid,
+                runnable,
+                e -> {
+                    throw wrapException(action, of(xid), e);
+                });
+        }
+
+        /**
+         * Command that only logs a warning when {@code err2msg} yields a description for the
+         * error, and rethrows the error wrapped otherwise.
+         */
+        static Command<Object> fromRunnableRecoverByWarn(
+            String action,
+            Xid xid,
+            ThrowingRunnable<XAException> runnable,
+            Function<XAException, Optional<String>> err2msg) {
+            return fromRunnable(
+                action,
+                xid,
+                runnable,
+                e ->
+                    LOG.warn(
+                        formatErrorMessage(
+                            action,
+                            of(xid),
+                            of(e.errorCode),
+                            err2msg.apply(e)
+                                .orElseThrow(
+                                    () ->
+                                        wrapException(
+                                            action, of(xid), e)))));
+        }
+
+        private static Command<Object> fromRunnable(
+            String action,
+            Xid xid,
+            ThrowingRunnable<XAException> runnable,
+            Consumer<XAException> recover) {
+            return new Command<>(
+                action,
+                of(xid),
+                () -> {
+                    runnable.run();
+                    return null;
+                },
+                e -> {
+                    recover.accept(e);
+                    // the handler returned normally, so the error counts as handled
+                    return Optional.of("");
+                });
+        }
+
+        private Command(String name, Optional<Xid> xid, Callable<T> callable) {
+            this(name, xid, callable, e -> empty());
+        }
+
+        private Command(
+            String name,
+            Optional<Xid> xid,
+            Callable<T> callable,
+            Function<XAException, Optional<T>> recover) {
+            this.name = name;
+            this.xid = xid;
+            this.callable = callable;
+            this.recover = recover;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
new file mode 100644
index 00000000000..e37e6b05a47
--- /dev/null
+++ 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Operations applied to a group of XA transactions at once.
+ */
+public interface XaGroupOps
+    extends Serializable {
+
+    /**
+     * Commits a batch of transactions.
+     *
+     * @param xids transactions to commit
+     * @param allowOutOfOrderCommits whether to keep committing the remaining transactions after
+     *     one of them has failed
+     * @param maxCommitAttempts number of attempts after which a transaction that keeps failing is
+     *     reported as an error
+     * @return the outcome of the batch, including the transactions to retry later
+     */
+    GroupXaOperationResult<XidInfo> commit(
+        List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts);
+
+    /** Rolls back every transaction in {@code xids}. */
+    void rollback(List<XidInfo> xids);
+
+    /**
+     * Ends each transaction as failed and rolls it back.
+     *
+     * @return the outcome of the batch
+     */
+    GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
+
+    /**
+     * Recovers the transactions known to the resource manager and rolls back those that
+     * {@code xidGenerator} attributes to this subtask.
+     *
+     * @param excludeXid transaction that must not be rolled back
+     */
+    void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
new file mode 100644
index 00000000000..05ecce1603e
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link XaGroupOps} implementation that applies each operation transaction by transaction
+ * through a {@link XaFacade}.
+ */
+public class XaGroupOpsImpl
+    implements XaGroupOps {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaGroupOpsImpl.class);
+
+    private final XaFacade xaFacade;
+
+    public XaGroupOpsImpl(XaFacade xaFacade) {
+        this.xaFacade = xaFacade;
+    }
+
+    /**
+     * Commits the given transactions in iteration order.
+     *
+     * <p>Every attempted transaction is removed from {@code xids}, so the list must support
+     * removal through its iterator. Transactions that were not attempted (because an earlier one
+     * failed and {@code allowOutOfOrderCommits} is false) are added to the retry list.
+     *
+     * @throws RuntimeException if a transaction queued for retry has reached
+     *     {@code maxCommitAttempts}; {@code result.throwIfAnyFailed} may throw as well
+     */
+    @Override
+    public GroupXaOperationResult<XidInfo> commit(
+        List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
+        GroupXaOperationResult<XidInfo> result = new GroupXaOperationResult<>();
+        int origSize = xids.size();
+        LOG.debug("commit {} transactions", origSize);
+        for (Iterator<XidInfo> i = xids.iterator();
+             i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
+            XidInfo x = i.next();
+            i.remove();
+            try {
+                xaFacade.commit(x.getXid(), false);
+                result.succeeded(x);
+            } catch (XaFacade.TransientXaException e) {
+                result.failedTransiently(x.withAttemptsIncremented(), e);
+            } catch (Exception e) {
+                result.failed(x, e);
+            }
+        }
+        // whatever is still in xids was never attempted
+        result.getForRetry().addAll(xids);
+        result.throwIfAnyFailed("commit");
+        throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
+        result.getTransientFailure()
+            .ifPresent(
+                f ->
+                    LOG.warn(
+                        "failed to commit {} transactions out of {} (keep them to retry later)",
+                        result.getForRetry().size(),
+                        origSize,
+                        f));
+        return result;
+    }
+
+    @Override
+    public void rollback(List<XidInfo> xids) {
+        for (XidInfo x : xids) {
+            xaFacade.rollback(x.getXid());
+        }
+    }
+
+    @Override
+    public GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids) {
+        GroupXaOperationResult<XidInfo> result = new GroupXaOperationResult<>();
+        if (xids.isEmpty()) {
+            return result;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
+        }
+        for (XidInfo x : xids) {
+            try {
+                xaFacade.failAndRollback(x.getXid());
+                result.succeeded(x);
+            } catch (XaFacade.TransientXaException e) {
+                LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failedTransiently(x, e);
+            } catch (Exception e) {
+                LOG.warn("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                result.failed(x, e);
+            }
+        }
+        if (!result.getForRetry().isEmpty()) {
+            LOG.info("failed to roll back {} transactions", result.getForRetry().size());
+        }
+        return result;
+    }
+
+    /**
+     * Rolls back the recovered transactions that {@code xidGenerator} attributes to this subtask,
+     * except {@code excludeXid}.
+     *
+     * <p>The recovered collection is never modified (it may be a fixed-size view), and the
+     * excluded transaction is matched by its components, so it is recognised whatever
+     * {@link Xid} class the driver returns.
+     */
+    @Override
+    public void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+        Collection<Xid> recovered = xaFacade.recover();
+        if (recovered == null || recovered.isEmpty()) {
+            return;
+        }
+        List<Xid> toRollback = new ArrayList<>(recovered.size());
+        for (Xid xid : recovered) {
+            if (!sameXid(xid, excludeXid)) {
+                toRollback.add(xid);
+            }
+        }
+        if (toRollback.isEmpty()) {
+            return;
+        }
+        LOG.warn("rollback {} recovered transactions", toRollback.size());
+        for (Xid xid : toRollback) {
+            if (xidGenerator.belongsToSubtask(xid, context, sinkContext)) {
+                try {
+                    xaFacade.rollback(xid);
+                } catch (Exception e) {
+                    LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
+                }
+            }
+        }
+    }
+
+    /** Compares two Xids by format id, global transaction id and branch qualifier. */
+    private static boolean sameXid(Xid a, Xid b) {
+        if (a == b) {
+            return true;
+        }
+        if (a == null || b == null) {
+            return false;
+        }
+        return a.getFormatId() == b.getFormatId()
+            && Arrays.equals(a.getGlobalTransactionId(), b.getGlobalTransactionId())
+            && Arrays.equals(a.getBranchQualifier(), b.getBranchQualifier());
+    }
+
+    /** Fails if any transaction queued for retry has already used up its commit attempts. */
+    private static void throwIfAnyReachedMaxAttempts(
+        GroupXaOperationResult<XidInfo> result, int maxAttempts) {
+        List<XidInfo> reached = null;
+        for (XidInfo x : result.getForRetry()) {
+            if (x.getAttempts() >= maxAttempts) {
+                if (reached == null) {
+                    reached = new ArrayList<>();
+                }
+                reached.add(x);
+            }
+        }
+        if (reached != null) {
+            throw new RuntimeException(
+                String.format(
+                    "reached max number of commit attempts (%d) for transactions: %s",
+                    maxAttempts, reached));
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
new file mode 100644
index 00000000000..a801750540f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.security.SecureRandom;
+
+/**
+ * {@link Xid} generator. 
+ */
+public interface XidGenerator
+    extends Serializable, AutoCloseable {
+
+    /**
+     * Generates the {@link Xid} for a new transaction.
+     *
+     * @param context job-level context
+     * @param sinkContext context of the sink writer requesting the Xid
+     * @param checkpointId value identifying the checkpoint the transaction belongs to
+     */
+    Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId);
+
+    /** Initialization hook; does nothing by default. */
+    default void open() {}
+
+    /**
+     * @return true if the provided transaction belongs to this subtask
+     */
+    boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext);
+
+    /** Cleanup hook; does nothing by default. */
+    @Override
+    default void close() {}
+
+    /**
+     * Creates a {@link XidGenerator} that generates {@link Xid xids} from:
+     *
+     * <ol>
+     *   <li>job id
+     *   <li>subtask index
+     *   <li>checkpoint id
+     *   <li>four random bytes generated using {@link SecureRandom}
+     * </ol>
+     *
+     * <p>Each created {@link XidGenerator} instance MUST be used for only one Sink instance
+     * (otherwise Xids could collide).
+     */
+    static XidGenerator semanticXidGenerator() {
+        return new SemanticXidGenerator();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
new file mode 100644
index 00000000000..f5b1f0576fc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte
+ * arrays.
+ *
+ * <p>Instances are immutable: the arrays are copied on the way in and on the way out, so neither
+ * the creator nor a reader can change the value that {@link #equals(Object)} and
+ * {@link #hashCode()} are based on.
+ */
+final class XidImpl implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final char[] HEX_CHARS = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+    };
+    private static final int HIGH_NIBBLE_MASK = 0xF0;
+    private static final int LOW_NIBBLE_MASK = 0x0F;
+    private static final int NIBBLE_BITS = 4;
+
+    private final int formatId;
+    private final byte[] globalTransactionId;
+    private final byte[] branchQualifier;
+
+    /**
+     * @throws IllegalArgumentException if an identifier is longer than {@link Xid#MAXGTRIDSIZE} /
+     *     {@link Xid#MAXBQUALSIZE}
+     */
+    public XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
+        checkArgument(globalTransactionId.length <= Xid.MAXGTRIDSIZE);
+        checkArgument(branchQualifier.length <= Xid.MAXBQUALSIZE);
+        this.formatId = formatId;
+        this.globalTransactionId = Arrays.copyOf(globalTransactionId, globalTransactionId.length);
+        this.branchQualifier = Arrays.copyOf(branchQualifier, branchQualifier.length);
+    }
+
+    @Override
+    public int getFormatId() {
+        return formatId;
+    }
+
+    /** @return a copy of the global transaction identifier */
+    @Override
+    public byte[] getGlobalTransactionId() {
+        return Arrays.copyOf(globalTransactionId, globalTransactionId.length);
+    }
+
+    /** @return a copy of the branch qualifier */
+    @Override
+    public byte[] getBranchQualifier() {
+        return Arrays.copyOf(branchQualifier, branchQualifier.length);
+    }
+
+    /** Equal only to another {@code XidImpl} with the same format id and identifier bytes. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof XidImpl)) {
+            return false;
+        }
+        XidImpl xid = (XidImpl) o;
+        return formatId == xid.formatId
+            && Arrays.equals(globalTransactionId, xid.globalTransactionId)
+            && Arrays.equals(branchQualifier, xid.branchQualifier);
+    }
+
+    @Override
+    public int hashCode() {
+        final int number = 31;
+        int result = Objects.hash(formatId);
+        result = number * result + Arrays.hashCode(globalTransactionId);
+        result = number * result + Arrays.hashCode(branchQualifier);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return formatId
+            + ":"
+            + byteToHexString(globalTransactionId)
+            + ":"
+            + byteToHexString(branchQualifier);
+    }
+
+    /**
+     * Given an array of bytes it will convert the bytes to a hex string representation of the
+     * bytes.
+     *
+     * @param bytes the bytes to convert in a hex string
+     * @param start start index, inclusively
+     * @param end end index, exclusively
+     * @return hex string representation of the byte array
+     * @throws IllegalArgumentException if {@code bytes} is null
+     * @throws IndexOutOfBoundsException if {@code start..end} is not a valid range of
+     *     {@code bytes}
+     */
+    public static String byteToHexString(final byte[] bytes, final int start, final int end) {
+        if (bytes == null) {
+            throw new IllegalArgumentException("bytes == null");
+        }
+        if (start < 0 || end > bytes.length || start > end) {
+            throw new IndexOutOfBoundsException(
+                "range [" + start + ", " + end + ") out of bounds for length " + bytes.length);
+        }
+
+        int length = end - start;
+        char[] out = new char[length * 2];
+
+        for (int i = start, j = 0; i < end; i++) {
+            out[j++] = HEX_CHARS[(HIGH_NIBBLE_MASK & bytes[i]) >>> NIBBLE_BITS];
+            out[j++] = HEX_CHARS[LOW_NIBBLE_MASK & bytes[i]];
+        }
+
+        return new String(out);
+    }
+
+    /**
+     * Given an array of bytes it will convert the bytes to a hex string representation of the
+     * bytes.
+     *
+     * @param bytes the bytes to convert in a hex string
+     * @return hex string representation of the byte array
+     * @throws IllegalArgumentException if {@code bytes} is null
+     */
+    public static String byteToHexString(final byte[] bytes) {
+        if (bytes == null) {
+            throw new IllegalArgumentException("bytes == null");
+        }
+        return byteToHexString(bytes, 0, bytes.length);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
new file mode 100644
index 00000000000..849ca50c890
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Sink writer that writes rows inside XA transactions: rows go into the current transaction,
+ * {@link #prepareCommit()} prepares it and immediately starts the next one.
+ *
+ * <p>The connection and the first transaction are opened lazily, by the first call to
+ * {@link #write(SeaTunnelRow)} or {@link #prepareCommit()}.
+ */
+public class JdbcExactlyOnceSinkWriter
+    implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);
+
+    private final SinkWriter.Context sinkcontext;
+
+    private final SeaTunnelContext context;
+
+    private final List<JdbcSinkState> recoverStates;
+
+    private final XaFacade xaFacade;
+
+    private final XaGroupOps xaGroupOps;
+
+    private final XidGenerator xidGenerator;
+
+    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+
+    private transient boolean isOpen;
+
+    /** Transaction currently receiving rows; null while closed or between transactions. */
+    private transient Xid currentXid;
+    /** Transaction prepared by the latest {@link #prepareCommit()}; null if there is none. */
+    private transient Xid prepareXid;
+
+    /**
+     * @throws IllegalArgumentException if {@code jdbcConnectorOptions.getMaxRetries()} is not 0
+     */
+    public JdbcExactlyOnceSinkWriter(
+        SinkWriter.Context sinkcontext,
+        SeaTunnelContext context,
+        JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
+        JdbcConnectorOptions jdbcConnectorOptions,
+        List<JdbcSinkState> states) {
+        checkArgument(
+            jdbcConnectorOptions.getMaxRetries() == 0,
+            "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
+                + "cause duplicates.");
+
+        this.context = context;
+        this.sinkcontext = sinkcontext;
+        this.recoverStates = states;
+        this.xidGenerator = XidGenerator.semanticXidGenerator();
+        this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+            jdbcConnectorOptions);
+
+        this.outputFormat = new JdbcOutputFormat<>(
+            xaFacade,
+            jdbcConnectorOptions,
+            () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder));
+
+        this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+    }
+
+    /**
+     * Opens the generator, the XA facade and the output format on first use, rolls back
+     * transactions left over from a previous run and starts the first transaction.
+     */
+    private void tryOpen() throws IOException {
+        if (!isOpen) {
+            isOpen = true;
+            try {
+                xidGenerator.open();
+                xaFacade.open();
+                outputFormat.open();
+                if (!recoverStates.isEmpty()) {
+                    Xid xid = recoverStates.get(0).getXid();
+                    // Rollback pending transactions that should not include recoverStates
+                    xaGroupOps.recoverAndRollback(context, sinkcontext, xidGenerator, xid);
+                }
+                beginTx();
+            } catch (Exception e) {
+                ExceptionUtils.rethrowIOException(e);
+            }
+        }
+    }
+
+    /**
+     * @return the state holding the currently prepared transaction, or an empty list when no
+     *     transaction is prepared (e.g. the last one contained no changes)
+     */
+    @Override
+    public List<JdbcSinkState> snapshotState(long checkpointId) {
+        if (prepareXid == null) {
+            return Collections.emptyList();
+        }
+        return Collections.singletonList(new JdbcSinkState(prepareXid));
+    }
+
+    @Override
+    public void write(SeaTunnelRow element)
+        throws IOException {
+        tryOpen();
+        checkState(currentXid != null, "current xid must not be null");
+        SeaTunnelRow copy = SerializationUtils.clone(element);
+        outputFormat.writeRecord(copy);
+    }
+
+    /**
+     * Prepares the current transaction and starts a new one.
+     *
+     * @return the prepared transaction, or empty if the resource manager reported that the
+     *     transaction contained no changes
+     */
+    @Override
+    public Optional<XidInfo> prepareCommit()
+        throws IOException {
+        // may be the first call on this writer: nothing is opened until a row is written
+        tryOpen();
+        boolean prepared = prepareCurrentTx();
+        this.currentXid = null;
+        beginTx();
+        if (!prepared) {
+            return Optional.empty();
+        }
+        checkState(prepareXid != null, "prepare xid must not be null");
+        return Optional.of(new XidInfo(prepareXid, 0));
+    }
+
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Rolls back the transaction in progress (best effort) and closes the XA facade if it was
+     * opened. The Xid generator is closed and the transaction fields are reset in any case.
+     */
+    @Override
+    public void close()
+        throws IOException {
+        try {
+            if (xaFacade.isOpen()) {
+                if (currentXid != null) {
+                    try {
+                        LOG.debug("remove current transaction before closing, xid={}", currentXid);
+                        xaFacade.failAndRollback(currentXid);
+                    } catch (Exception e) {
+                        LOG.warn("unable to fail/rollback current transaction, xid={}", currentXid, e);
+                    }
+                }
+                try {
+                    xaFacade.close();
+                } catch (Exception e) {
+                    ExceptionUtils.rethrowIOException(e);
+                }
+            }
+        } finally {
+            xidGenerator.close();
+            currentXid = null;
+            prepareXid = null;
+        }
+    }
+
+    private void beginTx() throws IOException {
+        checkState(currentXid == null, "currentXid not null");
+        // NOTE(review): the wall-clock time is passed where the generator expects a checkpoint id
+        currentXid = xidGenerator.generateXid(context, sinkcontext, System.currentTimeMillis());
+        try {
+            xaFacade.start(currentXid);
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    /**
+     * Flushes buffered rows, then ends and prepares the current transaction.
+     *
+     * @return true if the transaction was prepared; false if it turned out to be empty, in which
+     *     case there is nothing to commit and {@code prepareXid} is cleared
+     */
+    private boolean prepareCurrentTx() throws IOException {
+        checkState(currentXid != null, "no current xid");
+        outputFormat.flush();
+        try {
+            xaFacade.endAndPrepare(currentXid);
+            prepareXid = currentXid;
+            return true;
+        } catch (XaFacade.EmptyXaTransactionException e) {
+            LOG.info("skip empty XA transaction, xid={}", currentXid);
+            prepareXid = null;
+            return false;
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+            return false;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
new file mode 100644
index 00000000000..bd37cde4c2c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ 
-0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import 
com.google.auto.service.AutoService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +@AutoService(SeaTunnelSink.class) +public class JdbcSink + implements SeaTunnelSink { + + private Config pluginConfig; + + private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo; + + private SeaTunnelContext seaTunnelContext; + + private JdbcConnectorOptions jdbcConnectorOptions; + + @Override + public String getPluginName() { + return "jdbc"; + } + + @Override + public void prepare(Config pluginConfig) + throws PrepareFailException { + this.pluginConfig = pluginConfig; + this.jdbcConnectorOptions = new JdbcConnectorOptions(this.pluginConfig); + } + + @Override + public SinkWriter createWriter(SinkWriter.Context context) + throws IOException { + SinkWriter sinkWriter; + // TODO SeatunnelTyoeInfo is not good enough to get typesArray + JdbcStatementBuilder statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row); + if (jdbcConnectorOptions.isExactlyOnce()) { + sinkWriter = new JdbcExactlyOnceSinkWriter( + context, + seaTunnelContext, + statementBuilder, + jdbcConnectorOptions, + new ArrayList<>() + ); + } else { + sinkWriter = new JdbcSinkWriter( + context, + statementBuilder, + jdbcConnectorOptions); + } + + return sinkWriter; + } + + @Override + public SinkWriter restoreWriter(SinkWriter.Context context, List states) + throws IOException { + if (jdbcConnectorOptions.isExactlyOnce()) { + JdbcStatementBuilder statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row); + return new JdbcExactlyOnceSinkWriter( + context, + seaTunnelContext, + statementBuilder, + jdbcConnectorOptions, + states + ); + } + return SeaTunnelSink.super.restoreWriter(context, states); + } + + @Override + public Optional> createAggregatedCommitter() + throws IOException { + if (jdbcConnectorOptions.isExactlyOnce()) { + return Optional.of(new JdbcSinkAggregatedCommitter(jdbcConnectorOptions)); + } + return 
Optional.empty(); + } + + @Override + public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) { + this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo; + } + + @Override + public SeaTunnelContext getSeaTunnelContext() { + return seaTunnelContext; + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) { + this.seaTunnelContext = seaTunnelContext; + } + + @Override + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java new file mode 100644 index 00000000000..e5c9f308bb1 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +public class JdbcSinkAggregatedCommitter + implements SinkAggregatedCommitter { + + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final JdbcConnectorOptions jdbcConnectorOptions; + + public JdbcSinkAggregatedCommitter( + JdbcConnectorOptions jdbcConnectorOptions + ) { + this.xaFacade = XaFacade.fromJdbcConnectionOptions( + jdbcConnectorOptions); + this.xaGroupOps = new XaGroupOpsImpl(xaFacade); + this.jdbcConnectorOptions = jdbcConnectorOptions; + } + + private void tryOpen() throws IOException { + if (!xaFacade.isOpen()) { + try { + xaFacade.open(); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + } + } + + @Override + public List commit(List aggregatedCommitInfos) throws IOException { + tryOpen(); + return aggregatedCommitInfos.stream().map(aggregatedCommitInfo -> { + GroupXaOperationResult result = xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false, jdbcConnectorOptions.getMaxCommitAttempts()); + return new 
JdbcAggregatedCommitInfo(result.getForRetry()); + }).filter(ainfo -> !ainfo.getXidInfoList().isEmpty()).collect(Collectors.toList()); + } + + @Override + public JdbcAggregatedCommitInfo combine(List commitInfos) { + return new JdbcAggregatedCommitInfo(commitInfos); + } + + @Override + public void abort(List aggregatedCommitInfo) throws IOException { + tryOpen(); + for (JdbcAggregatedCommitInfo commitInfos : aggregatedCommitInfo) { + xaGroupOps.rollback(commitInfos.getXidInfoList()); + } + } + + @Override + public void close() + throws IOException { + try { + xaFacade.close(); + } catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java new file mode 100644 index 00000000000..626f379a550 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.sink.SinkCommitter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; +import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils; + +import java.io.IOException; +import java.util.List; + +public class JdbcSinkCommitter + implements SinkCommitter { + private final XaFacade xaFacade; + private final XaGroupOps xaGroupOps; + private final JdbcConnectorOptions jdbcConnectorOptions; + + public JdbcSinkCommitter( + JdbcConnectorOptions jdbcConnectorOptions + ) + throws IOException { + this.jdbcConnectorOptions = jdbcConnectorOptions; + this.xaFacade = XaFacade.fromJdbcConnectionOptions( + jdbcConnectorOptions); + this.xaGroupOps = new XaGroupOpsImpl(xaFacade); + try { + xaFacade.open(); + } + catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + } + + @Override + public List commit(List committables) { + return xaGroupOps + .commit(committables, false, jdbcConnectorOptions.getMaxCommitAttempts()) + .getForRetry(); + } + + @Override + public void abort(List commitInfos) + throws IOException { + try { + xaGroupOps.rollback(commitInfos); + } + catch (Exception e) { + ExceptionUtils.rethrowIOException(e); + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java 
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java new file mode 100644 index 00000000000..548f10735ee --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; +import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; + +import org.apache.commons.lang3.SerializationUtils; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class JdbcSinkWriter implements SinkWriter { + + private final JdbcOutputFormat> outputFormat; + private final SinkWriter.Context context; + private transient boolean isOpen; + + public JdbcSinkWriter( + SinkWriter.Context context, + JdbcStatementBuilder statementBuilder, + JdbcConnectorOptions jdbcConnectorOptions) { + + JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectorOptions); + + this.context = context; + this.outputFormat = new JdbcOutputFormat<>( + connectionProvider, + jdbcConnectorOptions, + () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder)); + } + + private void tryOpen() throws IOException { + if (!isOpen) { + isOpen = true; + outputFormat.open(); + } + } + + @Override + public List snapshotState(long 
checkpointId) { + return Collections.emptyList(); + } + + @Override + public void write(SeaTunnelRow element) + throws IOException { + tryOpen(); + SeaTunnelRow copy = SerializationUtils.clone(element); + outputFormat.writeRecord(copy); + } + + @Override + public Optional prepareCommit() + throws IOException { + outputFormat.flush(); + return Optional.empty(); + } + + @Override + public void abortPrepare() { + + } + + @Override + public void close() + throws IOException { + outputFormat.close(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java new file mode 100644 index 00000000000..e04a06dc1f8 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +@AllArgsConstructor +public class JdbcAggregatedCommitInfo implements Serializable { + private final List xidInfoList; +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java new file mode 100644 index 00000000000..d3261e8c16c --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import javax.transaction.xa.Xid; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class JdbcSinkState implements Serializable { + private final Xid xid; +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java new file mode 100644 index 00000000000..80cbe63ff52 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.state; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import javax.transaction.xa.Xid; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class XidInfo implements Serializable { + + final Xid xid; + final int attempts; + + public XidInfo withAttemptsIncremented() { + return new XidInfo(xid, attempts + 1); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java new file mode 100644 index 00000000000..f1d66d82f4b --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Static helpers for re-throwing a {@link Throwable} through signatures that only allow
 * specific exception types.
 */
public final class ExceptionUtils {

    /** Static utility class; not meant to be instantiated. */
    private ExceptionUtils() {
    }

    /**
     * Re-throws the given {@code Throwable} in scenarios where the signature allows no checked
     * exceptions.
     *
     * <p>Throws it directly if it is an {@link Error} or a {@link RuntimeException}. Otherwise
     * it is wrapped in a {@link RuntimeException} that keeps it as the cause.
     *
     * @param t The Throwable to be thrown.
     */
    public static void rethrow(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RuntimeException(t);
        }
    }

    /**
     * Re-throws the given {@code Throwable} in scenarios where the signatures allows only
     * IOExceptions (and RuntimeException and Error).
     *
     * <p>Throws this exception directly, if it is an IOException, a RuntimeException, or an Error.
     * Otherwise it wraps it in an IOException and throws it.
     *
     * @param t The Throwable to be thrown.
     * @throws IOException the given throwable itself, or a wrapper carrying its message and
     *                     keeping it as the cause
     */
    public static void rethrowIOException(Throwable t) throws IOException {
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new IOException(t.getMessage(), t);
        }
    }
}
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.utils; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** Utils for jdbc connectors. */ +public class JdbcUtils { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + + /** + * Adds a record to the prepared statement. + * + *

When this method is called, the output format is guaranteed to be opened. + * + *

WARNING: this may fail when no column types specified (because a best effort approach is + * attempted in order to insert a null value but it's not guaranteed that the JDBC driver + * handles PreparedStatement.setObject(pos, null)) + * + * @param upload The prepared statement. + * @param typesArray The jdbc types of the row. + * @param row The records to add to the output. + * @see PreparedStatement + */ + public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, SeaTunnelRow row) + throws SQLException { + if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getFields().length) { + LOG.warn( + "Column SQL types array doesn't match arity of passed Row! Check the passed array..."); + } + if (typesArray == null) { + // no types provided + for (int index = 0; index < row.getFields().length; index++) { + upload.setObject(index + 1, row.getFields()[index]); + } + } else { + // types provided + for (int i = 0; i < row.getFields().length; i++) { + setField(upload, typesArray[i], row.getFields()[i], i); + } + } + } + + public static void setField(PreparedStatement upload, int type, Object field, int index) + throws SQLException { + if (field == null) { + upload.setNull(index + 1, type); + } else { + try { + // casting values as suggested by + // http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html + switch (type) { + case java.sql.Types.NULL: + upload.setNull(index + 1, type); + break; + case java.sql.Types.BOOLEAN: + case java.sql.Types.BIT: + upload.setBoolean(index + 1, (boolean) field); + break; + case java.sql.Types.CHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.LONGNVARCHAR: + upload.setString(index + 1, (String) field); + break; + case java.sql.Types.TINYINT: + upload.setByte(index + 1, (byte) field); + break; + case java.sql.Types.SMALLINT: + upload.setShort(index + 1, (short) field); + break; + case 
java.sql.Types.INTEGER: + upload.setInt(index + 1, (int) field); + break; + case java.sql.Types.BIGINT: + upload.setLong(index + 1, (long) field); + break; + case java.sql.Types.REAL: + upload.setFloat(index + 1, (float) field); + break; + case java.sql.Types.FLOAT: + case java.sql.Types.DOUBLE: + upload.setDouble(index + 1, (double) field); + break; + case java.sql.Types.DECIMAL: + case java.sql.Types.NUMERIC: + upload.setBigDecimal(index + 1, (java.math.BigDecimal) field); + break; + case java.sql.Types.DATE: + upload.setDate(index + 1, (java.sql.Date) field); + break; + case java.sql.Types.TIME: + upload.setTime(index + 1, (java.sql.Time) field); + break; + case java.sql.Types.TIMESTAMP: + upload.setTimestamp(index + 1, (java.sql.Timestamp) field); + break; + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + upload.setBytes(index + 1, (byte[]) field); + break; + default: + upload.setObject(index + 1, field); + LOG.warn( + "Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.", + type, + index + 1, + field); + // case java.sql.Types.SQLXML + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUC + } + } catch (ClassCastException e) { + // enrich the exception with detailed information. 
+ String errorMessage = + String.format( + "%s, field index: %s, field value: %s.", + e.getMessage(), index, field); + ClassCastException enrichedException = new ClassCastException(errorMessage); + enrichedException.setStackTrace(e.getStackTrace()); + throw enrichedException; + } + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java new file mode 100644 index 00000000000..8119da4d8d5 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.utils; + + +/** + * Similar to a {@link Runnable}, this interface is used to capture a block of code to be executed. + * In contrast to {@code Runnable}, this interface allows throwing checked exceptions. 
+ */ + +@FunctionalInterface +public interface ThrowingRunnable<E extends Throwable> { + + /** + * The work method. + * + * @throws E Exceptions may be thrown. + */ + void run() throws E; + + /** + * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked + * exceptions as unchecked. + * + * @param throwingRunnable to convert into a {@link Runnable} + * @return {@link Runnable} which throws all checked exceptions as unchecked. + */ + static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) { + return () -> { + try { + throwingRunnable.run(); + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + }; + } +} From a34ad1d17eb24b92d43643fb575ff3dac65f5163 Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 13 Jun 2022 11:08:40 +0800 Subject: [PATCH 2/3] fix code style err --- .../seatunnel/jdbc/internal/xa/GroupXaOperationResult.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java index 15a831be4a9..349522493c3 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java @@ -67,7 +67,7 @@ boolean hasNoFailures() { void throwIfAnyFailed(String action) { failure.map( - f -> + f -> wrapFailure( f, "failed to " + action + " %d transactions out of %d", From 0295075bb6e71630269688222b56e6bdfa2f8f03 Mon Sep 17 00:00:00 2001 From: liuli Date: Mon,
13 Jun 2022 15:24:44 +0800 Subject: [PATCH 3/3] fix use deprecated method --- .../jdbc/internal/connection/SimpleJdbcConnectionProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java index 67c1702f31d..7e69e990ef8 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java @@ -93,7 +93,7 @@ private static Driver loadDriver(String driverName) Class clazz = Class.forName(driverName, true, Thread.currentThread().getContextClassLoader()); try { - return (Driver) clazz.newInstance(); + return (Driver) clazz.getDeclaredConstructor().newInstance(); } catch (Exception ex) { throw new SQLException("Fail to create driver of class " + driverName, ex); }