Skip to content

Commit

Permalink
Add Sqlmode (#63)
Browse files Browse the repository at this point in the history
* add update logic for the DBSinker 、 upgrade the concat_ws function

* Add the field level cache to reduce duplicate data entry #60

* Sqlmode can be used to write data to the database as specified
#62

Co-authored-by: junjie.cheng <junjie.cheng@alibaba-inc.com>
  • Loading branch information
j-ching and j-ching authored Sep 15, 2021
1 parent 0742cf4 commit 87a3222
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,34 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

/**
* 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
*/
public class DBSink extends AbstractSink {

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected String duplicateSQLTemplate; //通过on duplicate key update 来对已经存在的信息进行更新

protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String tableName; //指定要插入的数据表
public static final String SQL_MODE_DEFAULT = "default";
public static final String SQL_MODE_REPLACE = "replace";
public static final String SQL_MODE_IGNORE = "ignore";

@ENVDependence
protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
Expand All @@ -60,7 +54,15 @@ public class DBSink extends AbstractSink {
@ENVDependence
protected String userName;
@ENVDependence
protected String tableName; //指定要插入的数据表
@ENVDependence
protected String password;
@ENVDependence
protected String sqlMode;

protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected boolean openSqlCache = true;

Expand All @@ -87,60 +89,83 @@ public DBSink(String insertSQL, String dbInfoNamePrefix) {
}

public DBSink() {
setType(IChannel.TYPE);
this(null, null, null, null);
}

public DBSink(String url, String userName, String password) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
public DBSink(String url, String userName, String password, String tableName) {
this(url, userName, password, tableName, SQL_MODE_DEFAULT);
}

public DBSink(String insertSQL, String url, String userName, String password) {
public DBSink(String url, String userName, String password, String tableName, String sqlMode) {
this(url, userName, password, tableName, sqlMode, null);
}

public DBSink(String url, String userName, String password, String tableName, String sqlMode, MetaData metaData) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
this.insertSQLTemplate = insertSQL;
this.tableName = tableName;
this.sqlMode = sqlMode;
this.metaData = metaData;
}

@Override
protected boolean initConfigurable() {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(url, userName, password);
DatabaseMetaData metaData = connection.getMetaData();
ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
}
sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>() {
@Override
public boolean flushMessage(List<String> sqls) {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
if (dataSource != null) {
dataSource.destroy();
}
}
return true;
if (this.metaData == null) {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(this.url, this.userName, this.password);
DatabaseMetaData connectionMetaData = connection.getMetaData();
ResultSet metaResult = connectionMetaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
}
});
((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) sqlCache).setAutoFlushSize(50);
sqlCache.openAutoFlush();
return super.initConfigurable();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
List<MetaDataField> fieldList = this.metaData.getMetaDataFields();
List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add(fieldName);
insertValues.add("'#{" + fieldName + "}'");
duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
});

String sql = "insert";
if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) {
sql = sql + " into ";
} else if (SQL_MODE_IGNORE.equals(sqlMode)) {
sql = sql + " ignore into ";
} else if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " into ";
}
return false;
sql = sql + tableName + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") ";
if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " on duplicate key update " + String.join(",", duplicateKeys);
}
this.insertSQLTemplate = sql;
this.sqlCache = new MessageCache<>(sqls -> {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
dataSource.destroy();
}
return true;
});
((MessageCache<String>) this.sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) this.sqlCache).setAutoFlushSize(50);
this.sqlCache.openAutoFlush();
return super.initConfigurable();
}

@Override
Expand All @@ -154,15 +179,13 @@ protected boolean batchInsert(List<IMessage> messageList) {
if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
return true;
}
String insertValueSQL = parseInsertValues(insertSQLTemplate);
if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
for (JSONObject message : messages) {
String sql = parseSQL(message, insertSQLTemplate);
sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
}
return true;
Expand All @@ -172,7 +195,6 @@ protected boolean batchInsert(List<IMessage> messageList) {
subInsert.add(parseSQL(message, insertValueSQL));
}
String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
insertSQL += this.duplicateSQLTemplate;
executeSQL(dbDataSource, insertSQL);
return true;
}
Expand All @@ -195,7 +217,16 @@ protected void executeSQL(JDBCDriver dbDataSource, String sql) {
} else {
dbDataSource.execute(sql);
}
}

protected void executeSQL(JDBCDriver dbDataSource, List<String> sqls) {
if (isOpenSqlCache()) {
for (String sql : sqls) {
this.sqlCache.addCache(sql);
}
} else {
dbDataSource.executSqls(sqls);
}
}

/**
Expand All @@ -209,7 +240,7 @@ protected String parseInsertValues(String insertSQL) {
return null;
}
String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
int end = valuesSQL.toLowerCase().lastIndexOf(")");
int end = valuesSQL.toLowerCase().indexOf(")");
if (end == -1) {
return null;
}
Expand Down Expand Up @@ -260,14 +291,6 @@ public void setPassword(String password) {
this.password = password;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public String getTableName() {
return tableName;
}
Expand All @@ -276,6 +299,22 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public String getSqlMode() {
return sqlMode;
}

public void setSqlMode(String sqlMode) {
this.sqlMode = sqlMode;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public boolean isOpenSqlCache() {
return openSqlCache;
}
Expand All @@ -284,11 +323,4 @@ public void setOpenSqlCache(boolean openSqlCache) {
this.openSqlCache = openSqlCache;
}

public String getDuplicateSQLTemplate() {
return duplicateSQLTemplate;
}

public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
this.duplicateSQLTemplate = duplicateSQLTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@
package org.apache.rocketmq.streams.db.sink;

import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;
import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;

import java.util.List;
import java.util.Properties;

@AutoService(IChannelBuilder.class)
@ServiceName(DBSinkBuilder.TYPE)
public class DBSinkBuilder implements IChannelBuilder {
Expand All @@ -39,21 +35,9 @@ public ISink createSink(String namespace, String name, Properties properties, Me
sink.setUrl(properties.getProperty("url"));
sink.setUserName(properties.getProperty("userName"));
sink.setPassword(properties.getProperty("password"));
List<MetaDataField> fieldList = metaData.getMetaDataFields();

List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add(fieldName);
insertValues.add("'#{" + fieldName + "}'");
duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
});

String sql = "insert into " + properties.getProperty("tableName") + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") ";
sink.setInsertSQLTemplate(sql);
sink.setDuplicateSQLTemplate(" on duplicate key update " + String.join(",", duplicateKeys));
sink.setTableName(properties.getProperty("tableName"));
sink.setSqlMode(properties.getProperty("sqlMode"));
sink.setMetaData(metaData);
return sink;
}

Expand Down
Loading

0 comments on commit 87a3222

Please sign in to comment.