Support multi-row inserts #107

Merged 2 commits on Mar 2, 2022
6 changes: 5 additions & 1 deletion docs/sink-connector-config-options.rst
@@ -58,6 +58,10 @@ Writes

Use standard SQL ``INSERT`` statements.

``multi``

Use multi-row ``INSERT`` statements, e.g. ``INSERT INTO table_name (column_list) VALUES (value_list_1), (value_list_2), ..., (value_list_n);``

``upsert``

Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. ``INSERT .. ON CONFLICT .. DO UPDATE SET ..``.
@@ -68,7 +72,7 @@ Writes

* Type: string
* Default: insert
* Valid Values: [insert, upsert, update]
* Valid Values: [insert, multi, upsert, update]
* Importance: high

``batch.size``
8 changes: 8 additions & 0 deletions docs/sink-connector.md
@@ -77,6 +77,14 @@ from Kafka.
This mode is used by default. To enable it explicitly, set
`insert.mode=insert`.

### Multi Mode

In this mode, the connector executes a single `INSERT` SQL query with multiple
value lists, effectively inserting multiple rows/records per query. For example,
three buffered records for a table with columns `a` and `b` are written as
`INSERT INTO table_name (a, b) VALUES (?, ?), (?, ?), (?, ?)`.
This mode is supported by `SqliteDatabaseDialect` and `PostgreSqlDatabaseDialect`.

To use this mode, set `insert.mode=multi`.

### Update Mode

In this mode, the connector executes an `UPDATE` SQL query on each record
@@ -324,6 +324,24 @@ String buildInsertStatement(
Collection<ColumnId> nonKeyColumns
);

/**
* Build an INSERT statement for multiple rows.
*
* @param table the identifier of the table; may not be null
* @param records number of rows which will be inserted; must be a positive number
* @param keyColumns the identifiers of the columns in the primary/unique key; may not be null
* but may be empty
* @param nonKeyColumns the identifiers of the other columns in the table; may not be null but may
* be empty
* @return the INSERT statement; may not be null
*/
String buildMultiInsertStatement(
TableId table,
int records,
Collection<ColumnId> keyColumns,
Collection<ColumnId> nonKeyColumns
);

/**
* Build the INSERT prepared statement expression for the given table and its columns.
*
@@ -494,7 +512,18 @@ interface StatementBinder {
* @param record the sink record with values to be bound into the statement; never null
* @throws SQLException if there is a problem binding values into the statement
*/
void bindRecord(SinkRecord record) throws SQLException;
default void bindRecord(SinkRecord record) throws SQLException {
bindRecord(1, record);
}

/**
* Bind the values in the supplied record, starting at the specified index.
*
* @param index the index at which binding starts; must be positive
* @param record the sink record with values to be bound into the statement; never null
* @return the index that immediately follows the last bound parameter, i.e. the index
*     at which binding of the next record should start
* @throws SQLException if there is a problem binding values into the statement
*/
int bindRecord(int index, SinkRecord record) throws SQLException;
}
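
The new overload is what makes `MULTI` mode possible: every buffered record is bound into the same prepared statement, so each call has to report where the next record's parameters should start. Below is a minimal, hypothetical binder illustrating that contract. It is not the connector's `PreparedStatementBinder` (those changes are outside this excerpt), and it assumes `StatementBinder` is the nested binder interface of `DatabaseDialect` and that record values are plain `Struct`s bound positionally:

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

// Sketch only; assumed to sit in the same package as DatabaseDialect,
// whose nested StatementBinder interface it implements.
class NaiveStatementBinder implements DatabaseDialect.StatementBinder {
    private final PreparedStatement statement;
    private final List<String> fieldNames; // must match the generated column order

    NaiveStatementBinder(final PreparedStatement statement, final List<String> fieldNames) {
        this.statement = statement;
        this.fieldNames = fieldNames;
    }

    @Override
    public int bindRecord(final int index, final SinkRecord record) throws SQLException {
        final Struct value = (Struct) record.value();
        int next = index;
        for (final String field : fieldNames) {
            statement.setObject(next++, value.get(field)); // one placeholder per field
        }
        return next; // record N + 1 starts where record N stopped
    }
}
```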

@@ -51,6 +51,7 @@
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.data.Date;
@@ -85,6 +86,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.connect.jdbc.util.CollectionUtils.isEmpty;
import static java.util.Objects.requireNonNull;
import static java.util.stream.IntStream.range;

/**
* A {@link DatabaseDialect} implementation that provides functionality based upon JDBC and SQL.
*
@@ -1350,6 +1355,44 @@ public String buildInsertStatement(
return builder.toString();
}

@Override
public String buildMultiInsertStatement(final TableId table,
final int records,
final Collection<ColumnId> keyColumns,
final Collection<ColumnId> nonKeyColumns) {

if (records < 1) {
throw new IllegalArgumentException("number of records must be a positive number, but got: " + records);
}
if (isEmpty(keyColumns) && isEmpty(nonKeyColumns)) {
throw new IllegalArgumentException("no columns specified");
}
requireNonNull(table, "table must not be null");

final String insertStatement = expressionBuilder()
.append("INSERT INTO ")
.append(table)
.append("(")
.appendList()
.delimitedBy(",")
.transformedBy(ExpressionBuilder.columnNames())
.of(keyColumns, nonKeyColumns)
.append(") VALUES ")
.toString();

final String singleRowPlaceholder = expressionBuilder()
.append("(")
.appendMultiple(",", "?", keyColumns.size() + nonKeyColumns.size())
.append(")")
.toString();

final String allRowsPlaceholder = range(1, records + 1)
.mapToObj(i -> singleRowPlaceholder)
.collect(Collectors.joining(","));

return insertStatement + allRowsPlaceholder;
}
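
For a concrete sense of the statement shape this method produces, here is a small standalone sketch. The table and column names are invented for illustration, and the real dialect additionally applies identifier quoting via the expression builder:

```java
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MultiInsertShapeDemo {
    public static void main(final String[] args) {
        final int records = 3; // rows buffered for one flush
        final int columns = 3; // e.g. id, title, author
        // One "(?,?,?)" group per record, joined with commas: the same expansion
        // performed by buildMultiInsertStatement above.
        final String row = "(" + String.join(",", Collections.nCopies(columns, "?")) + ")";
        final String values = IntStream.range(0, records)
                .mapToObj(i -> row)
                .collect(Collectors.joining(","));
        System.out.println("INSERT INTO books(id,title,author) VALUES " + values);
        // prints: INSERT INTO books(id,title,author) VALUES (?,?,?),(?,?,?),(?,?,?)
    }
}
```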

@Override
public String buildUpdateStatement(
final TableId table,
144 changes: 106 additions & 38 deletions src/main/java/io/aiven/connect/jdbc/sink/BufferedRecords.java
@@ -41,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.connect.jdbc.sink.JdbcSinkConfig.InsertMode.MULTI;

public class BufferedRecords {
private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);

@@ -53,6 +55,7 @@ public class BufferedRecords {
private List<SinkRecord> records = new ArrayList<>();
private SchemaPair currentSchemaPair;
private FieldsMetadata fieldsMetadata;
private TableDefinition tableDefinition;
private PreparedStatement preparedStatement;
private StatementBinder preparedStatementBinder;

@@ -76,39 +79,10 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
record.valueSchema()
);

if (currentSchemaPair == null) {
currentSchemaPair = schemaPair;
// re-initialize everything that depends on the record schema
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
currentSchemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);
log.debug("buffered records in list {}", records.size());

final TableDefinition tableDefinition = dbStructure.tableDefinitionFor(tableId, connection);
final String sql = getInsertSql(tableDefinition);
log.debug(
"{} sql: {}",
config.insertMode,
sql
);
close();
preparedStatement = connection.prepareStatement(sql);
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
config.pkMode,
schemaPair,
fieldsMetadata,
config.insertMode
);
if (currentSchemaPair == null) {
reInitialize(schemaPair);
}

final List<SinkRecord> flushed;
@@ -134,27 +108,74 @@ public List<SinkRecord> add(final SinkRecord record) throws SQLException {
return flushed;
}

private void prepareStatement() throws SQLException {
final String sql;
log.debug("Generating query for insert mode {} and {} records", config.insertMode, records.size());
if (config.insertMode == MULTI) {
sql = getMultiInsertSql();
} else {
sql = getInsertSql();
}

log.debug("Prepared SQL {} for insert mode {}", sql, config.insertMode);

close();
preparedStatement = connection.prepareStatement(sql);
preparedStatementBinder = dbDialect.statementBinder(
preparedStatement,
config.pkMode,
currentSchemaPair,
fieldsMetadata,
config.insertMode
);
}

/**
* Re-initialize everything that depends on the record schema
*/
private void reInitialize(final SchemaPair schemaPair) throws SQLException {
currentSchemaPair = schemaPair;
fieldsMetadata = FieldsMetadata.extract(
tableId.tableName(),
config.pkMode,
config.pkFields,
config.fieldsWhitelist,
currentSchemaPair
);
dbStructure.createOrAmendIfNecessary(
config,
connection,
tableId,
fieldsMetadata
);

tableDefinition = dbStructure.tableDefinitionFor(tableId, connection);
}

public List<SinkRecord> flush() throws SQLException {
if (records.isEmpty()) {
log.debug("Records is empty");
return new ArrayList<>();
}
log.debug("Flushing {} buffered records", records.size());
for (final SinkRecord record : records) {
preparedStatementBinder.bindRecord(record);
}
prepareStatement();
bindRecords();

int totalUpdateCount = 0;
boolean successNoInfo = false;
for (final int updateCount : preparedStatement.executeBatch()) {

log.debug("Executing batch...");
for (final int updateCount : executeBatch()) {
if (updateCount == Statement.SUCCESS_NO_INFO) {
successNoInfo = true;
continue;
}
totalUpdateCount += updateCount;
}
log.debug("Done executing batch.");
if (totalUpdateCount != records.size() && !successNoInfo) {
switch (config.insertMode) {
case INSERT:
case MULTI:
throw new ConnectException(String.format(
"Update count (%d) did not sum up to total number of records inserted (%d)",
totalUpdateCount,
@@ -186,6 +207,30 @@ public List<SinkRecord> flush() throws SQLException {
return flushedRecords;
}

private int[] executeBatch() throws SQLException {
if (config.insertMode == MULTI) {
// In MULTI mode all records were bound into one statement,
// so the batch consists of that single multi-row statement.
preparedStatement.addBatch();
}
log.debug("Executing batch with insert mode {}", config.insertMode);
return preparedStatement.executeBatch();
}

private void bindRecords() throws SQLException {
log.debug("Binding {} buffered records", records.size());
int index = 1;
for (final SinkRecord record : records) {
if (config.insertMode == MULTI) {
// All records are bound to the same prepared statement,
// so when binding fields for record N (N > 0)
// we need to start at the index where binding fields for record N - 1 stopped.
index = preparedStatementBinder.bindRecord(index, record);
} else {
preparedStatementBinder.bindRecord(record);
}
}
log.debug("Done binding records.");
}

public void close() throws SQLException {
log.info("Closing BufferedRecords with preparedStatement: {}", preparedStatement);
if (preparedStatement != null) {
@@ -194,7 +239,30 @@ public void close() throws SQLException {
}
}

private String getInsertSql(final TableDefinition tableDefinition) {
private String getMultiInsertSql() {
if (config.insertMode != MULTI) {
throw new ConnectException(String.format(
"Multi-row first insert SQL unsupported by insert mode %s",
config.insertMode
));
}
try {
return dbDialect.buildMultiInsertStatement(
tableId,
records.size(),
asColumns(fieldsMetadata.keyFieldNames),
asColumns(fieldsMetadata.nonKeyFieldNames)
);
} catch (final UnsupportedOperationException e) {
throw new ConnectException(String.format(
"Write to table '%s' in MULTI mode is not supported with the %s dialect.",
tableId,
dbDialect.name()
));
}
}

private String getInsertSql() {
switch (config.insertMode) {
case INSERT:
return dbDialect.buildInsertStatement(
@@ -39,6 +39,7 @@ public class JdbcSinkConfig extends JdbcConfig {

public enum InsertMode {
INSERT,
MULTI,
UPSERT,
UPDATE;
}
@@ -122,6 +123,8 @@ public enum PrimaryKeyMode {
"The insertion mode to use. Supported modes are:\n"
+ "``insert``\n"
+ " Use standard SQL ``INSERT`` statements.\n"
+ "``multi``\n"
+ " Use multi-row ``INSERT`` statements.\n"
+ "``upsert``\n"
+ " Use the appropriate upsert semantics for the target database if it is supported by "
+ "the connector, e.g. ``INSERT .. ON CONFLICT .. DO UPDATE SET ..``.\n"
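
Finally, a minimal wiring sketch showing how a deployment opts into the new mode. The property values are examples only; the sketch assumes the Map-based `JdbcSinkConfig` constructor and reads the `insertMode` field the same way `BufferedRecords` does, so it is placed in the sink package:

```java
package io.aiven.connect.jdbc.sink;

import java.util.HashMap;
import java.util.Map;

public class MultiModeConfigExample {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("connection.url", "jdbc:postgresql://localhost:5432/example"); // placeholder URL
        props.put("insert.mode", "multi");
        props.put("batch.size", "500");

        final JdbcSinkConfig config = new JdbcSinkConfig(props);

        // BufferedRecords switches to getMultiInsertSql() when
        // config.insertMode == JdbcSinkConfig.InsertMode.MULTI.
        System.out.println(config.insertMode); // MULTI
    }
}
```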