diff --git a/chunjun-connectors/chunjun-connector-rocketmq/pom.xml b/chunjun-connectors/chunjun-connector-rocketmq/pom.xml new file mode 100644 index 0000000000..814346869b --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/pom.xml @@ -0,0 +1,146 @@ + + + + + + chunjun-connectors + com.dtstack.chunjun + 1.12-SNAPSHOT + + 4.0.0 + + flinkx-connector-rocketmq + ChunJun : Connectors : RocketMQ + + + 8 + 8 + 4.9.2 + + + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + + + org.apache.rocketmq + rocketmq-acl + ${rocketmq.version} + + + commons-logging + commons-logging + + + commons-cli + commons-cli + + + + + org.apache.rocketmq + rocketmq-common + ${rocketmq.version} + + + io.netty + netty-tcnative + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.0 + + + package + + shade + + + false + + + org.slf4j:slf4j-api + log4j:log4j + ch.qos.logback:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + maven-antrun-plugin + 1.2 + + + copy-resources + + package + + run + + + + + + + + + + + + + + + + + + + + diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/conf/RocketMQConf.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/conf/RocketMQConf.java new file mode 100644 index 0000000000..33b3c5877f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/conf/RocketMQConf.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.rocketmq.conf; + +import com.dtstack.chunjun.conf.ChunJunCommonConf; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.util.Locale; + +/** @author shitou @date 2022/5/26 * */ +public class RocketMQConf extends ChunJunCommonConf { + + private String topic; + private String nameserverAddress; + private String unitName; + private int heartbeatBrokerInterval = 30000; + // security + private String accessKey; + private String secretKey; + // for aliyun instance + private String accessChannel = "LOCAL"; + + // consumer + private String tag; + private String consumerGroup; + private StartMode mode = StartMode.EARLIEST; + private long startMessageOffset; + private long startMessageTimeStamp = -1L; + private long startTimeMs = -1L; + private long endTimeMs = Long.MAX_VALUE; + private String timeZone; + private String encoding = "UTF-8"; + private String fieldDelimiter = "\u0001"; + private int fetchSize = 32; + /** offset persistent interval for consumer* */ + private int persistConsumerOffsetInterval = 5000; // 5s + + private long partitionDiscoveryIntervalMs = 30000; // 30s + + // producer + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public long getPartitionDiscoveryIntervalMs() { + return partitionDiscoveryIntervalMs; + } + + public void setPartitionDiscoveryIntervalMs(long partitionDiscoveryIntervalMs) { + this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs; + } + + public long getStartMessageTimeStamp() { + return startMessageTimeStamp; + } + + public void setStartMessageTimeStamp(long startMessageTimeStamp) { + this.startMessageTimeStamp = startMessageTimeStamp; + } + + public long getStartTimeMs() { + return startTimeMs; + } + + public void setStartTimeMs(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + + public long getEndTimeMs() { + return endTimeMs; + } + + public void setEndTimeMs(long endTimeMs) { + this.endTimeMs = endTimeMs; + } + + public int getFetchSize() { + return fetchSize; + } + + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + public int getHeartbeatBrokerInterval() { + return heartbeatBrokerInterval; + } + + public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { + this.heartbeatBrokerInterval = heartbeatBrokerInterval; + } + + public String getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(String accessChannel) { + this.accessChannel = accessChannel; + } + + public String getUnitName() { + return unitName; + } + + public void setUnitName(String unitName) { + this.unitName = unitName; + } + + public int getPersistConsumerOffsetInterval() { + return persistConsumerOffsetInterval; + } + + public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { + this.persistConsumerOffsetInterval = persistConsumerOffsetInterval; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getNameserverAddress() { + return nameserverAddress; + } + + public void setNameserverAddress(String nameserverAddress) { + this.nameserverAddress = nameserverAddress; + } + + 
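+    // Illustrative note: the nested Builder at the bottom of this class is the intended way to
+    // assemble a RocketMQConf programmatically. A minimal sketch (the topic, nameserver address
+    // and consumer group below are hypothetical placeholders):
+    //
+    //   RocketMQConf conf = new RocketMQConf.Builder()
+    //           .setTopic("order_topic")
+    //           .setNameserverAddress("localhost:9876")
+    //           .setConsumerGroup("chunjun_consumer_group")
+    //           .setMode(StartMode.EARLIEST)
+    //           .build();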
public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public StartMode getMode() { + return mode; + } + + public void setMode(StartMode mode) { + this.mode = mode; + } + + public long getStartMessageOffset() { + return startMessageOffset; + } + + public void setStartMessageOffset(long startMessageOffset) { + this.startMessageOffset = startMessageOffset; + } + + public String getTimeZone() { + return timeZone; + } + + public void setTimeZone(String timeZone) { + this.timeZone = timeZone; + } + + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + + public String getFieldDelimiter() { + return fieldDelimiter; + } + + public void setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + } + + public enum StartMode { + /** Start from the earliest offset possible. */ + EARLIEST("earliest"), + /** Start from the latest offset. */ + LATEST("latest"), + /** Start from user-supplied timestamp for each messageQueue. */ + TIMESTAMP("timestamp"), + /** Start from user-supplied offset for each messageQueue */ + OFFSET("offset"), + + UNKNOWN("unknown"); + + final String name; + + StartMode(String name) { + this.name = name; + } + + public static StartMode getFromName(String name) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException("mode name is blank."); + } + switch (name.toLowerCase(Locale.ENGLISH)) { + case "earliest": + return EARLIEST; + case "latest": + return LATEST; + case "timestamp": + return TIMESTAMP; + case "offset": + return OFFSET; + default: + return UNKNOWN; + } + } + } + + public static class Builder implements Serializable { + private static final long serialVersionUID = 1L; + private final RocketMQConf conf; + + public Builder() { + this.conf = new RocketMQConf(); + } + + public Builder setTopic(String topic) { + this.conf.setTopic(topic); + return this; + } + + public Builder setNameserverAddress(String nameserverAddress) { + this.conf.setNameserverAddress(nameserverAddress); + return this; + } + + public Builder setHeartbeatBrokerInterval(int heartbeatBrokerInterval) { + this.conf.setHeartbeatBrokerInterval(heartbeatBrokerInterval); + return this; + } + + public Builder setAccessChannel(String accessChannel) { + this.conf.setAccessChannel(accessChannel); + return this; + } + + public Builder setUnitName(String unitName) { + this.conf.setUnitName(unitName); + return this; + } + + public Builder setTag(String tag) { + this.conf.setTag(tag); + return this; + } + + public Builder setConsumerGroup(String consumerGroup) { + this.conf.setConsumerGroup(consumerGroup); + return this; + } + + public Builder setMode(StartMode mode) { + this.conf.setMode(mode); + return this; + } + + public Builder setStartMessageOffset(long startMessageOffset) { + this.conf.setStartMessageOffset(startMessageOffset); + return this; + } + + public Builder setStartTimeMs(long startTimeMs) { + this.conf.setStartTimeMs(startTimeMs); + return this; + } + + public Builder setEndTimeMs(long endTimeMs) { + this.conf.setEndTimeMs(endTimeMs); + return this; + } + + public Builder setTimeZone(String timeZone) { + this.conf.setTimeZone(timeZone); + return this; + } + + public Builder setEncoding(String encoding) { + this.conf.setEncoding(encoding); + return this; + } + + public Builder setFieldDelimiter(String fieldDelimiter) { + this.conf.setFieldDelimiter(fieldDelimiter); + return 
this; + } + + public Builder setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) { + this.conf.setPersistConsumerOffsetInterval(persistConsumerOffsetInterval); + return this; + } + + public Builder setStartMessageTimeStamp(long startMessageTimeStamp) { + this.conf.setStartMessageTimeStamp(startMessageTimeStamp); + return this; + } + + public Builder setFetchSize(int fetchSize) { + this.conf.setFetchSize(fetchSize); + return this; + } + + public Builder setPartitionDiscoveryIntervalMs(long partitionDiscoveryIntervalMs) { + this.conf.setPartitionDiscoveryIntervalMs(partitionDiscoveryIntervalMs); + return this; + } + + public Builder setAccessKey(String accessKey) { + this.conf.setAccessKey(accessKey); + return this; + } + + public Builder setSecretKey(String secretKey) { + this.conf.setSecretKey(secretKey); + return this; + } + + public RocketMQConf build() { + return this.conf; + } + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/converter/RocketMQRowConverter.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/converter/RocketMQRowConverter.java new file mode 100644 index 0000000000..39923e634f --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/converter/RocketMQRowConverter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.rocketmq.converter; + +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; +import com.dtstack.chunjun.util.JsonUtil; + +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.rocketmq.common.message.Message; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Map; +import java.util.Objects; + +/** @author shitou @date 2022/5/27 * */ +public class RocketMQRowConverter + extends AbstractRowConverter { + + private final String encoding; + private final String[] filedNames; + + public RocketMQRowConverter(RowType rowType, String encoding, String[] filedNames) { + super(rowType); + for (int i = 0; i < rowType.getFieldCount(); i++) { + toInternalConverters.add( + wrapIntoNullableInternalConverter( + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( + wrapIntoNullableExternalConverter( + createExternalConverter(fieldTypes[i]), fieldTypes[i])); + } + this.encoding = encoding; + this.filedNames = filedNames; + } + + @Override + public RowData toInternal(byte[] input) throws Exception { + String line = new String(input, encoding); + Map dataMap = JsonUtil.toObject(line, JsonUtil.MAP_TYPE_REFERENCE); + GenericRowData rowData = new GenericRowData(fieldTypes.length); + for (int i = 0; i < filedNames.length; i++) { + if (Objects.nonNull(dataMap.get(filedNames[i]))) { + Object value = toInternalConverters.get(i).deserialize(dataMap.get(filedNames[i])); + rowData.setField(i, value); + } else { + rowData.setField(i, null); + } + } + return rowData; + } + + @Override + public Message toExternal(RowData rowData, Message output) throws Exception { + return null; + } + + @Override + protected IDeserializationConverter wrapIntoNullableInternalConverter( + IDeserializationConverter IDeserializationConverter) { + return super.wrapIntoNullableInternalConverter(IDeserializationConverter); + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return val -> Boolean.valueOf(String.valueOf(val)); + case CHAR: + case VARCHAR: + return val -> StringData.fromString(String.valueOf(val)); + case BINARY: + case VARBINARY: + return val -> String.valueOf(val).getBytes(); + case TINYINT: + return val -> ((Integer) val).byteValue(); + case SMALLINT: + return val -> ((Integer) val).shortValue(); + case INTEGER: + return val -> Integer.parseInt(String.valueOf(val)); + case BIGINT: + return val -> Long.parseLong(String.valueOf(val)); + case FLOAT: + return val -> Float.parseFloat(String.valueOf(val)); + case DOUBLE: + return val -> Double.parseDouble(String.valueOf(val)); + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return val -> { + BigDecimal decimal = new BigDecimal(String.valueOf(val)); + return DecimalData.fromBigDecimal(decimal, precision, scale); + }; + case DATE: + 
return val -> + (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); + case TIME_WITHOUT_TIME_ZONE: + return val -> + (int) + ((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay() + / 1_000_000L); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + return val -> TimestampData.fromTimestamp(Timestamp.valueOf(String.valueOf(val))); + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQScanTableSource.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQScanTableSource.java new file mode 100644 index 0000000000..b151ff5ba8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQScanTableSource.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source; + +import com.dtstack.chunjun.connector.rocketmq.conf.RocketMQConf; +import com.dtstack.chunjun.connector.rocketmq.converter.RocketMQRowConverter; +import com.dtstack.chunjun.connector.rocketmq.source.deserialization.KeyValueDeserializationSchema; +import com.dtstack.chunjun.connector.rocketmq.source.deserialization.RowKeyValueDeserializationSchema; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.table.connector.source.ParallelSourceFunctionProvider; + +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Defines the scan table source of RocketMQ. 
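+ *
+ * <p>Descriptive note: {@code getScanRuntimeProvider} builds a {@link RocketMQRowConverter} from
+ * the physical row type and wraps a {@link RocketMQSourceFunction} in a
+ * ParallelSourceFunctionProvider; the source is reported as bounded only when an end time is
+ * configured ({@code endTimeMs != Long.MAX_VALUE}).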
*/ +public class RocketMQScanTableSource implements ScanTableSource, SupportsReadingMetadata { + + private final DescriptorProperties properties; + private final TableSchema schema; + private final RocketMQConf rocketMQConf; + private AbstractRowConverter converter; + private List metadataKeys; + private Integer parallelism; + + public RocketMQScanTableSource( + DescriptorProperties properties, + TableSchema schema, + RocketMQConf rocketMQConf, + Integer parallelism) { + this.properties = properties; + this.schema = schema; + this.rocketMQConf = rocketMQConf; + this.parallelism = parallelism; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + + final RowType rowType = (RowType) schema.toRowDataType().getLogicalType(); + + String[] fieldNames = schema.getFieldNames(); + converter = new RocketMQRowConverter(rowType, rocketMQConf.getEncoding(), fieldNames); + + return ParallelSourceFunctionProvider.of( + new com.dtstack.chunjun.connector.rocketmq.source.RocketMQSourceFunction<>( + createKeyValueDeserializationSchema(), rocketMQConf), + isBounded(), + parallelism); + } + + @Override + public Map listReadableMetadata() { + return Collections.emptyMap(); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + } + + @Override + public DynamicTableSource copy() { + RocketMQScanTableSource tableSource = + new RocketMQScanTableSource(properties, schema, rocketMQConf, parallelism); + tableSource.metadataKeys = metadataKeys; + return tableSource; + } + + @Override + public String asSummaryString() { + return RocketMQScanTableSource.class.getName(); + } + + private boolean isBounded() { + return rocketMQConf.getEndTimeMs() != Long.MAX_VALUE; + } + + private KeyValueDeserializationSchema createKeyValueDeserializationSchema() { + return new RowKeyValueDeserializationSchema.Builder() + .setProperties(properties.asMap()) + .setTableSchema(schema) + .setConverter(converter) + .build(); + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQSourceFunction.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQSourceFunction.java new file mode 100644 index 0000000000..1339d80005 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RocketMQSourceFunction.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.rocketmq.source; + +import com.dtstack.chunjun.connector.rocketmq.conf.RocketMQConf; +import com.dtstack.chunjun.connector.rocketmq.source.deserialization.KeyValueDeserializationSchema; +import com.dtstack.chunjun.connector.rocketmq.source.deserialization.RowKeyValueDeserializationSchema; +import com.dtstack.chunjun.connector.rocketmq.source.watermark.WaterMarkForAll; +import com.dtstack.chunjun.connector.rocketmq.source.watermark.WaterMarkPerQueue; +import com.dtstack.chunjun.connector.rocketmq.utils.RetryUtil; +import com.dtstack.chunjun.connector.rocketmq.utils.RocketMQUtils; +import com.dtstack.chunjun.restore.FormatState; +import com.dtstack.chunjun.util.JsonUtil; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.curator4.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.commons.collections.map.LinkedMap; +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability + * guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability + * guarantees. 
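+ *
+ * <p>Descriptive note: per-queue offsets are kept in an in-memory offset table, snapshotted into
+ * the union list state "topic-partition-offset-states" in {@code snapshotState()}, and committed
+ * to the broker in {@code notifyCheckpointComplete()}; when checkpointing is disabled, offsets
+ * are committed directly as messages are pulled.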
+ */ +public class RocketMQSourceFunction extends RichParallelSourceFunction + implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 100; + + private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class); + private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; + private com.dtstack.chunjun.connector.rocketmq.source.RunningChecker runningChecker; + private transient DefaultMQPullConsumer consumer; + private KeyValueDeserializationSchema schema; + private transient ListState unionOffsetStates; + private Map formatStateMap; + private Map offsetTable; + private Map restoredOffsets; + private List messageQueues; + private ExecutorService executor; + + // watermark in source + private WaterMarkPerQueue waterMarkPerQueue; + private WaterMarkForAll waterMarkForAll; + + private ScheduledExecutorService timer; + /** Data for pending but uncommitted offsets. */ + private LinkedMap pendingOffsetsToCommit; + + private RocketMQConf rocketMQConf; + private String topic; + private String group; + private transient volatile boolean restored; + private transient boolean enableCheckpoint; + private volatile Object checkPointLock; + private transient volatile Exception consumerException; + + public RocketMQSourceFunction( + KeyValueDeserializationSchema schema, RocketMQConf rocketMQConf) { + this.schema = schema; + this.rocketMQConf = rocketMQConf; + } + + @Override + public void open(Configuration parameters) throws Exception { + + log.debug("source open...."); + int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask(); + + // init metrics + if (schema instanceof RowKeyValueDeserializationSchema) { + ((RowKeyValueDeserializationSchema) schema).setRuntimeContext(getRuntimeContext()); + if (formatStateMap != null) { + // if restart has changed the parallelism ? + ((RowKeyValueDeserializationSchema) schema) + .setFormatState(formatStateMap.get(indexOfThisSubTask)); + } + schema.init(); + } + + this.topic = rocketMQConf.getTopic(); + this.group = rocketMQConf.getConsumerGroup(); + + Validate.notEmpty(topic, "Consumer topic can not be empty"); + Validate.notEmpty(group, "Consumer group can not be empty"); + + this.enableCheckpoint = + ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled(); + + if (offsetTable == null) { + offsetTable = new ConcurrentHashMap<>(); + } + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + + // use restoredOffsets to init offset table. 
+ initOffsetTableFromRestoredOffsets(); + + if (pendingOffsetsToCommit == null) { + pendingOffsetsToCommit = new LinkedMap(); + } + if (checkPointLock == null) { + checkPointLock = new ReentrantLock(); + } + if (waterMarkPerQueue == null) { + waterMarkPerQueue = new WaterMarkPerQueue(5000); + } + if (waterMarkForAll == null) { + waterMarkForAll = new WaterMarkForAll(5000); + } + if (timer == null) { + timer = Executors.newSingleThreadScheduledExecutor(); + } + + runningChecker = new com.dtstack.chunjun.connector.rocketmq.source.RunningChecker(); + runningChecker.setRunning(true); + + final ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("rmq-pull-thread-%d") + .build(); + executor = Executors.newCachedThreadPool(threadFactory); + + consumer = + new DefaultMQPullConsumer( + group, + RocketMQUtils.buildAclRPCHook( + rocketMQConf.getAccessKey(), rocketMQConf.getSecretKey())); + RocketMQUtils.buildConsumer(rocketMQConf, consumer); + + // set unique instance name, avoid exception: + // https://help.aliyun.com/document_detail/29646.html + String runtimeName = ManagementFactory.getRuntimeMXBean().getName(); + String instanceName = + RocketMQUtils.getInstanceName( + runtimeName, + topic, + group, + String.valueOf(indexOfThisSubTask), + String.valueOf(System.nanoTime())); + consumer.setInstanceName(instanceName); + consumer.start(); + + log.info( + "[{}] open successfully, \n[{}]: \n{} ", + this.getClass().getSimpleName(), + rocketMQConf.getClass().getSimpleName(), + JsonUtil.toPrintJson(rocketMQConf)); + } + + @Override + public void run(SourceContext context) throws Exception { + String tag = rocketMQConf.getTag(); + int pullBatchSize = rocketMQConf.getFetchSize(); + + final RuntimeContext ctx = getRuntimeContext(); + // The lock that guarantees that record emission and state updates are atomic, + // from the view of taking a checkpoint. + int taskNumber = ctx.getNumberOfParallelSubtasks(); + int taskIndex = ctx.getIndexOfThisSubtask(); + log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex); + + timer.scheduleAtFixedRate( + () -> { + // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark()); + context.emitWatermark(waterMarkForAll.getCurrentWatermark()); + }, + 5, + 5, + TimeUnit.SECONDS); + + Collection totalQueues = consumer.fetchSubscribeMessageQueues(topic); + messageQueues = + RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask()); + for (MessageQueue mq : messageQueues) { + this.executor.execute( + () -> { + try { + RetryUtil.call( + () -> { + while (runningChecker.isRunning()) { + long offset = getMessageQueueOffset(mq); + PullResult pullResult = + consumer.pullBlockIfNotFound( + mq, tag, offset, pullBatchSize); + + boolean found = false; + switch (pullResult.getPullStatus()) { + case FOUND: + List messages = + pullResult.getMsgFoundList(); + for (MessageExt msg : messages) { + byte[] key = + msg.getKeys() != null + ? 
msg.getKeys() + .getBytes( + StandardCharsets + .UTF_8) + : null; + byte[] value = msg.getBody(); + OUT data = + schema.deserializeKeyAndValue( + key, value); + + // output and state update are atomic + synchronized (checkPointLock) { + log.debug( + msg.getMsgId() + + "_" + + msg.getBrokerName() + + " " + + msg.getQueueId() + + " " + + msg.getQueueOffset()); + context.collectWithTimestamp( + data, msg.getBornTimestamp()); + + // update max eventTime per queue + // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp()); + waterMarkForAll.extractTimestamp( + msg.getBornTimestamp()); + // tpsMetric.markEvent(); + } + } + found = true; + break; + case NO_MATCHED_MSG: + log.debug( + "No matched message after offset {} for queue {}", + offset, + mq); + break; + case NO_NEW_MSG: + log.debug( + "No new message after offset {} for queue {}", + offset, + mq); + break; + case OFFSET_ILLEGAL: + log.warn( + "Offset {} is illegal for queue {}", + offset, + mq); + break; + default: + break; + } + + synchronized (checkPointLock) { + updateMessageQueueOffset( + mq, pullResult.getNextBeginOffset()); + } + + if (!found) { + RetryUtil.waitForMs( + DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND); + } + } + return true; + }, + "Read exception, start retry..."); + } catch (Exception e) { + // 处理子线程异常 + consumerException = e; + } + }); + } + + awaitTermination(); + } + + private void awaitTermination() throws Exception { + while (runningChecker.isRunning()) { + Thread.sleep(50); + checkConsumerException(); + } + } + + private void checkConsumerException() throws Exception { + if (consumerException != null) { + throw consumerException; + } + } + + private long getMessageQueueOffset(MessageQueue mq) throws MQClientException { + Long offset = offsetTable.get(mq); + // restoredOffsets(unionOffsetStates) is the restored global union state; + // should only snapshot mqs that actually belong to us + if (offset == null) { + // fetchConsumeOffset from broker + offset = consumer.fetchConsumeOffset(mq, false); + if (!restored || offset < 0) { + RocketMQConf.StartMode mode = rocketMQConf.getMode(); + switch (mode) { + case EARLIEST: + offset = consumer.minOffset(mq); + break; + case LATEST: + offset = consumer.maxOffset(mq); + break; + case TIMESTAMP: + offset = + consumer.searchOffset( + mq, + rocketMQConf.getStartMessageTimeStamp() < 0 + ? (rocketMQConf.getStartTimeMs() < 0 + ? 
System.currentTimeMillis() + : rocketMQConf.getStartTimeMs()) + : rocketMQConf.getStartMessageTimeStamp()); + break; + case OFFSET: + offset = rocketMQConf.getStartMessageOffset(); + break; + default: + throw new IllegalArgumentException( + "Unknown value for consumer start mode."); + } + } + } + offsetTable.put(mq, offset); + return offsetTable.get(mq); + } + + private void updateMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException { + offsetTable.put(mq, offset); + if (!enableCheckpoint) { + consumer.updateConsumeOffset(mq, offset); + } + } + + @Override + public void cancel() { + log.debug("cancel ..."); + runningChecker.setRunning(false); + + if (timer != null) { + timer.shutdown(); + timer = null; + } + + if (executor != null) { + executor.shutdown(); + executor = null; + } + + if (consumer != null) { + consumer.shutdown(); + consumer = null; + } + + if (offsetTable != null) { + offsetTable.clear(); + offsetTable = null; + } + if (restoredOffsets != null) { + restoredOffsets.clear(); + restoredOffsets = null; + } + if (pendingOffsetsToCommit != null) { + pendingOffsetsToCommit.clear(); + pendingOffsetsToCommit = null; + } + + if (schema != null) { + schema.close(); + } + } + + @Override + public void close() throws Exception { + log.debug("close ..."); + // pretty much the same logic as cancelling + try { + cancel(); + } finally { + super.close(); + } + } + + public void initOffsetTableFromRestoredOffsets() { + Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); + restoredOffsets.forEach( + (mq, offset) -> { + if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) { + offsetTable.put(mq, offset); + } + }); + log.info("init offset table {} from restoredOffsets successful.", offsetTable); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // called when a snapshot for a checkpoint is requested + log.info("Snapshotting state {} ...", context.getCheckpointId()); + if (!runningChecker.isRunning()) { + log.info("snapshotState() called on closed source; returning null."); + return; + } + + // Discovery topic Route change when snapshot + RetryUtil.call( + () -> { + Collection totalQueues = + consumer.fetchSubscribeMessageQueues(topic); + int taskNumber = getRuntimeContext().getNumberOfParallelSubtasks(); + int taskIndex = getRuntimeContext().getIndexOfThisSubtask(); + List newQueues = + RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex); + Collections.sort(newQueues); + log.debug(taskIndex + " Topic route is same."); + if (!messageQueues.equals(newQueues)) { + throw new RuntimeException(); + } + return true; + }, + "RuntimeException due to topic route changed."); + + FormatState formatState; + formatState = ((RowKeyValueDeserializationSchema) schema).getFormatState(); + + unionOffsetStates.clear(); + List> messageQueuesOffset = new LinkedList<>(); + Map currentOffsets = new HashMap<>(offsetTable.size()); + for (Map.Entry entry : offsetTable.entrySet()) { + messageQueuesOffset.add(Tuple2.of(entry.getKey(), entry.getValue())); + currentOffsets.put(entry.getKey(), entry.getValue()); + } + formatState.setState(messageQueuesOffset); + unionOffsetStates.add(formatState); + pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets); + log.info( + "Snapshotted state: {}, checkpoint id: {}, timestamp: {}", + formatState, + context.getCheckpointId(), + context.getCheckpointTimestamp()); + } + + /** + * called every time the user-defined function is initialized, be that when the 
function is + * first initialized or be that when the function is actually recovering from an earlier + * checkpoint. Given this, initializeState() is not only the place where different types of + * state are initialized, but also where state recovery logic is included. + */ + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + log.info("initialize State ..."); + + this.unionOffsetStates = + context.getOperatorStateStore() + .getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + TypeInformation.of(new TypeHint() {}))); + this.restored = context.isRestored(); + + if (restored) { + if (restoredOffsets == null) { + restoredOffsets = new ConcurrentHashMap<>(); + } + + if (formatStateMap == null) { + formatStateMap = new HashMap<>(16); + } + for (FormatState state : unionOffsetStates.get()) { + List> messageQueues = + (List>) state.getState(); + messageQueues.forEach( + mqOffsets -> { + if (!restoredOffsets.containsKey(mqOffsets.f0) + || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) { + restoredOffsets.put(mqOffsets.f0, mqOffsets.f1); + } + }); + formatStateMap.put(state.getNumOfSubTask(), state); + } + log.info( + "Setting restore state in the consumer. Using the following offsets: {}", + restoredOffsets); + } else { + log.info("No restore state for the consumer."); + } + } + + @Override + public TypeInformation getProducedType() { + return schema.getProducedType(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // callback when checkpoint complete + if (!runningChecker.isRunning()) { + log.info("notifyCheckpointComplete() called on closed source; returning null."); + return; + } + + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + log.warn("Received confirmation for unknown checkpoint id {}", checkpointId); + return; + } + + Map offsets = + (Map) pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (offsets == null || offsets.size() == 0) { + log.debug("Checkpoint state was empty."); + return; + } + + for (Map.Entry entry : offsets.entrySet()) { + consumer.updateConsumeOffset(entry.getKey(), entry.getValue()); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RunningChecker.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RunningChecker.java new file mode 100644 index 0000000000..5c247eacae --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/RunningChecker.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source; + +import java.io.Serializable; + +public class RunningChecker implements Serializable { + private volatile boolean isRunning = false; + + public boolean isRunning() { + return isRunning; + } + + public void setRunning(boolean running) { + isRunning = running; + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/KeyValueDeserializationSchema.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/KeyValueDeserializationSchema.java new file mode 100644 index 0000000000..4506cee9f7 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/KeyValueDeserializationSchema.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source.deserialization; + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import java.io.Serializable; + +public interface KeyValueDeserializationSchema extends ResultTypeQueryable, Serializable { + T deserializeKeyAndValue(byte[] key, byte[] value); + + void init(); + + void close(); +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/RowKeyValueDeserializationSchema.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/RowKeyValueDeserializationSchema.java new file mode 100644 index 0000000000..cb1fb9a14e --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/deserialization/RowKeyValueDeserializationSchema.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source.deserialization; + +import com.dtstack.chunjun.constants.Metrics; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.dirty.DirtyConf; +import com.dtstack.chunjun.dirty.manager.DirtyManager; +import com.dtstack.chunjun.dirty.utils.DirtyConfUtil; +import com.dtstack.chunjun.metrics.AccumulatorCollector; +import com.dtstack.chunjun.metrics.BaseMetric; +import com.dtstack.chunjun.restore.FormatState; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; + +/** + * * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization + * of message key and value.. + */ +public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema { + + private static final long serialVersionUID = -1L; + private static final Logger LOGGER = + LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class); + + private transient TableSchema tableSchema; + // private final int columnSize; + + protected FormatState formatState; + + protected transient BaseMetric inputMetric; + protected long startTime; + protected LongCounter numReadCounter; + protected LongCounter bytesReadCounter; + protected LongCounter durationCounter; + // dirty data + protected DirtyManager dirtyManager; + // gateway + protected AccumulatorCollector accumulatorCollector; + + private transient RuntimeContext runtimeContext; + private AbstractRowConverter converter; + + public RowKeyValueDeserializationSchema( + TableSchema tableSchema, + Map properties, + AbstractRowConverter converter) { + this.tableSchema = tableSchema; + // this.columnSize = tableSchema.getFieldNames().length; + + DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(properties); + this.converter = converter; + } + + @Override + public RowData deserializeKeyAndValue(byte[] key, byte[] value) { + beforeDeserialize(value); + return deserializeValue(value); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public void init() { + initDirtyManager(); + initAccumulatorCollector(); + // initRowSizeCalculator(); + initStatisticsAccumulator(); + initRestoreInfo(); + // openInputFormat(); + startTime = System.currentTimeMillis(); + } + + @Override + public void close() { + if (durationCounter != null) { + updateDuration(); + } + + if (accumulatorCollector != null) { + accumulatorCollector.close(); + } + + if (inputMetric != null) { + inputMetric.waitForReportMetrics(); + } + } + + protected void beforeDeserialize(byte[] value) { + if (value != null) { + updateDuration(); + if (numReadCounter != null) { + 
numReadCounter.add(1L); + } + if (bytesReadCounter != null) { + bytesReadCounter.add(value.length); + } + } + } + + /** 更新任务执行时间指标 */ + private void updateDuration() { + if (durationCounter != null) { + durationCounter.resetLocal(); + durationCounter.add(System.currentTimeMillis() - startTime); + } + } + + private void initDirtyManager() { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + + ExecutionConfig.GlobalJobParameters params = + context.getExecutionConfig().getGlobalJobParameters(); + DirtyConf dc = DirtyConfUtil.parseFromMap(params.toMap()); + this.dirtyManager = new DirtyManager(dc, context); + } + + /** 初始化累加器收集器 */ + private void initAccumulatorCollector() { + String lastWriteLocation = + String.format( + "%s_%s", + Metrics.LAST_WRITE_LOCATION_PREFIX, runtimeContext.getIndexOfThisSubtask()); + String lastWriteNum = + String.format( + "%s_%s", + Metrics.LAST_WRITE_NUM__PREFIX, runtimeContext.getIndexOfThisSubtask()); + + accumulatorCollector = + new AccumulatorCollector( + (StreamingRuntimeContext) runtimeContext, + Arrays.asList( + Metrics.NUM_READS, + Metrics.READ_BYTES, + Metrics.READ_DURATION, + Metrics.WRITE_BYTES, + Metrics.NUM_WRITES, + lastWriteLocation, + lastWriteNum)); + accumulatorCollector.start(); + } + + /** 初始化累加器指标 */ + private void initStatisticsAccumulator() { + numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS); + bytesReadCounter = getRuntimeContext().getLongCounter(Metrics.READ_BYTES); + durationCounter = getRuntimeContext().getLongCounter(Metrics.READ_DURATION); + + inputMetric = new BaseMetric(getRuntimeContext()); + inputMetric.addMetric(Metrics.NUM_READS, numReadCounter, true); + inputMetric.addMetric(Metrics.READ_BYTES, bytesReadCounter, true); + inputMetric.addMetric(Metrics.READ_DURATION, durationCounter); + inputMetric.addDirtyMetric(Metrics.DIRTY_DATA_COUNT, this.dirtyManager.getConsumedMetric()); + inputMetric.addDirtyMetric( + Metrics.DIRTY_DATA_COLLECT_FAILED_COUNT, + this.dirtyManager.getFailedConsumedMetric()); + } + + /** 从checkpoint状态缓存map中恢复上次任务的指标信息 */ + private void initRestoreInfo() { + if (formatState == null) { + formatState = new FormatState(runtimeContext.getIndexOfThisSubtask(), null); + } else { + numReadCounter.add(formatState.getMetricValue(Metrics.NUM_READS)); + bytesReadCounter.add(formatState.getMetricValue(Metrics.READ_BYTES)); + durationCounter.add(formatState.getMetricValue(Metrics.READ_DURATION)); + } + } + + public RuntimeContext getRuntimeContext() { + return runtimeContext; + } + + public void setRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + public FormatState getFormatState() { + if (formatState != null && numReadCounter != null && inputMetric != null) { + formatState.setMetric(inputMetric.getMetricCounters()); + } + return formatState; + } + + public void setFormatState(FormatState formatState) { + this.formatState = formatState; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation getProducedType() { + return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType()); + } + + private RowData deserializeValue(byte[] value) { + try { + return converter.toInternal(value); + } catch (Exception e) { + dirtyManager.collect(value, e, null); + } + return null; + } + + /** Builder of {@link RowKeyValueDeserializationSchema}. 
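+     *
+     * <p>Illustrative usage (a sketch; the table schema, option map and converter normally come
+     * from {@code RocketMQScanTableSource}):
+     *
+     * <pre>{@code
+     * RowKeyValueDeserializationSchema schema =
+     *         new RowKeyValueDeserializationSchema.Builder()
+     *                 .setTableSchema(tableSchema) // physical TableSchema of the source table
+     *                 .setProperties(options)      // raw connector options as a Map
+     *                 .setConverter(rowConverter)  // e.g. a RocketMQRowConverter
+     *                 .build();
+     * }</pre>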
*/ + public static class Builder { + + private TableSchema schema; + private Map properties; + private AbstractRowConverter converter; + + public Builder() {} + + public Builder setConverter(AbstractRowConverter converter) { + this.converter = converter; + return this; + } + + public Builder setTableSchema(TableSchema tableSchema) { + this.schema = tableSchema; + return this; + } + + public Builder setProperties(Map properties) { + this.properties = properties; + if (null == properties) { + return this; + } + Configuration configuration = new Configuration(); + for (String key : properties.keySet()) { + configuration.setString(key, properties.get(key)); + } + return this; + } + + public RowKeyValueDeserializationSchema build() { + return new RowKeyValueDeserializationSchema(schema, properties, converter); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkForAll.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkForAll.java new file mode 100644 index 0000000000..c12e1adb73 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkForAll.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source.watermark; + +import org.apache.flink.streaming.api.watermark.Watermark; + +public class WaterMarkForAll { + + private long maxOutOfOrderness = 5000L; // 5 seconds + + private long maxTimestamp = 0L; + + public WaterMarkForAll() {} + + public WaterMarkForAll(long maxOutOfOrderness) { + this.maxOutOfOrderness = maxOutOfOrderness; + } + + public void extractTimestamp(long timestamp) { + maxTimestamp = Math.max(timestamp, maxTimestamp); + } + + public Watermark getCurrentWatermark() { + return new Watermark(maxTimestamp - maxOutOfOrderness); + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkPerQueue.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkPerQueue.java new file mode 100644 index 0000000000..d9e8b77112 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/source/watermark/WaterMarkPerQueue.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.source.watermark; + +import org.apache.flink.streaming.api.watermark.Watermark; + +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class WaterMarkPerQueue { + + private ConcurrentMap maxEventTimeTable; + + private long maxOutOfOrderness = 5000L; // 5 seconds + + public WaterMarkPerQueue() {} + + public WaterMarkPerQueue(long maxOutOfOrderness) { + this.maxOutOfOrderness = maxOutOfOrderness; + maxEventTimeTable = new ConcurrentHashMap<>(); + } + + public void extractTimestamp(MessageQueue mq, long timestamp) { + long maxEventTime = maxEventTimeTable.getOrDefault(mq, maxOutOfOrderness); + maxEventTimeTable.put(mq, Math.max(maxEventTime, timestamp)); + } + + public Watermark getCurrentWatermark() { + // return the watermark as current highest timestamp minus the out-of-orderness bound + long minTimestamp = maxOutOfOrderness; + for (Map.Entry entry : maxEventTimeTable.entrySet()) { + minTimestamp = Math.min(minTimestamp, entry.getValue()); + } + return new Watermark(minTimestamp - maxOutOfOrderness); + } + + @Override + public String toString() { + return "WaterMarkPerQueue{" + + "maxEventTimeTable=" + + maxEventTimeTable + + ", maxOutOfOrderness=" + + maxOutOfOrderness + + '}'; + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQDynamicTableFactory.java new file mode 100644 index 0000000000..6ef602bd73 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQDynamicTableFactory.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.chunjun.connector.rocketmq.table; + +import com.dtstack.chunjun.connector.rocketmq.conf.RocketMQConf; +import com.dtstack.chunjun.connector.rocketmq.source.RocketMQScanTableSource; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.apache.commons.lang3.time.FastDateFormat; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.CONSUMER_GROUP; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.NAME_SERVER_ADDRESS; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_ACCESS_CHANNEL; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_ACCESS_KEY; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_CONSUMER_BATCH_SIZE; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_ENCODING; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_END_TIME; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_HEART_BEAT_BROKER_INTERVAL; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_PERSIST_CONSUMER_INTERVAL; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_SECRET_KEY; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_START_MESSAGE_TIMESTAMP; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_START_OFFSET_MODE; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_START_TIME; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_START_TIME_MILLS; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_TAG; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.OPTIONAL_TIME_ZONE; +import static com.dtstack.chunjun.connector.rocketmq.table.RocketMQOptions.TOPIC; +import static org.apache.flink.table.factories.FactoryUtil.SCAN_PARALLELISM; +import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; + +/** @author shitou @date 2022/5/17 * */ +public class RocketMQDynamicTableFactory implements DynamicTableSourceFactory { + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + public static final String IDENTIFIER = "rocketmq-x"; + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + transformContext(this, context); + final FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, 
context); + helper.validate(); + + final ReadableConfig config = helper.getOptions(); + + Map rawProperties = context.getCatalogTable().getOptions(); + + final DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(rawProperties); + TableSchema physicalSchema = + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + + descriptorProperties.putTableSchema("schema", physicalSchema); + + Integer parallelism = config.get(SCAN_PARALLELISM); + + return new RocketMQScanTableSource( + descriptorProperties, physicalSchema, getSourceConfig(config), parallelism); + } + + private RocketMQConf getSourceConfig(ReadableConfig config) { + String topic = config.get(TOPIC); + String consumerGroup = config.get(CONSUMER_GROUP); + String nameServerAddress = config.get(NAME_SERVER_ADDRESS); + String tag = config.get(OPTIONAL_TAG); + String mode = config.get(OPTIONAL_START_OFFSET_MODE); + String encoding = config.get(OPTIONAL_ENCODING); + String accessKey = config.get(OPTIONAL_ACCESS_KEY); + String secretKey = config.get(OPTIONAL_SECRET_KEY); + String accessChannel = config.get(OPTIONAL_ACCESS_CHANNEL); + int heartbeat = config.get(OPTIONAL_HEART_BEAT_BROKER_INTERVAL); + int persistInterval = config.get(OPTIONAL_PERSIST_CONSUMER_INTERVAL); + int batchSize = config.get(OPTIONAL_CONSUMER_BATCH_SIZE); + long startMessageOffset = config.get(OPTIONAL_START_MESSAGE_OFFSET); + long startMessageTimeStamp = config.get(OPTIONAL_START_MESSAGE_TIMESTAMP); + + String startDateTime = config.get(OPTIONAL_START_TIME); + String timeZone = config.get(OPTIONAL_TIME_ZONE); + long startTime = config.get(OPTIONAL_START_TIME_MILLS); + if (startTime == -1) { + if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) { + try { + startTime = parseDateString(startDateTime, timeZone); + } catch (ParseException e) { + throw new RuntimeException( + String.format( + "Incorrect datetime format: %s, pls use ISO-8601 " + + "complete date plus hours, minutes and seconds format:%s.", + startDateTime, DATE_FORMAT), + e); + } + } + } + long stopInMs = Long.MAX_VALUE; + String endDateTime = config.get(OPTIONAL_END_TIME); + if (!StringUtils.isNullOrWhitespaceOnly(endDateTime)) { + try { + stopInMs = parseDateString(endDateTime, timeZone); + } catch (ParseException e) { + throw new RuntimeException( + String.format( + "Incorrect datetime format: %s, pls use ISO-8601 " + + "complete date plus hours, minutes and seconds format:%s.", + endDateTime, DATE_FORMAT), + e); + } + Preconditions.checkArgument( + stopInMs >= startTime, "Start time should be less than stop time."); + } + + return new RocketMQConf.Builder() + .setStartTimeMs(startMessageOffset < 0 ? 
startTime : -1L) + .setPersistConsumerOffsetInterval(persistInterval) + .setStartMessageTimeStamp(startMessageTimeStamp) + .setMode(RocketMQConf.StartMode.getFromName(mode)) + .setStartMessageOffset(startMessageOffset) + .setHeartbeatBrokerInterval(heartbeat) + .setNameserverAddress(nameServerAddress) + .setConsumerGroup(consumerGroup) + .setAccessChannel(accessChannel) + .setAccessKey(accessKey) + .setSecretKey(secretKey) + .setFetchSize(batchSize) + .setEncoding(encoding) + .setEndTimeMs(stopInMs) + .setTopic(topic) + .setTag(tag) + .build(); + } + + private void transformContext(DynamicTableFactory factory, Context context) { + Map catalogOptions = context.getCatalogTable().getOptions(); + Map convertedOptions = + normalizeOptionCaseAsFactory(factory, catalogOptions); + catalogOptions.clear(); + for (Map.Entry entry : convertedOptions.entrySet()) { + catalogOptions.put(entry.getKey(), entry.getValue()); + } + } + + private Map normalizeOptionCaseAsFactory( + Factory factory, Map options) { + Map normalizedOptions = new HashMap<>(16); + Map requiredOptionKeysLowerCaseToOriginal = + factory.requiredOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + Map optionalOptionKeysLowerCaseToOriginal = + factory.optionalOptions().stream() + .collect( + Collectors.toMap( + option -> option.key().toLowerCase(), ConfigOption::key)); + for (Map.Entry entry : options.entrySet()) { + final String catalogOptionKey = entry.getKey(); + final String catalogOptionValue = entry.getValue(); + normalizedOptions.put( + requiredOptionKeysLowerCaseToOriginal.containsKey( + catalogOptionKey.toLowerCase()) + ? requiredOptionKeysLowerCaseToOriginal.get( + catalogOptionKey.toLowerCase()) + : optionalOptionKeysLowerCaseToOriginal.getOrDefault( + catalogOptionKey.toLowerCase(), catalogOptionKey), + catalogOptionValue); + } + return normalizedOptions; + } + + private Long parseDateString(String dateString, String timeZone) throws ParseException { + FastDateFormat simpleDateFormat = + FastDateFormat.getInstance(DATE_FORMAT, TimeZone.getTimeZone(timeZone)); + return simpleDateFormat.parse(dateString).getTime(); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> requiredOptions = new HashSet<>(); + requiredOptions.add(TOPIC); + requiredOptions.add(CONSUMER_GROUP); + requiredOptions.add(NAME_SERVER_ADDRESS); + return requiredOptions; + } + + @Override + public Set> optionalOptions() { + Set> optionalOptions = new HashSet<>(); + optionalOptions.add(OPTIONAL_TAG); + optionalOptions.add(OPTIONAL_ACCESS_KEY); + optionalOptions.add(OPTIONAL_SECRET_KEY); + optionalOptions.add(OPTIONAL_ACCESS_CHANNEL); + optionalOptions.add(OPTIONAL_HEART_BEAT_BROKER_INTERVAL); + optionalOptions.add(OPTIONAL_PERSIST_CONSUMER_INTERVAL); + optionalOptions.add(OPTIONAL_CONSUMER_BATCH_SIZE); + optionalOptions.add(OPTIONAL_START_OFFSET_MODE); + optionalOptions.add(OPTIONAL_START_MESSAGE_TIMESTAMP); + optionalOptions.add(OPTIONAL_START_MESSAGE_OFFSET); + optionalOptions.add(OPTIONAL_START_TIME_MILLS); + optionalOptions.add(OPTIONAL_START_TIME); + optionalOptions.add(OPTIONAL_END_TIME); + optionalOptions.add(OPTIONAL_TIME_ZONE); + optionalOptions.add(OPTIONAL_ENCODING); + optionalOptions.add(SCAN_PARALLELISM); + return optionalOptions; + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQOptions.java 
b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQOptions.java new file mode 100644 index 0000000000..476ac97bc8 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/table/RocketMQOptions.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.rocketmq.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** @author shitou @date 2022/5/17 * */ +public class RocketMQOptions { + + // common + + public static final ConfigOption TOPIC = + ConfigOptions.key("topic").stringType().noDefaultValue(); + + public static final ConfigOption NAME_SERVER_ADDRESS = + ConfigOptions.key("nameserver.address").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_HEART_BEAT_BROKER_INTERVAL = + ConfigOptions.key("heartbeat.broker.interval").intType().defaultValue(30000); + + public static final ConfigOption OPTIONAL_ACCESS_KEY = + ConfigOptions.key("access.key").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_SECRET_KEY = + ConfigOptions.key("secret.key").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_ACCESS_CHANNEL = + ConfigOptions.key("access.channel").stringType().defaultValue("LOCAL"); + + // consumer + + public static final ConfigOption CONSUMER_GROUP = + ConfigOptions.key("consumer.group").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_TAG = + ConfigOptions.key("tag").stringType().defaultValue("*"); + + public static final ConfigOption OPTIONAL_START_MESSAGE_OFFSET = + ConfigOptions.key("start.message-offset").longType().defaultValue(-1L); + + public static final ConfigOption OPTIONAL_START_MESSAGE_TIMESTAMP = + ConfigOptions.key("start.message-timestamp").longType().defaultValue(-1L); + + public static final ConfigOption OPTIONAL_START_OFFSET_MODE = + ConfigOptions.key("consumer.start-offset-mode").stringType().defaultValue("latest"); + + public static final ConfigOption OPTIONAL_START_TIME_MILLS = + ConfigOptions.key("start.time.ms".toLowerCase()).longType().defaultValue(-1L); + + public static final ConfigOption OPTIONAL_START_TIME = + ConfigOptions.key("start.time".toLowerCase()).stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_END_TIME = + ConfigOptions.key("end.time").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_TIME_ZONE = + ConfigOptions.key("time.zone".toLowerCase()).stringType().defaultValue("GMT+8"); + + public static final ConfigOption OPTIONAL_PERSIST_CONSUMER_INTERVAL = + 
ConfigOptions.key("persist.consumer-offset-interval").intType().defaultValue(5000); + + public static final ConfigOption OPTIONAL_ENCODING = + ConfigOptions.key("encoding").stringType().defaultValue("UTF-8"); + + // public static final ConfigOption OPTIONAL_FIELD_DELIMITER = + // ConfigOptions.key("field.delimiter").stringType().defaultValue("\u0001"); + + // public static final ConfigOption OPTIONAL_LINE_DELIMITER = + // ConfigOptions.key("line.delimiter").stringType().defaultValue("\n"); + + public static final ConfigOption OPTIONAL_CONSUMER_BATCH_SIZE = + ConfigOptions.key("consumer.batch-size").intType().defaultValue(32); + + // producer + + public static final ConfigOption PRODUCER_GROUP = + ConfigOptions.key("producer.group").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_COLUMN_ERROR_DEBUG = + ConfigOptions.key("column.error.debug").booleanType().defaultValue(true); + + public static final ConfigOption OPTIONAL_LENGTH_CHECK = + ConfigOptions.key("length.check").stringType().defaultValue("NONE"); + + public static final ConfigOption OPTIONAL_WRITE_RETRY_TIMES = + ConfigOptions.key("retry.times").intType().defaultValue(10); + + public static final ConfigOption OPTIONAL_WRITE_SLEEP_TIME_MS = + ConfigOptions.key("sleep.time.ms").longType().defaultValue(5000L); + + public static final ConfigOption OPTIONAL_WRITE_IS_DYNAMIC_TAG = + ConfigOptions.key("is.dynamic.tag").booleanType().defaultValue(false); + + public static final ConfigOption OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN = + ConfigOptions.key("dynamic.tag.column").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED = + ConfigOptions.key("dynamic.tag.column-write-included").booleanType().defaultValue(true); + + public static final ConfigOption OPTIONAL_WRITE_KEY_COLUMNS = + ConfigOptions.key("key.columns").stringType().noDefaultValue(); + + public static final ConfigOption OPTIONAL_WRITE_KEYS_TO_BODY = + ConfigOptions.key("write.keys.to-body").booleanType().defaultValue(false); +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RetryUtil.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RetryUtil.java new file mode 100644 index 0000000000..623be0fc46 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RetryUtil.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.dtstack.chunjun.connector.rocketmq.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    private static final long INITIAL_BACKOFF = 200;
+    private static final long MAX_BACKOFF = 5000;
+    private static final int MAX_ATTEMPTS = 5;
+
+    private RetryUtil() {}
+
+    public static void waitForMs(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public static <T> T call(Callable<T> callable, String errorMsg) throws Exception {
+        long backoff = INITIAL_BACKOFF;
+        int retries = 0;
+        do {
+            try {
+                return callable.call();
+            } catch (Exception ex) {
+                if (retries >= MAX_ATTEMPTS) {
+                    throw ex;
+                }
+                log.error("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS, ex);
+                retries++;
+            }
+            waitForMs(backoff);
+            backoff = Math.min(backoff * 2, MAX_BACKOFF);
+        } while (true);
+    }
+}
diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RocketMQUtils.java b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RocketMQUtils.java
new file mode 100644
index 0000000000..415b085e83
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/java/com/dtstack/chunjun/connector/rocketmq/utils/RocketMQUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.rocketmq.utils;
+
+import com.dtstack.chunjun.connector.rocketmq.conf.RocketMQConf;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public final class RocketMQUtils {
+
+    public static String getInstanceName(String...
args) { + if (null != args && args.length > 0) { + return String.join("_", args); + } + return ManagementFactory.getRuntimeMXBean().getName() + "_" + System.nanoTime(); + } + + /** + * Average Hashing queue algorithm Refer: + * org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely + */ + public static List allocate( + Collection mqSet, int numberOfParallelTasks, int indexOfThisTask) { + ArrayList mqAll = new ArrayList<>(mqSet); + Collections.sort(mqAll); + List result = new ArrayList<>(); + int mod = mqAll.size() % numberOfParallelTasks; + int averageSize = + mqAll.size() <= numberOfParallelTasks + ? 1 + : (mod > 0 && indexOfThisTask < mod + ? mqAll.size() / numberOfParallelTasks + 1 + : mqAll.size() / numberOfParallelTasks); + int startIndex = + (mod > 0 && indexOfThisTask < mod) + ? indexOfThisTask * averageSize + : indexOfThisTask * averageSize + mod; + int range = Math.min(averageSize, mqAll.size() - startIndex); + for (int i = 0; i < range; i++) { + result.add(mqAll.get((startIndex + i) % mqAll.size())); + } + return result; + } + + public static AclClientRPCHook buildAclRPCHook(String accessKey, String secretKey) { + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + } + return null; + } + + public static void buildConsumer(RocketMQConf conf, DefaultMQPullConsumer consumer) { + Validate.notEmpty(conf.getNameserverAddress()); + consumer.setNamesrvAddr(conf.getNameserverAddress()); + consumer.setHeartbeatBrokerInterval(conf.getHeartbeatBrokerInterval()); + // When using aliyun products, you need to set up channels + consumer.setAccessChannel(AccessChannel.valueOf(conf.getAccessChannel())); + consumer.setUnitName(conf.getUnitName()); + + consumer.setMessageModel(MessageModel.CLUSTERING); + consumer.setPersistConsumerOffsetInterval(conf.getPersistConsumerOffsetInterval()); + } +} diff --git a/chunjun-connectors/chunjun-connector-rocketmq/src/main/resources/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-rocketmq/src/main/resources/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..625b4faf5c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-rocketmq/src/main/resources/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+com.dtstack.chunjun.connector.rocketmq.table.RocketMQDynamicTableFactory
diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml
index 952f3ae728..620a04b165 100644
--- a/chunjun-connectors/pom.xml
+++ b/chunjun-connectors/pom.xml
@@ -24,6 +24,7 @@
        chunjun-connector-kafka
        chunjun-connector-emqx
+       chunjun-connector-rocketmq
        chunjun-connector-jdbc-base
diff --git a/chunjun-examples/sql/rocketmq/rocketmq_stream.sql b/chunjun-examples/sql/rocketmq/rocketmq_stream.sql
new file mode 100644
index 0000000000..76a821c286
--- /dev/null
+++ b/chunjun-examples/sql/rocketmq/rocketmq_stream.sql
@@ -0,0 +1,59 @@
+CREATE TABLE rocketmq_source
+(
+    `id` BIGINT,
+    `name` STRING,
+    `age` INT,
+    `boolean_col` BOOLEAN,
+    `tinyint_col` TINYINT,
+    `smallint_col` SMALLINT,
+    `float_col` FLOAT,
+    `double_col` DOUBLE,
+    `decimal_col` DECIMAL(18,8),
+    `char_col` CHAR,
+    `time` TIME,
+    `date_col` DATE,
+    `timestamp_col` TIMESTAMP,
+    `byte_col` BINARY
+) WITH (
+    'connector' = 'rocketmq-x',
+    'topic' = 'shitou_test',
+    'tag' = 'chunjun-1|chunjun-2',
+    'consumer.group' = 'shitou',
+    'access.key' = 'RocketMQ',
+    'secret.key' = '12345678',
+    'nameserver.address' = '127.0.0.1:9876',
+    'consumer.start-offset-mode' = 'timestamp', -- start consuming from the given timestamp
+    'start.message-offset' = '0',
+    'start.time' = '2022-06-15 15:27:18',
+    'heartbeat.broker.interval' = '35000',
+    'persist.consumer-offset-interval' = '4500',
+    'scan.parallelism' = '2' -- parallelism
+    );
+
+CREATE TABLE sink
+(
+    `id` BIGINT,
+    `name` STRING,
+    `age` INT,
+    `boolean_col` BOOLEAN,
+    `tinyint_col` TINYINT,
+    `smallint_col` SMALLINT,
+    `float_col` FLOAT,
+    `double_col` DOUBLE,
+    `decimal_col` DECIMAL(18,8),
+    `char_col` CHAR,
+    `time` TIME,
+    `date_col` DATE,
+    `timestamp_col` TIMESTAMP,
+    `byte_col` BINARY
+) WITH (
+    'connector' = 'stream-x',
+    'print' = 'true'
+    );
+
+insert into sink
+select *
+from rocketmq_source;
+
+
+
diff --git "a/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/rocketmq/rocketmq-source.md" "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/rocketmq/rocketmq-source.md"
new file mode 100644
index 0000000000..8b673feb24
--- /dev/null
+++ "b/docs_zh/ChunJun\350\277\236\346\216\245\345\231\250/rocketmq/rocketmq-source.md"
@@ -0,0 +1,156 @@
+# RocketMQ Source
+
+## 1. Introduction
+Reads data from the RocketMQ message queue; supported message format: JSON.
+
+## 2. Supported Versions
+RocketMQ 4.4+
+
+## 3. Plugin Name
+| sql | rocketmq-x |
+| --- | --- |
+
+## 4. Parameters
+
+#### 1. sql
+
+- **connector**
+  - Description: rocketmq-x
+  - Required: yes
+  - Type: String
+  - Default: none
+
+
+- **topic**
+  - Description: the topic to read
+  - Required: yes
+  - Type: String
+  - Default: none
+
+
+- **consumer.group**
+  - Description: consumer group
+  - Required: yes
+  - Type: String
+  - Default: none
+
+
+- **nameserver.address**
+  - Description: nameserver address of the cluster. The name server provides the latest routing information to clients, including producers, consumers, and command-line clients. Separate multiple nameserver addresses with semicolons.
+  - Required: yes
+  - Type: String
+  - Default: none
+
+
+- **tag**
+  - Description: message tag, used for server-side filtering. Separate multiple tags with '|'.
+  - Required: no
+  - Type: String
+  - Default: *
+
+
+- **access.key**
+  - Description: access.key, required when ACL is enabled on the data source
+  - Required: no
+  - Type: String
+  - Default: none
+
+
+- **secret.key**
+  - Description: secret.key, required when ACL is enabled on the data source
+  - Required: no
+  - Type: String
+  - Default: none
+
+
+- **access.channel**
+  - Description: must be set for Aliyun (Alibaba Cloud) RocketMQ instances
+  - Required: no
+  - Type: String
+  - Default: LOCAL
+
+
+- **consumer.batch-size**
+  - Description: number of messages consumed per batch
+  - Required: no
+  - Type: Integer
+  - Default: 32
+
+
+- **consumer.start-offset-mode**
+  - Description: where the consumer starts consuming. Allowed values:
+    - earliest: consume from the earliest offset
+    - latest: consume from the latest offset
+    - timestamp: consume from the given timestamp, used together with start.message-timestamp
+    - offset: consume from the given offset, used together with start.message-offset
+  - Required: no
+  - Type: String
+  - Default: latest
+
+
+- **start.message-offset**
+  - Description: when consumer.start-offset-mode is offset, the starting consumption offset for each message queue
+  - Required: no
+  - Type: Long
+  - Default: -1L
+
+
+- **start.message-timestamp**
+  - Description: when consumer.start-offset-mode is timestamp, the starting consumption timestamp for each message queue
+  - Required: no
+  - Type: Long
+  - Default: -1L
+
+
+- **start.time.ms**
+  - Description: same meaning as start.message-timestamp; takes effect when start.message-timestamp is not set
+  - Required: no
+  - Type: Long
+  - Default: -1L
+
+
+- **start.time**
+  - Description: same meaning as start.message-timestamp; takes effect when neither start.message-timestamp nor start.time.ms is set. If none of the three is set, the current system time is used. A time string in the format yyyy-MM-dd HH:mm:ss.
+  - Required: no
+  - Type: String
+  - Default: none
+
+
+- **time.zone**
+  - Description: used together with start.time; sets the time zone of start.time
+  - Required: no
+  - Type: String
+  - Default: GMT+8
+
+
+- **encoding**
+  - Description: charset used to decode messages; the consumer receives messages as byte arrays and decodes them with this charset
+  - Required: no
+  - Type: String
+  - Default: UTF-8
+
+
+- **heartbeat.broker.interval**
+  - Description: interval, in milliseconds, at which heartbeats are sent to the broker
+  - Required: no
+  - Type: Integer
+  - Default: 30000
+
+
+- **persist.consumer-offset-interval**
+  - Description: interval, in milliseconds, at which the consumer offset is persisted
+  - Required: no
+  - Type: Integer
+  - Default: 5000
+
+
+## 5. Data Types
+
+| Support | Data types |
+| --- | --- |
+| Supported | CHAR, VARCHAR, BINARY, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP |
+| Not supported | |
+
+
+## 6. Configuration Example
+See the `chunjun-examples` folder in this project; a minimal sketch follows below.
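As a quick reference, here is a minimal sketch of a `rocketmq-x` source definition that uses only the three required options plus an explicit start mode. The table name, topic, consumer group, and nameserver address are placeholders, not values from this repository; adjust them to your environment.

```sql
-- Minimal RocketMQ source: the three required options plus a start mode.
CREATE TABLE rocketmq_source_demo
(
    `id`   BIGINT,
    `name` STRING
) WITH (
    'connector' = 'rocketmq-x',               -- plugin name
    'topic' = 'demo_topic',                   -- required: topic to read
    'consumer.group' = 'demo_group',          -- required: consumer group
    'nameserver.address' = '127.0.0.1:9876',  -- required: nameserver address(es), ';' separated
    'consumer.start-offset-mode' = 'earliest' -- optional: earliest / latest / timestamp / offset
);
```

A complete end-to-end job (this source plus a `stream-x` print sink) is provided in `chunjun-examples/sql/rocketmq/rocketmq_stream.sql`.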