Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sqlmode #63

Merged
merged 4 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,34 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

/**
* 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
*/
public class DBSink extends AbstractSink {

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected String duplicateSQLTemplate; //通过on duplicate key update 来对已经存在的信息进行更新

protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String tableName; //指定要插入的数据表
public static final String SQL_MODE_DEFAULT = "default";
public static final String SQL_MODE_REPLACE = "replace";
public static final String SQL_MODE_IGNORE = "ignore";

@ENVDependence
protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
Expand All @@ -60,7 +54,15 @@ public class DBSink extends AbstractSink {
@ENVDependence
protected String userName;
@ENVDependence
protected String tableName; //指定要插入的数据表
@ENVDependence
protected String password;
@ENVDependence
protected String sqlMode;

protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected boolean openSqlCache = true;

Expand All @@ -87,60 +89,83 @@ public DBSink(String insertSQL, String dbInfoNamePrefix) {
}

public DBSink() {
setType(IChannel.TYPE);
this(null, null, null, null);
}

public DBSink(String url, String userName, String password) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
public DBSink(String url, String userName, String password, String tableName) {
this(url, userName, password, tableName, SQL_MODE_DEFAULT);
}

public DBSink(String insertSQL, String url, String userName, String password) {
public DBSink(String url, String userName, String password, String tableName, String sqlMode) {
this(url, userName, password, tableName, sqlMode, null);
}

public DBSink(String url, String userName, String password, String tableName, String sqlMode, MetaData metaData) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
this.insertSQLTemplate = insertSQL;
this.tableName = tableName;
this.sqlMode = sqlMode;
this.metaData = metaData;
}

@Override
protected boolean initConfigurable() {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(url, userName, password);
DatabaseMetaData metaData = connection.getMetaData();
ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
}
sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>() {
@Override
public boolean flushMessage(List<String> sqls) {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
if (dataSource != null) {
dataSource.destroy();
}
}
return true;
if (this.metaData == null) {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(this.url, this.userName, this.password);
DatabaseMetaData connectionMetaData = connection.getMetaData();
ResultSet metaResult = connectionMetaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
}
});
((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) sqlCache).setAutoFlushSize(50);
sqlCache.openAutoFlush();
return super.initConfigurable();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
List<MetaDataField> fieldList = this.metaData.getMetaDataFields();
List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add(fieldName);
insertValues.add("'#{" + fieldName + "}'");
duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
});

String sql = "insert";
if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) {
sql = sql + " into ";
} else if (SQL_MODE_IGNORE.equals(sqlMode)) {
sql = sql + " ignore into ";
} else if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " into ";
}
return false;
sql = sql + tableName + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") ";
if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " on duplicate key update " + String.join(",", duplicateKeys);
}
this.insertSQLTemplate = sql;
this.sqlCache = new MessageCache<>(sqls -> {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
dataSource.destroy();
}
return true;
});
((MessageCache<String>) this.sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) this.sqlCache).setAutoFlushSize(50);
this.sqlCache.openAutoFlush();
return super.initConfigurable();
}

@Override
Expand All @@ -154,15 +179,13 @@ protected boolean batchInsert(List<IMessage> messageList) {
if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
return true;
}
String insertValueSQL = parseInsertValues(insertSQLTemplate);
if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
for (JSONObject message : messages) {
String sql = parseSQL(message, insertSQLTemplate);
sql += this.duplicateSQLTemplate;
executeSQL(dbDataSource, sql);
}
return true;
Expand All @@ -172,7 +195,6 @@ protected boolean batchInsert(List<IMessage> messageList) {
subInsert.add(parseSQL(message, insertValueSQL));
}
String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
insertSQL += this.duplicateSQLTemplate;
executeSQL(dbDataSource, insertSQL);
return true;
}
Expand All @@ -195,7 +217,16 @@ protected void executeSQL(JDBCDriver dbDataSource, String sql) {
} else {
dbDataSource.execute(sql);
}
}

protected void executeSQL(JDBCDriver dbDataSource, List<String> sqls) {
if (isOpenSqlCache()) {
for (String sql : sqls) {
this.sqlCache.addCache(sql);
}
} else {
dbDataSource.executSqls(sqls);
}
}

/**
Expand All @@ -209,7 +240,7 @@ protected String parseInsertValues(String insertSQL) {
return null;
}
String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
int end = valuesSQL.toLowerCase().lastIndexOf(")");
int end = valuesSQL.toLowerCase().indexOf(")");
if (end == -1) {
return null;
}
Expand Down Expand Up @@ -260,14 +291,6 @@ public void setPassword(String password) {
this.password = password;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public String getTableName() {
return tableName;
}
Expand All @@ -276,6 +299,22 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public String getSqlMode() {
return sqlMode;
}

public void setSqlMode(String sqlMode) {
this.sqlMode = sqlMode;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public boolean isOpenSqlCache() {
return openSqlCache;
}
Expand All @@ -284,11 +323,4 @@ public void setOpenSqlCache(boolean openSqlCache) {
this.openSqlCache = openSqlCache;
}

public String getDuplicateSQLTemplate() {
return duplicateSQLTemplate;
}

public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
this.duplicateSQLTemplate = duplicateSQLTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@
package org.apache.rocketmq.streams.db.sink;

import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;
import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;

import java.util.List;
import java.util.Properties;

@AutoService(IChannelBuilder.class)
@ServiceName(DBSinkBuilder.TYPE)
public class DBSinkBuilder implements IChannelBuilder {
Expand All @@ -39,21 +35,9 @@ public ISink createSink(String namespace, String name, Properties properties, Me
sink.setUrl(properties.getProperty("url"));
sink.setUserName(properties.getProperty("userName"));
sink.setPassword(properties.getProperty("password"));
List<MetaDataField> fieldList = metaData.getMetaDataFields();

List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add(fieldName);
insertValues.add("'#{" + fieldName + "}'");
duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
});

String sql = "insert into " + properties.getProperty("tableName") + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") ";
sink.setInsertSQLTemplate(sql);
sink.setDuplicateSQLTemplate(" on duplicate key update " + String.join(",", duplicateKeys));
sink.setTableName(properties.getProperty("tableName"));
sink.setSqlMode(properties.getProperty("sqlMode"));
sink.setMetaData(metaData);
return sink;
}

Expand Down
Loading