diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java index 1aab2bab..886e23ae 100644 --- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java @@ -18,40 +18,34 @@ import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; import org.apache.rocketmq.streams.common.channel.IChannel; import org.apache.rocketmq.streams.common.channel.sink.AbstractSink; import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache; -import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache; import org.apache.rocketmq.streams.common.component.AbstractComponent; import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.metadata.MetaData; +import org.apache.rocketmq.streams.common.metadata.MetaDataField; import org.apache.rocketmq.streams.common.utils.SQLUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.db.driver.DriverBuilder; import org.apache.rocketmq.streams.db.driver.JDBCDriver; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; -import java.util.Set; - /** * 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql */ public class DBSink extends AbstractSink { - - protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') - - protected String duplicateSQLTemplate; //通过on duplicate key update 来对已经存在的信息进行更新 - - protected MetaData metaData;//可以指定meta data,和insertSQL二选一 - - protected String tableName; //指定要插入的数据表 + public static final String SQL_MODE_DEFAULT = "default"; + public static final String SQL_MODE_REPLACE = "replace"; + public static final String SQL_MODE_IGNORE = "ignore"; @ENVDependence protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER; @@ -60,7 +54,15 @@ public class DBSink extends AbstractSink { @ENVDependence protected String userName; @ENVDependence + protected String tableName; //指定要插入的数据表 + @ENVDependence protected String password; + @ENVDependence + protected String sqlMode; + + protected MetaData metaData;//可以指定meta data,和insertSQL二选一 + + protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') protected boolean openSqlCache = true; @@ -87,60 +89,83 @@ public DBSink(String insertSQL, String dbInfoNamePrefix) { } public DBSink() { - setType(IChannel.TYPE); + this(null, null, null, null); } - public DBSink(String url, String userName, String password) { - setType(IChannel.TYPE); - this.url = url; - this.userName = userName; - this.password = password; + public DBSink(String url, String userName, String password, String tableName) { + this(url, userName, password, tableName, SQL_MODE_DEFAULT); } - public DBSink(String insertSQL, String url, String userName, String password) { + public DBSink(String url, String userName, String password, String tableName, String sqlMode) { + this(url, userName, password, tableName, sqlMode, null); + } + + public DBSink(String url, String userName, String password, String tableName, String sqlMode, MetaData metaData) { setType(IChannel.TYPE); this.url = url; this.userName = userName; this.password = password; - this.insertSQLTemplate = insertSQL; + this.tableName = tableName; + this.sqlMode = sqlMode; + this.metaData = metaData; } @Override protected boolean initConfigurable() { - try { - Class.forName("com.mysql.jdbc.Driver"); - if (StringUtil.isNotEmpty(this.tableName)) { - Connection connection = DriverManager.getConnection(url, userName, password); - DatabaseMetaData metaData = connection.getMetaData(); - ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null); - this.metaData = MetaData.createMetaData(metaResult); - this.metaData.setTableName(this.tableName); - } - sqlCache = new MessageCache<>(new IMessageFlushCallBack() { - @Override - public boolean flushMessage(List sqls) { - JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password); - try { - dataSource.executSqls(sqls); - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } finally { - if (dataSource != null) { - dataSource.destroy(); - } - } - return true; + if (this.metaData == null) { + try { + Class.forName("com.mysql.jdbc.Driver"); + if (StringUtil.isNotEmpty(this.tableName)) { + Connection connection = DriverManager.getConnection(this.url, this.userName, this.password); + DatabaseMetaData connectionMetaData = connection.getMetaData(); + ResultSet metaResult = connectionMetaData.getColumns(connection.getCatalog(), "%", this.tableName, null); + this.metaData = MetaData.createMetaData(metaResult); + this.metaData.setTableName(this.tableName); } - }); - ((MessageCache) sqlCache).setAutoFlushTimeGap(100000); - ((MessageCache) sqlCache).setAutoFlushSize(50); - sqlCache.openAutoFlush(); - return super.initConfigurable(); - } catch (ClassNotFoundException | SQLException e) { - e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + List fieldList = this.metaData.getMetaDataFields(); + List insertFields = Lists.newArrayList(); + List insertValues = Lists.newArrayList(); + List duplicateKeys = Lists.newArrayList(); + fieldList.forEach(field -> { + String fieldName = field.getFieldName(); + insertFields.add(fieldName); + insertValues.add("'#{" + fieldName + "}'"); + duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")"); + }); + + String sql = "insert"; + if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) { + sql = sql + " into "; + } else if (SQL_MODE_IGNORE.equals(sqlMode)) { + sql = sql + " ignore into "; + } else if (SQL_MODE_REPLACE.equals(sqlMode)) { + sql = sql + " into "; } - return false; + sql = sql + tableName + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") "; + if (SQL_MODE_REPLACE.equals(sqlMode)) { + sql = sql + " on duplicate key update " + String.join(",", duplicateKeys); + } + this.insertSQLTemplate = sql; + this.sqlCache = new MessageCache<>(sqls -> { + JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password); + try { + dataSource.executSqls(sqls); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + dataSource.destroy(); + } + return true; + }); + ((MessageCache) this.sqlCache).setAutoFlushTimeGap(100000); + ((MessageCache) this.sqlCache).setAutoFlushSize(50); + this.sqlCache.openAutoFlush(); + return super.initConfigurable(); } @Override @@ -154,7 +179,6 @@ protected boolean batchInsert(List messageList) { if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) { String sql = SQLUtil.createInsertSql(metaData, messages.get(0)); sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size())); - sql += this.duplicateSQLTemplate; executeSQL(dbDataSource, sql); return true; } @@ -162,7 +186,6 @@ protected boolean batchInsert(List messageList) { if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) { for (JSONObject message : messages) { String sql = parseSQL(message, insertSQLTemplate); - sql += this.duplicateSQLTemplate; executeSQL(dbDataSource, sql); } return true; @@ -172,7 +195,6 @@ protected boolean batchInsert(List messageList) { subInsert.add(parseSQL(message, insertValueSQL)); } String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert)); - insertSQL += this.duplicateSQLTemplate; executeSQL(dbDataSource, insertSQL); return true; } @@ -195,7 +217,16 @@ protected void executeSQL(JDBCDriver dbDataSource, String sql) { } else { dbDataSource.execute(sql); } + } + protected void executeSQL(JDBCDriver dbDataSource, List sqls) { + if (isOpenSqlCache()) { + for (String sql : sqls) { + this.sqlCache.addCache(sql); + } + } else { + dbDataSource.executSqls(sqls); + } } /** @@ -209,7 +240,7 @@ protected String parseInsertValues(String insertSQL) { return null; } String valuesSQL = insertSQL.substring(start + VALUES_NAME.length()); - int end = valuesSQL.toLowerCase().lastIndexOf(")"); + int end = valuesSQL.toLowerCase().indexOf(")"); if (end == -1) { return null; } @@ -260,14 +291,6 @@ public void setPassword(String password) { this.password = password; } - public MetaData getMetaData() { - return metaData; - } - - public void setMetaData(MetaData metaData) { - this.metaData = metaData; - } - public String getTableName() { return tableName; } @@ -276,6 +299,22 @@ public void setTableName(String tableName) { this.tableName = tableName; } + public String getSqlMode() { + return sqlMode; + } + + public void setSqlMode(String sqlMode) { + this.sqlMode = sqlMode; + } + + public MetaData getMetaData() { + return metaData; + } + + public void setMetaData(MetaData metaData) { + this.metaData = metaData; + } + public boolean isOpenSqlCache() { return openSqlCache; } @@ -284,11 +323,4 @@ public void setOpenSqlCache(boolean openSqlCache) { this.openSqlCache = openSqlCache; } - public String getDuplicateSQLTemplate() { - return duplicateSQLTemplate; - } - - public void setDuplicateSQLTemplate(String duplicateSQLTemplate) { - this.duplicateSQLTemplate = duplicateSQLTemplate; - } } diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java index 87f5fb77..1743f627 100644 --- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java +++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java @@ -17,17 +17,13 @@ package org.apache.rocketmq.streams.db.sink; import com.google.auto.service.AutoService; -import com.google.common.collect.Lists; +import java.util.Properties; import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder; import org.apache.rocketmq.streams.common.channel.sink.ISink; import org.apache.rocketmq.streams.common.channel.source.ISource; import org.apache.rocketmq.streams.common.metadata.MetaData; -import org.apache.rocketmq.streams.common.metadata.MetaDataField; import org.apache.rocketmq.streams.common.model.ServiceName; -import java.util.List; -import java.util.Properties; - @AutoService(IChannelBuilder.class) @ServiceName(DBSinkBuilder.TYPE) public class DBSinkBuilder implements IChannelBuilder { @@ -39,21 +35,9 @@ public ISink createSink(String namespace, String name, Properties properties, Me sink.setUrl(properties.getProperty("url")); sink.setUserName(properties.getProperty("userName")); sink.setPassword(properties.getProperty("password")); - List fieldList = metaData.getMetaDataFields(); - - List insertFields = Lists.newArrayList(); - List insertValues = Lists.newArrayList(); - List duplicateKeys = Lists.newArrayList(); - fieldList.forEach(field -> { - String fieldName = field.getFieldName(); - insertFields.add(fieldName); - insertValues.add("'#{" + fieldName + "}'"); - duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")"); - }); - - String sql = "insert into " + properties.getProperty("tableName") + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ") "; - sink.setInsertSQLTemplate(sql); - sink.setDuplicateSQLTemplate(" on duplicate key update " + String.join(",", duplicateKeys)); + sink.setTableName(properties.getProperty("tableName")); + sink.setSqlMode(properties.getProperty("sqlMode")); + sink.setMetaData(metaData); return sink; } diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java index f9260eab..08bfe7f0 100644 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java @@ -73,7 +73,8 @@ public DataStream(PipelineBuilder pipelineBuilder, ChainStage currentChainSta this.currentChainStage = currentChainStage; } - public DataStream(PipelineBuilder pipelineBuilder, Set pipelineBuilders, ChainStage currentChainStage) { + public DataStream(PipelineBuilder pipelineBuilder, Set pipelineBuilders, + ChainStage currentChainStage) { this.mainPipelineBuilder = pipelineBuilder; this.otherPipelineBuilders = pipelineBuilders; this.currentChainStage = currentChainStage; @@ -97,11 +98,11 @@ public DataStream map(MapFunction mapFunction) { @Override protected T operate(IMessage message, AbstractContext context) { try { - O o = (O)(message.getMessageValue()); - T result = (T)mapFunction.map(o); + O o = (O) (message.getMessageValue()); + T result = (T) mapFunction.map(o); if (result != message.getMessageValue()) { if (result instanceof JSONObject) { - message.setMessageBody((JSONObject)result); + message.setMessageBody((JSONObject) result); } else { message.setMessageBody(new UserDefinedMessage(result)); } @@ -118,28 +119,28 @@ protected T operate(IMessage message, AbstractContext context) { return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage); } - public DataStream flatMap(FlatMapFunction mapFunction) { StageBuilder stageBuilder = new StageBuilder() { @Override protected T operate(IMessage message, AbstractContext context) { try { - O o = (O)(message.getMessageValue()); - List result =(List)mapFunction.flatMap(o); - if(result==null||result.size()==0){ + O o = (O) (message.getMessageValue()); + List result = (List) mapFunction.flatMap(o); + if (result == null || result.size() == 0) { context.breakExecute(); } - List splitMessages=new ArrayList<>(); - for(T t:result){ - Message subMessage=null; + List splitMessages = new ArrayList<>(); + for (T t : result) { + Message subMessage = null; if (result instanceof JSONObject) { - subMessage=new Message((JSONObject)t); + subMessage = new Message((JSONObject) t); } else { - subMessage=new Message(new UserDefinedMessage(t)); + subMessage = new Message(new UserDefinedMessage(t)); } splitMessages.add(subMessage); } - context.openSplitModel();; + context.openSplitModel(); + ; context.setSplitMessages(splitMessages); return null; } catch (Exception e) { @@ -159,7 +160,7 @@ public DataStream filter(final FilterFunction filterFunction) { @Override protected T operate(IMessage message, AbstractContext context) { try { - boolean isFilter = filterFunction.filter((O)message.getMessageValue()); + boolean isFilter = filterFunction.filter((O) message.getMessageValue()); if (isFilter) { context.breakExecute(); } @@ -232,7 +233,7 @@ public DataStream union(DataStream rightStream) { Union union = new Union(); //处理左流,做流的isMain设置成true - UDFUnionChainStage chainStage = (UDFUnionChainStage)this.mainPipelineBuilder.createStage(union); + UDFUnionChainStage chainStage = (UDFUnionChainStage) this.mainPipelineBuilder.createStage(union); chainStage.setMainStream(true); this.mainPipelineBuilder.setTopologyStages(currentChainStage, chainStage); @@ -272,7 +273,8 @@ protected T operate(IMessage message, AbstractContext context) { * @param sqlOrTableName * @return */ - public JoinStream join(String url, String userName, String password, String sqlOrTableName, long pollingTimeMintue) { + public JoinStream join(String url, String userName, String password, String sqlOrTableName, + long pollingTimeMintue) { return join(url, userName, password, sqlOrTableName, null, pollingTimeMintue); } @@ -285,7 +287,8 @@ public JoinStream join(String url, String userName, String password, String sqlO * @param sqlOrTableName * @return */ - public JoinStream join(String url, String userName, String password, String sqlOrTableName, String jdbcDriver, long pollingTimeMinute) { + public JoinStream join(String url, String userName, String password, String sqlOrTableName, String jdbcDriver, + long pollingTimeMinute) { DBDim dbDim = new DBDim(); dbDim.setUrl(url); dbDim.setUserName(userName); @@ -308,7 +311,7 @@ public DataStream forEach(ForEachFunction forEachFunction) { StageBuilder selfChainStage = new StageBuilder() { @Override protected T operate(IMessage message, AbstractContext context) { - forEachFunction.foreach((O)message.getMessageValue()); + forEachFunction.foreach((O) message.getMessageValue()); return null; } }; @@ -367,7 +370,6 @@ protected void start(boolean isAsyn) { return; } - ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory"); ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService()); pipeline.startChannel(); @@ -403,21 +405,23 @@ protected void addOtherDataStream(DataStream rightSource) { this.otherPipelineBuilders.addAll(rightSource.otherPipelineBuilders); } - public DataStreamAction toFile(String filePath,int batchSize,boolean isAppend) { - FileSink fileChannel = new FileSink(filePath,isAppend); - if(batchSize>0){ + public DataStreamAction toFile(String filePath, int batchSize, boolean isAppend) { + FileSink fileChannel = new FileSink(filePath, isAppend); + if (batchSize > 0) { fileChannel.setBatchSize(batchSize); } ChainStage output = mainPipelineBuilder.createStage(fileChannel); mainPipelineBuilder.setTopologyStages(currentChainStage, output); return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output); } - public DataStreamAction toFile(String filePath,boolean isAppend) { - FileSink fileChannel = new FileSink(filePath,isAppend); + + public DataStreamAction toFile(String filePath, boolean isAppend) { + FileSink fileChannel = new FileSink(filePath, isAppend); ChainStage output = mainPipelineBuilder.createStage(fileChannel); mainPipelineBuilder.setTopologyStages(currentChainStage, output); return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output); } + public DataStreamAction toFile(String filePath) { FileSink fileChannel = new FileSink(filePath); ChainStage output = mainPipelineBuilder.createStage(fileChannel); @@ -440,28 +444,26 @@ public DataStreamAction toPrint(int batchSize) { } public DataStreamAction toDB(String url, String userName, String password, String tableName) { - DBSink dbChannel = new DBSink(url, userName, password); - dbChannel.setTableName(tableName); + DBSink dbChannel = new DBSink(url, userName, password, tableName); ChainStage output = this.mainPipelineBuilder.createStage(dbChannel); this.mainPipelineBuilder.setTopologyStages(currentChainStage, output); return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output); } public DataStreamAction toRocketmq(String topic) { - return toRocketmq(topic, "*", null,-1, null); + return toRocketmq(topic, "*", null, -1, null); } - - public DataStreamAction toRocketmq(String topic,String namesrvAddr) { - return toRocketmq(topic, "*", null,-1, namesrvAddr); + public DataStreamAction toRocketmq(String topic, String namesrvAddr) { + return toRocketmq(topic, "*", null, -1, namesrvAddr); } public DataStreamAction toRocketmq(String topic, String tags, - String namesrvAddr) { - return toRocketmq(topic, tags,null,-1, namesrvAddr); + String namesrvAddr) { + return toRocketmq(topic, tags, null, -1, namesrvAddr); } - public DataStreamAction toRocketmq(String topic, String tags,String groupName, int batchSize, String namesrvAddr) { + public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize, String namesrvAddr) { RocketMQSink rocketMQSink = new RocketMQSink(); rocketMQSink.setTopic(topic); rocketMQSink.setTags(tags); diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java index 7efd6d3a..59f7464e 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java @@ -197,7 +197,7 @@ public String toJson() { Iterator i$ = this.metaDataFields.iterator(); while (i$.hasNext()) { - MetaDataField field = (MetaDataField)i$.next(); + MetaDataField field = (MetaDataField) i$.next(); jsonArray.add(field.toJson()); } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java index 3deec903..b4f04d3c 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java @@ -164,7 +164,7 @@ public static MetaData createMetaDate(Object object, Map paras) } public static String getTableName(Class clazz) { - TableClassName tableClass = (TableClassName)clazz.getAnnotation(TableClassName.class); + TableClassName tableClass = (TableClassName) clazz.getAnnotation(TableClassName.class); if (tableClass != null) { String className = tableClass.value(); if (StringUtil.isNotEmpty(className)) { diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java index 9ec882ba..5a035ae9 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java @@ -116,7 +116,7 @@ public String toJson() { JSONObject jsonObject = new JSONObject(); jsonObject.put("fieldName", fieldName); if (dataType == null) { - dataType = (DataType)new StringDataType(); + dataType = (DataType) new StringDataType(); } jsonObject.put("dataType", dataType.toJson()); jsonObject.put("isRequired", isRequired); @@ -134,8 +134,8 @@ public void toObject(String jsonString) { this.isPrimary = jsonObject.getBoolean("isPrimary"); } - public static DataType getDataTypeByStr(String dataType) { - DataType dt = null; + public static DataType getDataTypeByStr(String dataType) { + DataType dt = null; if ("String".equals(dataType)) { dt = new StringDataType(); } else if ("long".equals(dataType)) { @@ -152,22 +152,20 @@ public static DataType getDataTypeByStr(String dataType) { return dt; } - public static String getDataTypeStrByType(DataType dataType) { - String dataTypestr = ""; - if (StringDataType.class.isInstance(dataType)) { - dataTypestr = "String"; - } else if (LongDataType.class.isInstance(dataType)) { - dataTypestr = "long"; - } else if (IntDataType.class.isInstance(dataType)) { - dataTypestr = "int"; - } else if (FloatDataType.class.isInstance(dataType)) { - dataTypestr = "float"; - } else if (Boolean.class.isInstance(dataType)) { - dataTypestr = "boolean"; + public static String getDataTypeStrByType(DataType dataType) { + String dataTypeStr = ""; + if (dataType instanceof StringDataType) { + dataTypeStr = "String"; + } else if (dataType instanceof LongDataType) { + dataTypeStr = "long"; + } else if (dataType instanceof IntDataType) { + dataTypeStr = "int"; + } else if (dataType instanceof FloatDataType) { + dataTypeStr = "float"; } else { - dataTypestr = "String"; + dataTypeStr = "String"; } - return dataTypestr; + return dataTypeStr; } } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java index 5cbac0f8..a715ca3d 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java @@ -82,13 +82,12 @@ protected IMessage doProcess(IMessage message, AbstractContext context) { */ if (openMockChannel()) { if (mockSink != null) { - mockSink.batchAdd(message.deepCopy()); + mockSink.batchAdd(message); return message; } return message; } - sink.batchAdd(message.deepCopy()); - + sink.batchAdd(message); return message; }