Skip to content

Commit

Permalink
Add legacy connector rocketmq (#35)
Browse files Browse the repository at this point in the history
* Initialize rmq writer

* Add test (ignored) for rocketmq writer.

* Optimize writer itcase test.

* Regular checkstyle
  • Loading branch information
BlockLiu authored Oct 26, 2022
1 parent 1a76e56 commit a2bbdf0
Show file tree
Hide file tree
Showing 19 changed files with 1,400 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bitsail-connectors-legacy</artifactId>
<groupId>com.bytedance.bitsail</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>bitsail-connector-rocketmq</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-fake</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>bitsail-connector-test</artifactId>
<version>${revision}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.rocketmq.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.legacy.rocketmq.error.RocketMQPluginErrorCode;
import com.bytedance.bitsail.connector.legacy.rocketmq.option.RocketMQWriterOptions;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Data
public class RocketMQSinkConfig implements Serializable {
private static final long serialVersionUID = 2L;

private String nameServerAddress;
private String producerGroup;
private String topic;
private String tag;

private boolean enableBatchFlush;
private int batchSize;

private String accessKey;
private String secretKey;

private boolean logFailuresOnly;
private boolean enableSyncSend;

private int failureRetryTimes;
private int sendMsgTimeout;
private int maxMessageSize;

public RocketMQSinkConfig(BitSailConfiguration outputSliceConfig) {
this.nameServerAddress = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.NAME_SERVER_ADDRESS,
RocketMQPluginErrorCode.REQUIRED_VALUE);
this.producerGroup = outputSliceConfig.getUnNecessaryOption(RocketMQWriterOptions.PRODUCER_GROUP,
UUID.randomUUID().toString());
this.topic = outputSliceConfig.getNecessaryOption(RocketMQWriterOptions.TOPIC,
RocketMQPluginErrorCode.REQUIRED_VALUE);
this.tag = outputSliceConfig.get(RocketMQWriterOptions.TAG);

this.enableBatchFlush = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_BATCH_FLUSH);
this.batchSize = outputSliceConfig.get(RocketMQWriterOptions.BATCH_SIZE);

this.accessKey = outputSliceConfig.get(RocketMQWriterOptions.ACCESS_KEY);
this.secretKey = outputSliceConfig.get(RocketMQWriterOptions.SECRET_KEY);

this.logFailuresOnly = outputSliceConfig.get(RocketMQWriterOptions.LOG_FAILURES_ONLY);
this.enableSyncSend = outputSliceConfig.get(RocketMQWriterOptions.ENABLE_SYNC_SEND);

this.failureRetryTimes = outputSliceConfig.get(RocketMQWriterOptions.SEND_FAILURE_RETRY_TIMES);
this.sendMsgTimeout = outputSliceConfig.get(RocketMQWriterOptions.SEND_MESSAGE_TIMEOUT);
this.maxMessageSize = outputSliceConfig.get(RocketMQWriterOptions.MAX_MESSAGE_SIZE);
}

@Override
public String toString() {
Map<String, String> configMap = new HashMap<>();
configMap.put("nameServerAddress", this.nameServerAddress);
configMap.put("producerGroup", this.producerGroup);
configMap.put("topic", this.topic);
configMap.put("tag", this.tag);
configMap.put("batchSize", this.batchSize + "");
configMap.put("enableBatchFlush", this.enableBatchFlush + "");
configMap.put("logFailuresOnly", this.logFailuresOnly + "");
configMap.put("enableSyncSend", this.enableSyncSend + "");
configMap.put("failureRetryTimes", this.failureRetryTimes + "");
configMap.put("sendMsgTimeout", this.sendMsgTimeout + "");
configMap.put("maxMessageSize", this.maxMessageSize + "");
return configMap.toString();
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.rocketmq.constant;

public class RocketMQConstants {

public static final String CONNECTOR_NAME = "rocketmq";

public static final int MAX_PARALLELISM_OUTPUT_ROCKETMQ = 5;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.rocketmq.error;

import com.bytedance.bitsail.common.exception.ErrorCode;

public enum RocketMQPluginErrorCode implements ErrorCode {

REQUIRED_VALUE("RocketMQ-01", "You missed parameter which is required, please check your configuration."),
UNSUPPORTED_FORMAT("RocketMQ-02", "Unsupported output format.");

private final String code;
private final String description;

RocketMQPluginErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return code;
}

@Override
public String getDescription() {
return description;
}

@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code,
this.description);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.rocketmq.option;

import com.bytedance.bitsail.common.annotation.Essential;
import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.WriterOptions;

import static com.bytedance.bitsail.common.option.ConfigOptions.key;
import static com.bytedance.bitsail.common.option.WriterOptions.WRITER_PREFIX;

public interface RocketMQWriterOptions extends WriterOptions.BaseWriterOptions {

@Essential
ConfigOption<String> NAME_SERVER_ADDRESS =
key(WRITER_PREFIX + "name_server_address")
.noDefaultValue(String.class);

ConfigOption<String> PRODUCER_GROUP =
key(WRITER_PREFIX + "producer_group")
.noDefaultValue(String.class);

@Essential
ConfigOption<String> TOPIC =
key(WRITER_PREFIX + "topic")
.noDefaultValue(String.class);

ConfigOption<String> TAG =
key(WRITER_PREFIX + "tag")
.noDefaultValue(String.class);

ConfigOption<Boolean> ENABLE_BATCH_FLUSH =
key(WRITER_PREFIX + "enable_batch_flush")
.defaultValue(true);

ConfigOption<Integer> BATCH_SIZE =
key(WRITER_PREFIX + "batch_size")
.defaultValue(100);

/**
* when encounter errors while sending:<br/>
* true: log the error<br/>
* false: throw exceptions
*/
ConfigOption<Boolean> LOG_FAILURES_ONLY =
key(WRITER_PREFIX + "log_failures_only")
.defaultValue(false);

ConfigOption<Boolean> ENABLE_SYNC_SEND =
key(WRITER_PREFIX + "enable_sync_send")
.defaultValue(false);

ConfigOption<String> ACCESS_KEY =
key(WRITER_PREFIX + "access_key")
.noDefaultValue(String.class);

ConfigOption<String> SECRET_KEY =
key(WRITER_PREFIX + "secret_key")
.noDefaultValue(String.class);

ConfigOption<Integer> SEND_FAILURE_RETRY_TIMES =
key(WRITER_PREFIX + "send_failure_retry_times")
.defaultValue(3);

ConfigOption<Integer> SEND_MESSAGE_TIMEOUT =
key(WRITER_PREFIX + "send_message_timeout_ms")
.defaultValue(3000);

ConfigOption<Integer> MAX_MESSAGE_SIZE =
key(WRITER_PREFIX + "max_message_size_bytes")
.defaultValue(4194304);

ConfigOption<String> KEY_FIELDS =
key(WRITER_PREFIX + "key")
.noDefaultValue(String.class);

ConfigOption<String> PARTITION_FIELDS =
key(WRITER_PREFIX + "partition_fields")
.noDefaultValue(String.class);

ConfigOption<String> FORMAT =
key(WRITER_PREFIX + "format")
.defaultValue("json");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.legacy.rocketmq.sink;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class HashQueueSelector implements MessageQueueSelector {

private int nullKeyCount;

public HashQueueSelector() {
super();
nullKeyCount = 0;
}

@Override
public MessageQueue select(List<MessageQueue> mqList, Message message, Object partitionKeys) {
int queueId;

if (partitionKeys != null) {
queueId = partitionKeys.hashCode() % mqList.size();
} else {
queueId = nullKeyCount % mqList.size();
nullKeyCount = (nullKeyCount + 1) % mqList.size();
}

return mqList.get(queueId);
}
}

Loading

0 comments on commit a2bbdf0

Please sign in to comment.