Skip to content

Commit

Permalink
[BitSail][Connector] Support more send method in Kafka writer (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeliu authored Nov 15, 2022
1 parent 70f1f4c commit c55bfdd
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,36 @@ private int choosePartitionIdByFields(String[] fields) {
return kafkaProducer.choosePartitionIdByFields(fields);
}

// Sends a fully-specified KafkaRecord through the shared producer; the writer's
// callback receives the broker acknowledgement (or error) asynchronously.
private void send(KafkaRecord record) {
kafkaProducer.send(record, callback);
}

/**
 * Sends a value-only record: no key, no explicit partition, no headers.
 * NOTE(review): the rendered diff left the stale pre-refactor call
 * {@code kafkaProducer.send(value, callback)} alongside the new builder-based
 * call, which would emit two messages per invocation; only the new call is kept.
 */
private void send(String value) {
  kafkaProducer.send(KafkaRecord.builder().value(value).build(), callback);
}

// Publishes a keyed record; partition and headers are left unset so the
// producer applies its defaults.
private void send(String key, String value) {
  KafkaRecord keyedRecord = KafkaRecord.builder()
      .key(key)
      .value(value)
      .build();
  kafkaProducer.send(keyedRecord, callback);
}

/**
 * Sends a keyless record pinned to an explicit partition.
 * NOTE(review): removed the stale pre-refactor call
 * {@code kafkaProducer.send(value, partitionId, callback)} that the rendered
 * diff left next to the new builder-based call — keeping both would send the
 * same payload twice.
 */
private void sendByPartitionId(String value, int partitionId) {
  kafkaProducer.send(KafkaRecord.builder().value(value).partitionId(partitionId).build(), callback);
}

// Publishes a keyed record to an explicit partition; no headers.
private void sendByPartitionId(String key, String value, int partitionId) {
  KafkaRecord partitionedRecord = KafkaRecord.builder()
      .key(key)
      .value(value)
      .partitionId(partitionId)
      .build();
  kafkaProducer.send(partitionedRecord, callback);
}

// Publishes a keyless record carrying the given string headers.
private void sendWithHeaders(String value, Map<String, String> headers) {
  KafkaRecord headeredRecord = KafkaRecord.builder()
      .value(value)
      .headers(headers)
      .build();
  kafkaProducer.send(headeredRecord, callback);
}

// Publishes a keyed record carrying the given string headers; partition unset.
private void sendWithHeaders(String key, String value, Map<String, String> headers) {
  KafkaRecord headeredRecord = KafkaRecord.builder()
      .key(key)
      .value(value)
      .headers(headers)
      .build();
  kafkaProducer.send(headeredRecord, callback);
}

// Fully-specified publish: key, explicit partition, and string headers.
private void sendWithHeaders(String key, String value, int partitionId, Map<String, String> headers) {
  KafkaRecord fullRecord = KafkaRecord.builder()
      .key(key)
      .value(value)
      .partitionId(partitionId)
      .headers(headers)
      .build();
  kafkaProducer.send(fullRecord, callback);
}

private void closeProducer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -83,28 +85,24 @@ public KafkaProducer(String bootstrapServerAddress, String topic, Map<String, Ob
partitionList = getPartitionsByTopic(topic);
}

public Future<RecordMetadata> send(String value) {
return producer.send(new ProducerRecord<>(topic, value));
public Future<RecordMetadata> send(KafkaRecord record) {
return producer.send(generateProducerRecord(record));
}

public Future<RecordMetadata> send(String value, Callback callback) {
return producer.send(new ProducerRecord<>(topic, value), callback);
public Future<RecordMetadata> send(KafkaRecord record, Callback callback) {
return producer.send(generateProducerRecord(record), callback);
}

// Sends a value to an explicit partition with a null key.
// NOTE(review): this overload is removed by commit c55bfdd in favor of
// send(KafkaRecord); shown here only as pre-commit context.
public Future<RecordMetadata> send(String value, int partitionId) {
return producer.send(new ProducerRecord<>(topic, partitionId, null, value));
}

// Sends a value to an explicit partition with a null key, reporting completion
// through the supplied callback.
// NOTE(review): removed by commit c55bfdd in favor of send(KafkaRecord, Callback);
// shown here only as pre-commit context.
public Future<RecordMetadata> send(String value, int partitionId, Callback callback) {
return producer.send(new ProducerRecord<>(topic, partitionId, null, value), callback);
}

// Sends a keyed record; Kafka's partitioner chooses the partition from the key.
// NOTE(review): removed by commit c55bfdd in favor of send(KafkaRecord);
// shown here only as pre-commit context.
public Future<RecordMetadata> send(String key, String value) {
return producer.send(new ProducerRecord<>(topic, key, value));
}

public Future<RecordMetadata> send(String key, String value, Callback callback) {
return producer.send(new ProducerRecord<>(topic, key, value), callback);
/**
* convert {@link KafkaRecord} to {@link ProducerRecord}
*/
private ProducerRecord generateProducerRecord(KafkaRecord record) {
ProducerRecord producerRecord = new ProducerRecord(topic, record.getPartitionId(), record.getTimestamp(), record.getKey(), record.getValue());
Map<String, String> headers = record.getHeaders();
if (Objects.nonNull(headers)) {
headers.keySet().stream().forEach(key -> producerRecord.headers().add(new RecordHeader(key, headers.get(key).getBytes())));
}
return producerRecord;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.bytedance.bitsail.connector.legacy.kafka.sink;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;

import java.util.Map;

/**
 * Value object describing one message to publish to Kafka: a required value plus
 * optional key, partition id, timestamp, and string headers. The producer converts
 * it into a Kafka {@code ProducerRecord} before sending.
 */
@Getter
@Setter
@AllArgsConstructor
@Builder
@NoArgsConstructor
// toString deliberately omits timestamp and headers — presumably to keep log lines
// short; confirm this exclusion is intentional.
@ToString(of = {"key", "value", "partitionId"})
public class KafkaRecord {
// Optional message key; may be null.
private String key;
// Payload. @NonNull makes the Lombok builder and all-args constructor reject null,
// but the no-args constructor can still leave it null — TODO confirm intended.
@NonNull
private String value;

// Optional explicit target partition; null defers partition selection.
private Integer partitionId;

// Optional timestamp passed through to the ProducerRecord; null leaves it unset.
// (assumes epoch-millis semantics per Kafka's ProducerRecord — confirm)
private Long timestamp;

// Optional headers; values are encoded to bytes when the record is sent.
private Map<String, String> headers;
}

0 comments on commit c55bfdd

Please sign in to comment.