diff --git a/rocketmq-streams-channel-rocketmq/pom.xml b/rocketmq-streams-channel-rocketmq/pom.xml
index c8d2cb02..83983e26 100644
--- a/rocketmq-streams-channel-rocketmq/pom.xml
+++ b/rocketmq-streams-channel-rocketmq/pom.xml
@@ -56,6 +56,11 @@
org.apache.rocketmq
rocketmq-acl
+
+ org.apache.rocketmq
+ schema-registry-client
+ 0.0.4-SNAPSHOT
+
junit
junit
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/AvroSchemaWrapper.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/AvroSchemaWrapper.java
new file mode 100644
index 00000000..a449b8cf
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/AvroSchemaWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+
+public class AvroSchemaWrapper implements SchemaWrapper {
+
+ private static final Log LOG = LogFactory.getLog(AvroSchemaWrapper.class);
+
+ private SpecificAvroSerde avroSerde;
+
+ private SchemaConfig schemaConfig;
+
+ public AvroSchemaWrapper(SchemaConfig schemaConfig) {
+ this.schemaConfig = schemaConfig;
+ try {
+ if (schemaConfig.getSchemaRegistryUrl() == null) {
+ avroSerde = new SpecificAvroSerde();
+ Map configs = new HashMap<>();
+ configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
+ configs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+ avroSerde.configure(configs);
+ } else {
+ SchemaRegistryClient schemaRegistryClient =
+ SchemaRegistryClientFactory.newClient(schemaConfig.getSchemaRegistryUrl(), null);
+ avroSerde = new SpecificAvroSerde(schemaRegistryClient);
+ Map configs = new HashMap<>();
+ configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
+ avroSerde.configure(configs);
+ }
+ } catch (Exception e) {
+ LOG.error("init AvroSchemaWrapper failed, " + schemaConfig.toString(), e);
+ throw new RuntimeException("init AvroSchemaWrapper failed");
+ }
+ }
+
+ @Override
+ public Object deserialize(MessageExt messageExt) {
+ String subject = messageExt.getTopic();
+ byte[] msgBody = messageExt.getBody();
+ return avroSerde.deserializer().deserialize(subject, msgBody);
+ }
+
+ @Override
+ public SchemaConfig getConfig() {
+ return schemaConfig;
+ }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/JsonSchemaWrapper.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/JsonSchemaWrapper.java
new file mode 100644
index 00000000..39081c0d
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/JsonSchemaWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+
+/**
+ * @author huitong
+ */
+public class JsonSchemaWrapper implements SchemaWrapper {
+
+ private static final Log LOG = LogFactory.getLog(JsonSchemaWrapper.class);
+
+ private JsonSerde jsonSerde;
+
+ private SchemaConfig schemaConfig;
+
+ public JsonSchemaWrapper(SchemaConfig schemaConfig) {
+ this.schemaConfig = schemaConfig;
+ try {
+ if (schemaConfig.getSchemaRegistryUrl() == null) {
+ jsonSerde = new JsonSerde();
+ Map configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
+ configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+ jsonSerde.configure(configs);
+ } else {
+ SchemaRegistryClient schemaRegistryClient =
+ SchemaRegistryClientFactory.newClient(schemaConfig.getSchemaRegistryUrl(), null);
+ jsonSerde = new JsonSerde(schemaRegistryClient);
+ Map configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
+ jsonSerde.configure(configs);
+ }
+ } catch (Exception e) {
+ LOG.error("init AvroSchemaWrapper failed, " + schemaConfig.toString(), e);
+ throw new RuntimeException("init AvroSchemaWrapper failed");
+ }
+ }
+
+ @Override
+ public Object deserialize(MessageExt messageExt) {
+
+ String subject = messageExt.getTopic();
+ byte[] msgBody = messageExt.getBody();
+ return jsonSerde.deserializer().deserialize(subject, msgBody);
+ }
+
+ @Override
+ public SchemaConfig getConfig() {
+ return schemaConfig;
+ }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaConfig.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaConfig.java
new file mode 100644
index 00000000..831d1e46
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.StringUtils;
+
+public class SchemaConfig implements Serializable {
+
+ /**
+ * json, avro, protobuf, kyro, thrift, etc.
+ */
+ private String schemaType;
+
+ /**
+ * allowed to be null, if null means skip schema registry
+ */
+ private String schemaRegistryUrl;
+
+ /**
+ * deserialize target class
+ */
+ private String className;
+
+ public SchemaConfig() {
+ }
+
+ public SchemaConfig(SchemaType schemaType, Class targetClass) {
+ this.schemaType = schemaType.name();
+ this.className = targetClass.getName();
+ }
+
+ public SchemaConfig(SchemaType schemaType, Class targetClass, String schemaRegistryUrl) {
+ this.schemaType = schemaType.name();
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ this.className = targetClass.getName();
+ }
+
+ public String getSchemaType() {
+ return schemaType;
+ }
+
+ public void setSchemaType(String schemaType) {
+ this.schemaType = schemaType;
+ }
+
+ public String getSchemaRegistryUrl() {
+ return schemaRegistryUrl;
+ }
+
+ public void setSchemaRegistryUrl(String schemaRegistryUrl) {
+ this.schemaRegistryUrl = schemaRegistryUrl;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ @Override
+ public String toString() {
+ return "SchemaConfig{" +
+ "schemaType='" + schemaType + '\'' +
+ ", schemaRegistryUrl='" + schemaRegistryUrl + '\'' +
+ ", className='" + className + '\'' +
+ '}';
+ }
+
+ public boolean equals(SchemaConfig configToCompare) {
+ if (!StringUtils.equals(getSchemaType(), configToCompare.getSchemaType())) {
+ return false;
+ }
+ if (!StringUtils.equals(getClassName(), configToCompare.getClassName())) {
+ return false;
+ }
+ if (!StringUtils.equals(getSchemaRegistryUrl(), configToCompare.getSchemaRegistryUrl())) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaType.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaType.java
new file mode 100644
index 00000000..7d7b714e
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+public enum SchemaType {
+
+ /**
+ * Avro type
+ */
+ AVRO("AVRO"),
+ /**
+ * Protobuf type
+ */
+ PROTOBUF("PROTOBUF"),
+ /**
+ * Thrift type
+ */
+ THRIFT("THRIFT"),
+ /**
+ * Json type
+ */
+ JSON("JSON"),
+ /**
+ * Text type for reserved
+ */
+ TEXT("TEXT"),
+ /**
+ * Binlog type for reserved
+ */
+ BINLOG("BINLOG");
+
+ private final String value;
+
+ SchemaType(final String value) {
+ this.value = value;
+ }
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapper.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapper.java
new file mode 100644
index 00000000..c130505c
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+public interface SchemaWrapper {
+
+ Object deserialize(MessageExt messageExt);
+
+ SchemaConfig getConfig();
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapperFactory.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapperFactory.java
new file mode 100644
index 00000000..b50cfa92
--- /dev/null
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/schema/SchemaWrapperFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaWrapperFactory {
+
+ private static Map schemaWrapperCache = new HashMap<>();
+
+ public static SchemaWrapper createIfAbsent(String topic, SchemaConfig schemaConfig) {
+ SchemaWrapper schemaWrapper = schemaWrapperCache.get(topic);
+ if (schemaWrapper != null) {
+ // if change config, also need to create a new one
+ if (schemaConfig.equals(schemaWrapper.getConfig())) {
+ return schemaWrapper;
+ }
+ }
+ if (SchemaType.JSON.name().equals(schemaConfig.getSchemaType())) {
+ JsonSchemaWrapper jsonSchemaWrapper = new JsonSchemaWrapper(schemaConfig);
+ schemaWrapperCache.put(topic, jsonSchemaWrapper);
+ return jsonSchemaWrapper;
+ } else if (SchemaType.AVRO.name().equals(schemaConfig.getSchemaType())) {
+ AvroSchemaWrapper avroSchemaWrapper = new AvroSchemaWrapper(schemaConfig);
+ schemaWrapperCache.put(topic, avroSchemaWrapper);
+ return avroSchemaWrapper;
+ } else {
+ throw new RuntimeException("scheme type " + schemaConfig.getSchemaType() + " not supported");
+ }
+ }
+
+}
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 190b06f5..b74d932f 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -50,23 +50,29 @@
import org.apache.rocketmq.streams.common.channel.source.AbstractSupportShuffleSource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
+import org.apache.rocketmq.streams.schema.SchemaConfig;
+import org.apache.rocketmq.streams.schema.SchemaWrapper;
+import org.apache.rocketmq.streams.schema.SchemaWrapperFactory;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
public class RocketMQSource extends AbstractSupportShuffleSource {
protected static final Log LOG = LogFactory.getLog(RocketMQSource.class);
- private static final String STRATEGY_AVERAGE = "average";
-
@ENVDependence
private String tags = SubscriptionData.SUB_ALL;
private int userPullThreadNum = 1;
private long pullTimeout;
private long commitInternalMs = 1000;
- private String strategyName;
- private transient ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
+
+ private SchemaConfig schemaConfig;
+ /**
+ * 默认从哪里消费,不会被持久化。不设置默认从尾部消费
+ */
+ private transient ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
private RPCHook rpcHook;
private transient DefaultLitePullConsumer pullConsumer;
private transient ExecutorService executorService;
@@ -77,15 +83,10 @@ public RocketMQSource() {
}
public RocketMQSource(String topic, String tags, String groupName, String namesrvAddr) {
- this(topic, tags, groupName, namesrvAddr, STRATEGY_AVERAGE);
- }
-
- public RocketMQSource(String topic, String tags, String groupName, String namesrvAddr, String strategyName) {
this.topic = topic;
this.tags = tags;
this.groupName = groupName;
this.namesrvAddr = namesrvAddr;
- this.strategyName = strategyName;
}
@Override
@@ -190,20 +191,20 @@ private Map getMessageQueueAllocationResult(DefaultMQAdmin
try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
- Iterator var5 = consumerConnection.getConnectionSet().iterator();
+ Iterator iterator = consumerConnection.getConnectionSet().iterator();
- while (var5.hasNext()) {
- Connection connection = (Connection) var5.next();
+ while (iterator.hasNext()) {
+ Connection connection = (Connection) iterator.next();
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
- Iterator var9 = consumerRunningInfo.getMqTable().keySet().iterator();
+ Iterator iterator1 = consumerRunningInfo.getMqTable().keySet().iterator();
- while (var9.hasNext()) {
- MessageQueue messageQueue = (MessageQueue) var9.next();
+ while (iterator1.hasNext()) {
+ MessageQueue messageQueue = (MessageQueue) iterator1.next();
results.put(messageQueue, clientId.split("@")[1]);
}
}
- } catch (Exception var11) {
+ } catch (Exception ex) {
;
}
@@ -334,9 +335,9 @@ public void run() {
List msgs = pullConsumer.poll(pullTimeout);
- int i = 0;
- for (MessageExt msg : msgs) {
- JSONObject jsonObject = create(msg.getBody(), msg.getProperties());
+ int i = 0;
+ for (MessageExt msg : msgs) {
+ JSONObject jsonObject = createFromMsg(msg);
String topic = msg.getTopic();
int queueId = msg.getQueueId();
@@ -345,9 +346,10 @@ public void run() {
String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
- String offset = msg.getQueueOffset() + "";
- org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, unionQueueId, offset, false);
- message.getHeader().setOffsetIsLong(true);
+ String offset = msg.getQueueOffset() + "";
+ org.apache.rocketmq.streams.common.context.Message message =
+ createMessage(jsonObject, unionQueueId, offset, false);
+ message.getHeader().setOffsetIsLong(true);
if (i == msgs.size() - 1) {
message.getHeader().setNeedFlush(true);
@@ -380,7 +382,26 @@ public void shutdown() {
}
}
- private void newRebalance(Set allQueueInLastRebalance) {
+ /**
+ * 从 rocketmq 消息转为可被后续环节处理的jsonObject
+ * @param messageExt
+ * @return
+ */
+ public JSONObject createFromMsg(MessageExt messageExt) {
+ if (schemaConfig != null) {
+ try {
+ SchemaWrapper schemaWrapper =
+ SchemaWrapperFactory.createIfAbsent(messageExt.getTopic(), schemaConfig);
+ Object pojo = schemaWrapper.deserialize(messageExt);
+ return new UserDefinedMessage(pojo);
+ } catch (Exception ex) {
+ LOG.error("deserialize with schema failed, try to deserialize directly to json", ex);
+ }
+ }
+ return create(messageExt.getBody(), messageExt.getProperties());
+ }
+
+ private void newRebalance(Set allQueueInLastRebalance){
Set temp = new HashSet<>();
for (MessageQueue queue : allQueueInLastRebalance) {
String unionQueueId = RocketMQMessageQueue.getQueueId(queue);
@@ -406,14 +427,6 @@ public void setPullTimeout(Long pullTimeout) {
this.pullTimeout = pullTimeout;
}
- public String getStrategyName() {
- return strategyName;
- }
-
- public void setStrategyName(String strategyName) {
- this.strategyName = strategyName;
- }
-
public RPCHook getRpcHook() {
return rpcHook;
}
@@ -438,5 +451,11 @@ public void setCommitInternalMs(long commitInternalMs) {
this.commitInternalMs = commitInternalMs;
}
+ public SchemaConfig getSchemaConfig() {
+ return schemaConfig;
+ }
+ public void setSchemaConfig(SchemaConfig schemaConfig) {
+ this.schemaConfig = schemaConfig;
+ }
}
\ No newline at end of file
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 35668b95..3c2e3d12 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -46,6 +46,7 @@
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.mqtt.source.PahoSource;
+import org.apache.rocketmq.streams.schema.SchemaConfig;
import org.apache.rocketmq.streams.source.RocketMQSource;
public class DataStreamSource {
@@ -114,7 +115,23 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool
return new DataStream(this.mainPipelineBuilder, null);
}
+ public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress, SchemaConfig schemaConfig) {
+ return fromRocketmq(topic, groupName, "*", namesrvAddress, null, schemaConfig);
+ }
+ public DataStream fromRocketmq(String topic, String groupName, String tags, String namesrvAddress, RPCHook rpcHook,
+ SchemaConfig schemaConfig) {
+ RocketMQSource rocketMQSource = new RocketMQSource();
+ rocketMQSource.setTopic(topic);
+ rocketMQSource.setTags(tags);
+ rocketMQSource.setGroupName(groupName);
+ rocketMQSource.setJsonData(false);
+ rocketMQSource.setNamesrvAddr(namesrvAddress);
+ rocketMQSource.setRpcHook(rpcHook);
+ rocketMQSource.setSchemaConfig(schemaConfig);
+ this.mainPipelineBuilder.setSource(rocketMQSource);
+ return new DataStream(this.mainPipelineBuilder, null);
+ }
public DataStream fromCollection(JSONObject... elements) {
CollectionSource source = new CollectionSource();
diff --git a/rocketmq-streams-commons/pom.xml b/rocketmq-streams-commons/pom.xml
index 2881878d..76e5bc46 100755
--- a/rocketmq-streams-commons/pom.xml
+++ b/rocketmq-streams-commons/pom.xml
@@ -117,6 +117,12 @@
de.ruedigermoeller
fst
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
com.esotericsoftware
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
index ef6f8022..77a45e14 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
@@ -16,22 +16,10 @@
*/
package org.apache.rocketmq.streams.common.cache.compress.impl;
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
import org.apache.rocketmq.streams.common.cache.compress.ByteArray;
import org.apache.rocketmq.streams.common.cache.compress.ByteStore;
import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
import org.apache.rocketmq.streams.common.utils.NumberUtils;
-import org.junit.Assert;
/**
* 支持key是string,value是int的场景,支持size不大于10000000.只支持int,long,boolean,string类型
@@ -81,105 +69,4 @@ public IntValueKV(int capacity) {
super(capacity, true);
}
- public static void main(String[] args) throws Exception {
- IntValueKV cache = new IntValueKV(5);
- cache.put("A", 0);
- cache.put("B", 1);
- cache.put("C", 2);
- cache.put("D", 3);
- cache.put("E", 4);
- cache.put("F", 5);
- cache.put("G", 6);
-
- System.exit(0);
-
- int size = 10000000;
- int sampleSize = 1024;
- int dataSize = 3974534;
- IntValueKV compressByteMap = new IntValueKV(size);
- Map dataMap = new HashMap<>(size);
- Set whiteSet = new HashSet<>(1024);
- Map sample1Map = new HashMap<>(1024);
- Map sample2Map = new HashMap<>(1024);
- //init data
- Random random = new Random();
- while (true) {
- if (whiteSet.size() >= sampleSize) {
- break;
- }
- int seed = random.nextInt(dataSize);
- if (!whiteSet.contains(seed)) {
- whiteSet.add(seed);
- }
- }
-
- long originWriteCounter = 0;
- long compressWriteCounter = 0;
- String path = "/Users/arthur/Downloads/";
- String blackFile = "2020-11-11-14-08-32_EXPORT_CSV_16231630_392_0.csv";
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path + blackFile)))) {
- reader.readLine();
- String line = null;
- int counter = 0;
- while ((line = reader.readLine()) != null) {
- line = line.replaceAll("\"", "");
- String[] parts = line.split(",", 2);
- long begin = System.nanoTime();
- dataMap.put(parts[1].trim(), Integer.parseInt(parts[0]));
- originWriteCounter += (System.nanoTime() - begin);
- if (whiteSet.contains(counter++)) {
- sample1Map.put(parts[1].trim(), Integer.parseInt(parts[0]));
- }
- }
- }
- for (int i = 0; i < sampleSize * 100; i++) {
- sample2Map.put(UUID.randomUUID().toString(), -1);
- }
- System.out.println("sample1 size:\t" + sample1Map.size());
- System.out.println("sample2 size:\t" + sample2Map.size());
- //System.out.println(
- // "origin map size(computed by third party):\t" + RamUsageEstimator.humanSizeOf(dataMap) + "\tline's\t"
- // + dataMap.size());
- //
- Iterator> iterator = dataMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry entry = iterator.next();
- long begin = System.nanoTime();
- compressByteMap.put(entry.getKey(), entry.getValue());
- compressWriteCounter += (System.nanoTime() - begin);
- }
- //System.out.println(
- // "compressed map size(computed by third party):\t" + RamUsageEstimator.humanSizeOf(compressByteMap)
- // + "\tline's\t"
- // + compressByteMap.size);
- System.out.println("compressed map size(computed by it's self)\t" + compressByteMap.calMemory() + " MB");
- System.out.println(
- "origin write cost:\t" + originWriteCounter / 1000 + "\tcompress write cost:\t"
- + compressWriteCounter / 1000);
- //
- long originSearchCounter = 0;
- long compressCounter = 0;
- Iterator> iterator1 = sample1Map.entrySet().iterator();
- Iterator> iterator2 = sample2Map.entrySet().iterator();
- while (iterator1.hasNext() && iterator2.hasNext()) {
- Entry entry1 = iterator1.next();
- String key1 = entry1.getKey();
- Integer value1 = entry1.getValue();
- Entry entry2 = iterator2.next();
- String key2 = entry2.getKey();
- Integer value2 = entry2.getValue();
- long begin = System.nanoTime();
- Assert.assertEquals(value1, dataMap.get(key1));
- Assert.assertNotEquals(value2, dataMap.get(key2));
- originSearchCounter += (System.nanoTime() - begin);
- begin = System.nanoTime();
- Assert.assertEquals(value1, compressByteMap.get(key1));
- Assert.assertNotEquals(value2, compressByteMap.get(key2));
- compressCounter += (System.nanoTime() - begin);
- }
- System.out.println(
- "origin search cost:\t" + originSearchCounter / 1000 + "\tcompress search cost:\t"
- + compressCounter / 1000);
- }
-
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/StringValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/StringValueKV.java
index 814c8fb0..b46ec20d 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/StringValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/StringValueKV.java
@@ -17,11 +17,8 @@
package org.apache.rocketmq.streams.common.cache.compress.impl;
import java.io.UnsupportedEncodingException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.rocketmq.streams.common.cache.compress.ByteArrayValueKV;
import org.apache.rocketmq.streams.common.cache.compress.ICacheKV;
-import org.junit.Assert;
/**
* 支持key是string,value是int的场景,支持size不大于10000000.只支持int,long,boolean,string类型 只能一次行load,不能进行更新
@@ -75,62 +72,4 @@ public int calMemory() {
return values.calMemory();
}
- public static void main(String[] args) throws InterruptedException {
- int count = 10000000;
-
- StringValueKV map = new StringValueKV(count);
- long start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- map.put("sdfsdfdds" + i, i + "");
- }
- System.out.println("fixed value size: " + map.getSize());
- //System.out.println("fixed value memory: " + RamUsageEstimator.humanSizeOf(map));
- System.out.println("fixed value write cost: " + (System.currentTimeMillis() - start));
-
- start = System.currentTimeMillis();
- SplitCache splitCache = new SplitCache(count);
- for (int i = 0; i < count; i++) {
- splitCache.put("sdfsdfdds" + i, i + "");
- }
- System.out.println("free value size: " + splitCache.getSize());
- // System.out.println("free value memory: " + RamUsageEstimator.humanSizeOf(splitCache));
- System.out.println("free value cost: " + (System.currentTimeMillis() - start));
-
- start = System.currentTimeMillis();
- Map originMap = new HashMap<>(count);
- for (int i = 0; i < count; i++) {
- originMap.put("sdfsdfdds" + i, i + "");
- }
- System.out.println("origin map size: " + originMap.size());
- // System.out.println("origin map memory: " + RamUsageEstimator.humanSizeOf(originMap));
- System.out.println("origin map cost: " + (System.currentTimeMillis() - start));
-
- start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- String v = map.get("sdfsdfdds" + i);
- Assert.assertEquals(v, i + "");
- v = map.get("asdfasdf" + i);
- Assert.assertNull(v);
- }
- System.out.println("fix value read cost: " + (System.currentTimeMillis() - start));
-
- start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- String v = splitCache.get("sdfsdfdds" + i);
- Assert.assertEquals(v, i + "");
- v = splitCache.get("asdfasdf" + i);
- Assert.assertNull(v);
- }
- System.out.println("free value read cost: " + (System.currentTimeMillis() - start));
-
- start = System.currentTimeMillis();
- for (int i = 0; i < count; i++) {
- String v = originMap.get("sdfsdfdds" + i);
- Assert.assertEquals(v, i + "");
- v = originMap.get("asdfasdf" + i);
- Assert.assertNull(v);
- }
- System.out.println("origin map read cost: " + (System.currentTimeMillis() - start));
- }
-
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractBatchSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractBatchSource.java
index c50b09bc..d0647c74 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractBatchSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractBatchSource.java
@@ -94,7 +94,7 @@ public AbstractContext doReceiveMessage(JSONObject message) {
@Override
public JSONObject createJson(Object message) {
- if (isJsonData && JSONObject.class.isInstance(message)) {
+ if (isJsonData && message instanceof JSONObject) {
return (JSONObject) message;
}
return super.createJson(message);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
index 2bc7e55f..a0e35353 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java
@@ -20,9 +20,7 @@
import com.alibaba.fastjson.JSONObject;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -254,7 +252,6 @@ public JSONObject createJson(Object message) {
if (!isJsonData) {
jsonObject = new UserDefinedMessage(message);
jsonObject.put(IMessage.DATA_KEY, message);
- jsonObject.put(IMessage.IS_NOT_JSON_MESSAGE, true);
} else {
jsonObject = Message.parseObject(message.toString());
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/IMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/IMessage.java
index 4b6bed8f..0972d154 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/IMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/IMessage.java
@@ -23,19 +23,19 @@ public interface IMessage {
/**
* 对于非json数据在处理前先转化成json结构,key=data;value=实际数据
*/
- public static final String DATA_KEY = "data";
+ String DATA_KEY = "data";
- String SHUFFLE_MESSAGE_FLAG = "_shuffle_msg";//是否是shuffle msg
+ String TYPE_KEY = "type";
/**
- * 是否为json格式
+ * 是否是shuffle msg
*/
- public static final String IS_NOT_JSON_MESSAGE = "isNotJsonMessage";
+ String SHUFFLE_MESSAGE_FLAG = "_shuffle_msg";
/**
* used when trace id lost
*/
- public static final String DEFAULT_MESSAGE_TRACE_ID = "00000000-0000-0000-0000-000000000000";
+ String DEFAULT_MESSAGE_TRACE_ID = "00000000-0000-0000-0000-000000000000";
MessageHeader getHeader();
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
index e11f6d3c..e656e56c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
@@ -23,6 +23,9 @@
public class Message implements IMessage {
+ /**
+ * content of message, can be JsonObject or UserDefinedMessage
+ */
private JSONObject message;
private boolean isJsonMessage = true;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageOffset.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageOffset.java
index 38950f8f..73ebc806 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageOffset.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageOffset.java
@@ -108,7 +108,7 @@ public boolean greateThan(String dstOffset) {
* @return
*/
public static boolean greateThan(String oriOffset, String dstOffset, boolean isOffsetIsLong) {
- if (isOffsetIsLong == false) {
+ if (!isOffsetIsLong) {
return (oriOffset.compareTo(dstOffset) > 0);
}
if (StringUtil.isEmpty(dstOffset)) {
@@ -116,34 +116,12 @@ public static boolean greateThan(String oriOffset, String dstOffset, boolean isO
}
List dstOffsetLayers = new ArrayList<>();
- Long dstMainOffset = Long.valueOf(parseOffset(dstOffset, dstOffsetLayers));
+ long dstMainOffset = Long.parseLong(parseOffset(dstOffset, dstOffsetLayers));
List oriOffsetLayers = new ArrayList<>();
- Long oriMainOffset = Long.valueOf(parseOffset(oriOffset, oriOffsetLayers));
+ long oriMainOffset = Long.parseLong(parseOffset(oriOffset, oriOffsetLayers));
- if (oriMainOffset > dstMainOffset) {
- return true;
- } else if (oriMainOffset <= dstMainOffset) {
- return false;
- }
-
- for (int i = 0; i < oriOffsetLayers.size(); i++) {
- Long origLayerOffset = (i < oriOffsetLayers.size()) ? oriOffsetLayers.get(i) : null;
- Long destLayerOffset = (i < dstOffsetLayers.size()) ? dstOffsetLayers.get(i) : null;
- if (origLayerOffset != null && destLayerOffset != null) {
- if (origLayerOffset > destLayerOffset) {
- return true;
- } else if (origLayerOffset <= destLayerOffset) {
- return false;
- }
- continue;
- }
- if (origLayerOffset != null && destLayerOffset == null) {
- return true;
- }
- break;
- }
- return false;
+ return oriMainOffset > dstMainOffset;
}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/UserDefinedMessage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/UserDefinedMessage.java
index faeedffc..cb83154f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/UserDefinedMessage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/UserDefinedMessage.java
@@ -21,7 +21,6 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.sql.Timestamp;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -39,12 +38,22 @@
public class UserDefinedMessage extends JSONObject implements Serializable {
protected Object messageValue;
- protected Map fieldMap;//如果messageValue是pojo,这里存储字段名和field信息
+ /**
+ * 如果messageValue是pojo,这里存储字段名和field信息
+ */
+ protected Map fieldMap;
+
protected boolean isList = false;
+
protected boolean isBasic = false;
+
protected boolean isMap = false;
+
protected boolean isPojo = false;
- protected List columnNames;//schama的列名,主要针对list场景
+ /**
+ * schema的列名,主要针对list场景
+ */
+ protected List columnNames;
public UserDefinedMessage(Object messageValue) {
this(messageValue, null);
@@ -347,8 +356,10 @@ public void putAll(Map extends String, ?> m) {
@Override
public Set keySet() {
Set set = super.keySet();
- set.addAll(fieldMap.keySet());
- return set;
+ if (!set.isEmpty()) {
+ set.addAll(fieldMap.keySet());
+ }
+ return fieldMap.keySet();
}
@Override
@@ -369,30 +380,17 @@ public Collection