Merge pull request apache#81 from cw68ster/base-line
add batch stream features and cycle schedule
yuanxiaodong authored Oct 18, 2021
2 parents 93db774 + 32c0b8b commit 905aabc
Showing 85 changed files with 5,418 additions and 116 deletions.
9 changes: 8 additions & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
<module>rocketmq-streams-checkpoint</module>

<module>rocketmq-streams-connectors</module>
</modules>

<properties>
@@ -89,6 +89,8 @@
<scala-library.version>2.12.4</scala-library.version>
<logback-core.version>1.2.3</logback-core.version>
<minio.version>3.0.10</minio.version>
<rocksdbjni.version>6.6.4</rocksdbjni.version>

</properties>


@@ -283,6 +285,11 @@
<artifactId>rocketmq-streams-channel-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-connectors</artifactId>
<version>${project.version}</version>
</dependency>

<!-- ================================================= -->
<!-- rocketmq library -->
New file: CycleSplit.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.db;

import java.io.Serializable;

/**
 * @description A {@link DynamicMultipleDBSplit} whose queue id additionally carries a cycle
 * period, so the same logic table and suffix can be scheduled once per cycle.
 */
public class CycleSplit extends DynamicMultipleDBSplit implements Serializable {

private static final long serialVersionUID = 4309494143340650195L;
String cyclePeriod;

public CycleSplit(){

}

public CycleSplit(String version){
this.cyclePeriod = version;
}

@Override
public String getQueueId() {
return String.join("_", logicTableName, suffix, cyclePeriod);
}

public String getCyclePeriod() {
return cyclePeriod;
}

public void setCyclePeriod(String cyclePeriod) {
this.cyclePeriod = cyclePeriod;
}

@Override
public String toString() {
return "CycleSplit{" +
"cyclePeriod='" + cyclePeriod + '\'' +
", suffix='" + suffix + '\'' +
", logicTableName='" + logicTableName + '\'' +
'}';
}
}
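For reference, a minimal usage sketch of the new split type (class and field names are taken from the diff above; the concrete values are hypothetical, not from the commit). It shows how getQueueId() joins the inherited logicTableName and suffix with the cyclePeriod:

import org.apache.rocketmq.streams.db.CycleSplit;

// Illustrative sketch only, not part of the commit.
public class CycleSplitExample {
    public static void main(String[] args) {
        CycleSplit split = new CycleSplit("202110180000");  // hypothetical cycle period
        split.setLogicTableName("user_event");              // hypothetical logic table name
        split.setSuffix("0001");                            // hypothetical physical-table suffix
        // getQueueId() joins the three parts with "_":
        System.out.println(split.getQueueId());             // user_event_0001_202110180000
    }
}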
New file: DynamicMultipleDBSplit.java
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.db;

import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;

/**
 * @description A split that addresses one physical table of a dynamically sharded database
 * source; the table (queue) name is composed as logicTableName + "_" + suffix.
 */
public class DynamicMultipleDBSplit extends BasedConfigurable implements ISplit<DynamicMultipleDBSplit, String> {

String suffix;
String logicTableName;

public DynamicMultipleDBSplit() {
}

public DynamicMultipleDBSplit(String suffix, String logicTableName) {
this.suffix = suffix;
this.logicTableName = logicTableName;
}

public String getSuffix() {
return suffix;
}

public void setSuffix(String suffix) {
this.suffix = suffix;
}

public String getLogicTableName() {
return logicTableName;
}

public void setLogicTableName(String logicTableName) {
this.logicTableName = logicTableName;
}

@Override
public String getQueueId() {
return logicTableName + "_" + suffix;
}

@Override
public String getPlusQueueId() {
throw new RuntimeException("unsupported getPlusQueueId!");
}

@Override
public String getQueue() {
return logicTableName + "_" + suffix;
}

@Override
public int compareTo(DynamicMultipleDBSplit o) {
return getQueue().compareTo(o.getQueue());
}

@Override
public String toString() {
return "DynamicMultipleDBSplit{" +
"logicTableName='" + logicTableName + '\'' +
", suffix='" + suffix + '\'' +
'}';
}
}
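Similarly, a short sketch of how a DynamicMultipleDBSplit addresses one physical table (hypothetical values; not part of the commit):

import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;

// Illustrative sketch only: the split's queue is the physical table logicTableName + "_" + suffix.
public class DynamicMultipleDBSplitExample {
    public static void main(String[] args) {
        DynamicMultipleDBSplit split = new DynamicMultipleDBSplit("0001", "user_event"); // suffix, logic table
        System.out.println(split.getQueue());    // user_event_0001
        System.out.println(split.getQueueId());  // user_event_0001
    }
}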
Changed file: AbstractMultiTableSink.java
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.db.sink;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,21 +28,34 @@
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;

public abstract class AbstractMultiTableSink extends DBSink {
protected transient ConcurrentHashMap<String, DBSink> tableSinks = new ConcurrentHashMap();
public abstract class AbstractMultiTableSink extends EnhanceDBSink {

protected transient ConcurrentHashMap<String, EnhanceDBSink> tableSinks = new ConcurrentHashMap<>();
protected transient AtomicLong messageCount = new AtomicLong(0);
protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;

public AbstractMultiTableSink(){
}

public AbstractMultiTableSink(String url, String userName, String password) {
this.url = url;
this.userName = userName;
this.password = password;
}

@Override
protected boolean initConfigurable(){
Iterator<EnhanceDBSink> it = tableSinks.values().iterator();
while(it.hasNext()){
it.next().initConfigurable();
}
return true;
}

@Override
public boolean batchAdd(IMessage message, ISplit split) {

DBSink sink = getOrCreateDBSink(split.getQueueId());
EnhanceDBSink sink = getOrCreateDBSink(split.getQueueId());
boolean success = sink.batchAdd(message, split);
long count = messageCount.incrementAndGet();
if (count >= getBatchSize()) {
@@ -69,50 +83,56 @@ public boolean flush(Set<String> splitIds) {
return true;
}
for (String splitId : splitIds) {
DBSink sink = getOrCreateDBSink(splitId);
EnhanceDBSink sink = getOrCreateDBSink(splitId);
sink.flush();
}
return true;
}

@Override
public boolean flush() {
for (DBSink dbSink : tableSinks.values()) {
for (EnhanceDBSink dbSink : tableSinks.values()) {
dbSink.flush();
}
return true;
}

@Override
public boolean checkpoint(Set<String> splitIds) {
return flush();
}

@Override
public void openAutoFlush() {
for (DBSink dbSink : tableSinks.values()) {
for (EnhanceDBSink dbSink : tableSinks.values()) {
dbSink.openAutoFlush();
}
}

@Override
public void closeAutoFlush() {
for (DBSink dbSink : tableSinks.values()) {
for (EnhanceDBSink dbSink : tableSinks.values()) {
dbSink.closeAutoFlush();
}
}

protected DBSink getOrCreateDBSink(String splitId) {
DBSink sink = this.tableSinks.get(splitId);
protected EnhanceDBSink getOrCreateDBSink(String splitId) {
EnhanceDBSink sink = this.tableSinks.get(splitId);
if (sink != null) {
return sink;
}
sink = new DBSink();
sink = new EnhanceDBSink();
sink.setUrl(url);
sink.setPassword(password);
sink.setUserName(userName);
sink.setTableName(createTableName(splitId));
sink.openAutoFlush();
sink.setBatchSize(batchSize);
sink.setJdbcDriver(this.jdbcDriver);
sink.setMessageCache(new SingleDBSinkCache(sink));
sink.setMultiple(true);
sink.init();
DBSink existDBSink = this.tableSinks.putIfAbsent(splitId, sink);
sink.openAutoFlush();
EnhanceDBSink existDBSink = this.tableSinks.putIfAbsent(splitId, sink);
if (existDBSink != null) {
return existDBSink;
}
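The diff above shows getOrCreateDBSink() lazily building one EnhanceDBSink per split id and registering it via ConcurrentHashMap.putIfAbsent. A simplified, self-contained sketch of that pattern (toy classes and a hypothetical table-name rule, not the commit's code):

import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration of the lazy "one sink per split id" registration used above:
// build a candidate, then let putIfAbsent decide whether this thread's instance or a
// concurrently registered one wins.
public class PerSplitSinkSketch {
    static class ToySink {
        final String tableName;
        ToySink(String tableName) { this.tableName = tableName; }
    }

    private final ConcurrentHashMap<String, ToySink> sinks = new ConcurrentHashMap<>();

    ToySink getOrCreate(String splitId) {
        ToySink existing = sinks.get(splitId);
        if (existing != null) {
            return existing;
        }
        ToySink candidate = new ToySink("user_event_" + splitId); // hypothetical table-name rule
        ToySink raced = sinks.putIfAbsent(splitId, candidate);
        return raced != null ? raced : candidate;                 // keep whichever registered first
    }

    public static void main(String[] args) {
        PerSplitSinkSketch sketch = new PerSplitSinkSketch();
        System.out.println(sketch.getOrCreate("0001").tableName);                     // user_event_0001
        System.out.println(sketch.getOrCreate("0001") == sketch.getOrCreate("0001")); // true
    }
}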
Changed file: DBSink.java
@@ -25,6 +25,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.IChannel;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -34,15 +37,20 @@
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.metadata.MetaData;
import org.apache.rocketmq.streams.common.metadata.MetaDataField;
import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
import org.apache.rocketmq.streams.common.utils.SQLUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.DriverBuilder;
import org.apache.rocketmq.streams.db.driver.JDBCDriver;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;

/**
 * Mainly used for writing to a database. The input can be either an insert/replace SQL template
 * or a MetaData object; choose one of the two. Both support batch inserts to improve throughput.
 * SQL template: insert into table(column1,column2,column3) values('#{var1}',#{var2},'#{var3}').
 * MetaData describes each field's type and whether it is required. In SQL-template mode the
 * system concatenates a batch (batchSize) of rows into one large SQL statement; in MetaData mode
 * the same kind of large SQL statement is built from the field descriptions.
*/
public class DBSink extends AbstractSink {

static final Log logger = LogFactory.getLog(DBSink.class);

public static final String SQL_MODE_DEFAULT = "default";
public static final String SQL_MODE_REPLACE = "replace";
public static final String SQL_MODE_IGNORE = "ignore";
@@ -68,6 +76,9 @@ public class DBSink extends AbstractSink {

protected transient IMessageCache<String> sqlCache;//cache sql, batch submit sql

boolean isMultiple = false; // whether this sink writes to multiple (sub) tables


/**
 * The DB connection string is usually identified by a name, and a name prefix can be configured. If the
 * value is empty it defaults to this object's name; if the name is empty too, it falls back to the simple class name.
*
@@ -323,4 +334,51 @@ public void setOpenSqlCache(boolean openSqlCache) {
this.openSqlCache = openSqlCache;
}

public boolean isMultiple() {
return isMultiple;
}

public void setMultiple(boolean multiple) {
isMultiple = multiple;
}

/**
 * Derive the logic table name from a real (physical) table name; by default the pattern is
 * logicTableName_suffix, so everything before the last '_' is returned.
 * @param realTableName the physical sub-table name
 * @return the logic table name
*/
private final String subStrLogicTableName(String realTableName){
int len = realTableName.lastIndexOf("_");
String logicTableName = realTableName.substring(0, len);
return logicTableName;
}

/**
 * Build the CREATE TABLE statement for the target table by copying the DDL of the source
 * (logic) table and substituting the table name.
 * @param sourceTableName the existing logic table whose DDL is copied
 * @param targetTableName the sub table to be created
 * @return the rewritten CREATE TABLE SQL
 */
private final String getCreateTableSqlFromOther(String sourceTableName, String targetTableName){

String createTableSql = MetaDataUtils.getCreateTableSqlByTableName(url, userName, password, sourceTableName);
if(createTableSql == null){
String errMsg = String.format("source table %s does not exist. The multi-table DB sink depends on the logic table's metadata to auto-create sub tables.", sourceTableName);
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
createTableSql = createTableSql.replace(sourceTableName, targetTableName);
logger.info(String.format("createTableSql is %s", createTableSql));
return createTableSql;

}

/**
 * Execute a CREATE TABLE statement; used in the multi-table (multi-sink) scenario.
 * @param createTableSql the CREATE TABLE statement to execute
 */
private final void createTable(String createTableSql){
ORMUtil.executeSQL(url, userName, password, createTableSql, null);
}

}
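For reference, a minimal sketch of configuring a standalone DBSink with the same setters that getOrCreateDBSink() applies in the multi-table sink above; the connection values are placeholders, not taken from the commit:

import org.apache.rocketmq.streams.db.sink.DBSink;

// Illustrative sketch only, not part of the commit.
public class DBSinkConfigSketch {
    public static void main(String[] args) {
        DBSink sink = new DBSink();
        sink.setUrl("jdbc:mysql://127.0.0.1:3306/demo"); // hypothetical JDBC URL
        sink.setUserName("demo_user");                   // placeholder credentials
        sink.setPassword("demo_pass");
        sink.setTableName("user_event_0001");            // hypothetical physical table
        sink.setBatchSize(1000);                         // rows merged into one large INSERT per flush
        sink.setMultiple(false);                         // single fixed table; the multi-table sink sets true
        sink.init();
        sink.openAutoFlush();                            // flush the batched rows periodically
    }
}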