Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support hyperscan and fixed bugs #67

Merged
merged 7 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<spring.version>3.2.13.RELEASE</spring.version>
<auto-service.version>1.0-rc5</auto-service.version>
<mysql-connector.version>5.1.40</mysql-connector.version>
<fastjson.version>1.2.27</fastjson.version>
<fastjson.version>1.2.78</fastjson.version>
<quartz.version>2.2.1</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
<commons-io.version>2.5</commons-io.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@
package org.apache.rocketmq.streams.db.sink;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
Expand All @@ -42,12 +43,9 @@
* 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
*/
public class DBSink extends AbstractSink {

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String tableName; //指定要插入的数据表
public static final String SQL_MODE_DEFAULT = "default";
public static final String SQL_MODE_REPLACE = "replace";
public static final String SQL_MODE_IGNORE = "ignore";

@ENVDependence
protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
Expand All @@ -56,9 +54,17 @@ public class DBSink extends AbstractSink {
@ENVDependence
protected String userName;
@ENVDependence
protected String tableName; //指定要插入的数据表
@ENVDependence
protected String password;
@ENVDependence
protected String sqlMode;

protected boolean openSqlCache=false;
protected MetaData metaData;//可以指定meta data,和insertSQL二选一

protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')

protected boolean openSqlCache = true;

protected transient IMessageCache<String> sqlCache;//cache sql, batch submit sql

Expand All @@ -83,59 +89,83 @@ public DBSink(String insertSQL, String dbInfoNamePrefix) {
}

public DBSink() {
setType(IChannel.TYPE);
this(null, null, null, null);
}

public DBSink(String url, String userName, String password) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
public DBSink(String url, String userName, String password, String tableName) {
this(url, userName, password, tableName, SQL_MODE_DEFAULT);
}

public DBSink(String insertSQL, String url, String userName, String password) {
public DBSink(String url, String userName, String password, String tableName, String sqlMode) {
this(url, userName, password, tableName, sqlMode, null);
}

public DBSink(String url, String userName, String password, String tableName, String sqlMode, MetaData metaData) {
setType(IChannel.TYPE);
this.url = url;
this.userName = userName;
this.password = password;
this.insertSQLTemplate = insertSQL;
this.tableName = tableName;
this.sqlMode = sqlMode;
this.metaData = metaData;
}

@Override
protected boolean initConfigurable() {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(url, userName, password);
DatabaseMetaData metaData = connection.getMetaData();
ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
sqlCache=new MessageCache<>(new IMessageFlushCallBack<String>() {
@Override public boolean flushMessage(List<String> sqls) {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
if (dataSource != null) {
dataSource.destroy();
}
}
return true;
}
});
((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) sqlCache).setAutoFlushSize(50);
sqlCache.openAutoFlush();
if (this.metaData == null) {
try {
Class.forName("com.mysql.jdbc.Driver");
if (StringUtil.isNotEmpty(this.tableName)) {
Connection connection = DriverManager.getConnection(this.url, this.userName, this.password);
DatabaseMetaData connectionMetaData = connection.getMetaData();
ResultSet metaResult = connectionMetaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
this.metaData = MetaData.createMetaData(metaResult);
this.metaData.setTableName(this.tableName);
}
} catch (Exception e) {
e.printStackTrace();
}
return super.initConfigurable();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
return false;
List<MetaDataField> fieldList = this.metaData.getMetaDataFields();
List<String> insertFields = Lists.newArrayList();
List<String> insertValues = Lists.newArrayList();
List<String> duplicateKeys = Lists.newArrayList();
fieldList.forEach(field -> {
String fieldName = field.getFieldName();
insertFields.add(fieldName);
insertValues.add("'#{" + fieldName + "}'");
duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
});

String sql = "insert";
if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) {
sql = sql + " into ";
} else if (SQL_MODE_IGNORE.equals(sqlMode)) {
sql = sql + " ignore into ";
} else if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " into ";
}
sql = sql + tableName + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") ";
if (SQL_MODE_REPLACE.equals(sqlMode)) {
sql = sql + " on duplicate key update " + String.join(",", duplicateKeys);
}
this.insertSQLTemplate = sql;
this.sqlCache = new MessageCache<>(sqls -> {
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
try {
dataSource.executSqls(sqls);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
dataSource.destroy();
}
return true;
});
((MessageCache<String>) this.sqlCache).setAutoFlushTimeGap(100000);
((MessageCache<String>) this.sqlCache).setAutoFlushSize(50);
this.sqlCache.openAutoFlush();
return super.initConfigurable();
}

@Override
Expand All @@ -148,7 +178,7 @@ protected boolean batchInsert(List<IMessage> messageList) {
List<JSONObject> messages = convertJsonObjectFromMessage(messageList);
if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
sql = sql + SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
executeSQL(dbDataSource, sql);
return true;
}
Expand All @@ -160,22 +190,11 @@ protected boolean batchInsert(List<IMessage> messageList) {
}
return true;
} else {
StringBuilder sb = new StringBuilder();
String insertSQL;
boolean isFirst = true;
int i = 0;
List<String> subInsert = Lists.newArrayList();
for (JSONObject message : messages) {
insertSQL = parseSQL(message, insertValueSQL);
if (isFirst) {
isFirst = false;
} else {
sb.append(",");
}
i++;

sb.append(insertSQL);
subInsert.add(parseSQL(message, insertValueSQL));
}
insertSQL = this.insertSQLTemplate.replace(insertValueSQL, sb.toString());
String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
executeSQL(dbDataSource, insertSQL);
return true;
}
Expand All @@ -184,20 +203,30 @@ protected boolean batchInsert(List<IMessage> messageList) {
}
}

@Override public boolean checkpoint(Set<String> splitIds) {
if(sqlCache!=null){
@Override
public boolean checkpoint(Set<String> splitIds) {
if (sqlCache != null) {
sqlCache.flush(splitIds);
}
return true;
}

protected void executeSQL(JDBCDriver dbDataSource, String sql) {
if(isOpenSqlCache()){
if (isOpenSqlCache()) {
this.sqlCache.addCache(sql);
}else {
} else {
dbDataSource.execute(sql);
}
}

protected void executeSQL(JDBCDriver dbDataSource, List<String> sqls) {
if (isOpenSqlCache()) {
for (String sql : sqls) {
this.sqlCache.addCache(sql);
}
} else {
dbDataSource.executSqls(sqls);
}
}

/**
Expand All @@ -211,7 +240,7 @@ protected String parseInsertValues(String insertSQL) {
return null;
}
String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
int end = valuesSQL.toLowerCase().lastIndexOf(")");
int end = valuesSQL.toLowerCase().indexOf(")");
if (end == -1) {
return null;
}
Expand Down Expand Up @@ -262,14 +291,6 @@ public void setPassword(String password) {
this.password = password;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public String getTableName() {
return tableName;
}
Expand All @@ -278,11 +299,28 @@ public void setTableName(String tableName) {
this.tableName = tableName;
}

public String getSqlMode() {
return sqlMode;
}

public void setSqlMode(String sqlMode) {
this.sqlMode = sqlMode;
}

public MetaData getMetaData() {
return metaData;
}

public void setMetaData(MetaData metaData) {
this.metaData = metaData;
}

public boolean isOpenSqlCache() {
return openSqlCache;
}

public void setOpenSqlCache(boolean openSqlCache) {
this.openSqlCache = openSqlCache;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
package org.apache.rocketmq.streams.db.sink;

import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Properties;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.model.ServiceName;
import org.apache.rocketmq.streams.common.utils.DataTypeUtil;

@AutoService(IChannelBuilder.class)
@ServiceName(DBSinkBuilder.TYPE)
Expand All @@ -36,29 +33,11 @@ public class DBSinkBuilder implements IChannelBuilder {
public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
DBSink sink = new DBSink();
sink.setUrl(properties.getProperty("url"));
sink.setUserName("userName");
sink.setPassword("password");
List<MetaDataField> fieldList = metaData.getMetaDataFields();
StringBuilder insertSQL = new StringBuilder();
StringBuilder insertValueSQL = new StringBuilder();
boolean isFirst = true;
for (MetaDataField field : fieldList) {
String fieldName = field.getFieldName();
if (isFirst) {
isFirst = false;
} else {
insertSQL.append(",");
insertValueSQL.append(",");
}
insertSQL.append(fieldName);
if (DataTypeUtil.isNumber(field.getDataType())) {
insertValueSQL.append(fieldName);
} else {
insertValueSQL.append("'#{" + fieldName + "}'");
}
}
String sql = "insert into " + properties.getProperty("tableName") + "(" + insertSQL.toString() + ")values(" + insertValueSQL.toString() + ")";
sink.setInsertSQLTemplate(sql);
sink.setUserName(properties.getProperty("userName"));
sink.setPassword(properties.getProperty("password"));
sink.setTableName(properties.getProperty("tableName"));
sink.setSqlMode(properties.getProperty("sqlMode"));
sink.setMetaData(metaData);
return sink;
}

Expand Down
Loading