Merge pull request #64 from Michaelg22/KafkaSourceAdd
Kafka source add
JieDing authored Sep 7, 2022
2 parents 09b64be + 6e1cf94 commit 36a5d74
Showing 9 changed files with 380 additions and 0 deletions.
4 changes: 4 additions & 0 deletions connectors/source-kafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# The jar targets Java 18 and uses records, so a Java 8 JRE cannot run it.
FROM openjdk:18-alpine
WORKDIR /vance
COPY target/Source-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar /vance
CMD ["java", "-jar", "./Source-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar"]
80 changes: 80 additions & 0 deletions connectors/source-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Kafka Source

## Overview

A [Vance Connector][vc] that transforms Kafka messages from topics into CloudEvents and delivers them to the target URL.

## User Guidelines

### Connector Introduction

The Kafka Source is a [Vance Connector][vc] which aims to generate CloudEvents by wrapping the body of the
original message into the `data` field of a new CloudEvent.

### The Ideal Message

The ideal message for the Kafka Source is a JSON-formatted string, but the source can handle any other type of
data a Kafka topic delivers.

For example, if an original message looks like:

``` json
{ "name": "Jason", "age": "30" }
```
the Kafka Source transforms it into a CloudEvent like:

``` json
{
  "id" : "4ad0b59f-c3e1-484d-8925-bd78aab15123",
  "source" : "kafka.localhost.topic2",
  "type" : "kafka.message",
  "datacontenttype" : "application/json",
  "time" : "2022-09-07T10:21:49.668Z",
  "data" : {
    "name": "Jason",
    "age": "30"
  }
}
```

The source sets `datacontenttype` to `application/json` when the message body parses as JSON, and to
`text/plain` otherwise.
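The sketch below shows this mapping end to end using the connector's own `KafkaAdapter` and `KafkaData` classes
(the `AdapterDemo` class name is illustrative, not part of the connector):

``` java
package com.linkall.source.kafka;

import io.cloudevents.CloudEvent;

import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;

public class AdapterDemo {
    public static void main(String[] args) {
        byte[] value = "{ \"name\": \"Jason\", \"age\": \"30\" }".getBytes(StandardCharsets.UTF_8);
        // topic, base64-encoded key (unused by the adapter), payload, Kafka host, record timestamp
        KafkaData data = new KafkaData("topic2", null, value, "localhost", OffsetDateTime.now());
        CloudEvent event = new KafkaAdapter().adapt(data);
        // Prints a CloudEvent whose source is "kafka.localhost.topic2" and whose
        // datacontenttype is "application/json", since the payload parses as JSON.
        System.out.println(event);
    }
}
```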

## Kafka Source Configs

Users can specify their configs by either setting environment variables or mounting a config.json to
`/vance/config/config.json` when they run the connector (a sample file follows the table below). Find examples of setting configs [here][config].

### Config Fields of the Kafka Source

| Configs           | Description                                                                                            | Example                               |
|:------------------|:-------------------------------------------------------------------------------------------------------|:---------------------------------------|
| v_target          | The target URL the Kafka Source will send CloudEvents to                                               | "http://localhost:8081"               |
| KAFKA_SERVER_URL  | The host of the Kafka cluster the Kafka Source connects to                                             | "localhost"                           |
| KAFKA_SERVER_PORT | The port of the Kafka cluster the Kafka Source connects to                                             | "9092"                                |
| CLIENT_ID         | An identifier passed to the Kafka broker with every request; it is also used as the consumer group id  | "kafkaSource"                         |
| TOPIC_LIST        | The topic, or comma-separated list of topics, the source listens to                                    | "topic1" or "topic1, topic2, topic3"  |
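For reference, the sample `config.json` shipped with the source code (shown in full under
`src/main/resources/config.json` below) sets:

``` json
{
  "v_target": "http://localhost:8081",
  "KAFKA_SERVER_URL": "localhost",
  "KAFKA_SERVER_PORT": "9092",
  "CLIENT_ID": "Source",
  "TOPIC_LIST": "topic1,topic2"
}
```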

## Kafka Source Image



## Local Development

You can also run the Kafka Source from its source code locally.

### Building via Maven

```shell
$ cd connectors/source-kafka
$ mvn clean package
```

### Running via Maven

```shell
$ mvn exec:java -Dexec.mainClass="com.linkall.source.kafka.Entrance"
```

⚠️ NOTE: For easier local development and testing, the connector can also read configs from `src/main/resources/config.json`, so you don't need to
declare any environment variables or mount a config file to `/vance/config/config.json`.

[vc]: https://github.com/linkall-labs/vance-docs/blob/main/docs/concept.md
[config]: https://github.com/linkall-labs/vance-docs/blob/main/docs/connector.md
71 changes: 71 additions & 0 deletions connectors/source-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.linkall</groupId>
<artifactId>Source-kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>18</maven.compiler.source>
<maven.compiler.target>18</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.linkall</groupId>
<artifactId>cdk-java</artifactId>
<version>0.2.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.4.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>
com.linkall.source.kafka.Entrance
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
10 changes: 10 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/Entrance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.VanceApplication;

public class Entrance {
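// Bootstraps the Vance CDK runtime with this connector's Source implementation.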
public static void main(String[] args) {
VanceApplication.run(KafkaSource.class);
}

}
34 changes: 34 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.Adapter1;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.vertx.core.json.JsonObject;


import java.net.URI;
import java.time.OffsetDateTime;
import java.util.UUID;

public class KafkaAdapter implements Adapter1<KafkaData> {

public CloudEvent adapt(KafkaData kafkaData) {
// Build a fresh CloudEvent for every record; reusing a shared static
// builder would leak attribute state across calls.
CloudEventBuilder builder = CloudEventBuilder.v1();
builder.withId(UUID.randomUUID().toString());
URI uri = URI.create("kafka." + kafkaData.KAFKA_SERVER_URL() + "." + kafkaData.topic());
builder.withSource(uri);
builder.withType("kafka.message");
builder.withTime(kafkaData.timeStamp());
try {
// Probe the payload: if it parses as JSON, tag it as such.
new JsonObject(new String(kafkaData.value()));
builder.withDataContentType("application/json");
} catch (Exception e) {
builder.withDataContentType("text/plain");
}
builder.withData(kafkaData.value());
return builder.build();
}

}

8 changes: 8 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.linkall.source.kafka;

import java.time.OffsetDateTime;

public record KafkaData(String topic, String key, byte[] value, String KAFKA_SERVER_URL, OffsetDateTime timeStamp) {


}
28 changes: 28 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.Adapter;
import com.linkall.vance.core.Source;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class KafkaSource implements Source {

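// Shared Vert.x instance and WebClient; KafkaWorker uses them to POST CloudEvents to the sink.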
public final static Vertx vertx = Vertx.vertx();
public final static WebClient webClient = WebClient.create(vertx);

public void start(){
KafkaWorker worker = new KafkaWorker("kafkawork",false);
worker.start();
}

public Adapter getAdapter() {
return new KafkaAdapter();
}

}
137 changes: 137 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package com.linkall.source.kafka;

import com.linkall.vance.common.config.ConfigUtil;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.web.client.HttpResponse;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;


import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;


public class KafkaWorker extends ShutdownableThread {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaWorker.class);
private final KafkaConsumer<byte[], byte[]> consumer;
private final String topicList;
private final KafkaAdapter adapter;
private final String KAFKA_SERVER_URL;

public final ConcurrentHashMap<TopicPartition,Long> offsets = new ConcurrentHashMap<>();


public KafkaWorker(String name, boolean isInterruptible) {
super(name, isInterruptible);

KAFKA_SERVER_URL = ConfigUtil.getString("KAFKA_SERVER_URL");
String KAFKA_SERVER_PORT = ConfigUtil.getString("KAFKA_SERVER_PORT");
String CLIENT_ID = ConfigUtil.getString("CLIENT_ID");
topicList = ConfigUtil.getString("TOPIC_LIST");

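// Consume raw bytes with auto-commit disabled: offsets are committed manually
// in doWork() only after downstream delivery succeeds.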
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

consumer = new KafkaConsumer<>(properties);
// Accept both "topic1,topic2" and "topic1, topic2" in TOPIC_LIST.
String[] topicsListArray = topicList.split("\\s*,\\s*");

consumer.subscribe(Arrays.asList(topicsListArray));

adapter = new KafkaAdapter();
}


@Override
public void doWork() {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(25));
System.out.println("records.partitions() size: "+records.partitions().size());
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
System.out.println("partitionRecords size: "+partitionRecords.size());
// Track per-offset delivery results so we only commit up to the first failure.
ConcurrentHashMap<Long,Boolean> cm = new ConcurrentHashMap<>();
String sink = ConfigUtil.getVanceSink();
// One breaker per batch of records; creating and closing a breaker per
// record would discard its retry state.
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", KafkaSource.vertx,
new CircuitBreakerOptions()
.setMaxRetries(5)
.setTimeout(30000)
);

for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
OffsetDateTime timeStamp = new Date(record.timestamp()).toInstant().atOffset(ZoneOffset.UTC);
String key64 = record.key() == null ? null : Base64.getEncoder().encodeToString(record.key());
KafkaData kafkaData = new KafkaData(record.topic(), key64, record.value(), KAFKA_SERVER_URL, timeStamp);
CloudEvent event = adapter.adapt(kafkaData);
System.out.println("message: " + Arrays.toString(record.value()));
System.out.println("offset: " + record.offset());

breaker.<String>execute(promise -> {
LOGGER.info("try to send request");

Future<HttpResponse<Buffer>> responseFuture = VertxMessageFactory.createWriter(KafkaSource.webClient.postAbs(sink))
.writeStructured(event, JsonFormat.CONTENT_TYPE);

responseFuture.onSuccess(resp -> {
promise.complete();
LOGGER.info("send task success");
cm.put(record.offset(), true);
});
// Fail the promise so the breaker retries; swallowing the error here
// would leave cm without an entry and stall the wait loop below.
responseFuture.onFailure(promise::fail);
}).onFailure(t -> {
LOGGER.info("send task failed");
cm.put(record.offset(), false);
System.err.println("The targeted server is down!");
System.exit(1);
});
}
// Wait until every record in this batch has a recorded outcome.
while (cm.size() != partitionRecords.size()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
breaker.close();

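// Commit up to (but not including) the first offset whose delivery failed;
// if every send succeeded, commit one past the last record in the batch.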
TreeMap<Long,Boolean> tm = new TreeMap<>(cm);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset()+1;
Iterator<Map.Entry<Long,Boolean>> it = tm.entrySet().iterator();
while (it.hasNext()){
Map.Entry<Long,Boolean> entry = it.next();
if(!entry.getValue()){
lastOffset = entry.getKey();
break;
}
}
System.out.println("lastOffset: "+lastOffset);
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset)));

}

}
}
8 changes: 8 additions & 0 deletions connectors/source-kafka/src/main/resources/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"v_target": "http://localhost:8081",
"KAFKA_SERVER_URL": "localhost",
"KAFKA_SERVER_PORT": "9092",
"CLIENT_ID": "Source",
"TOPIC_LIST": "topic1,topic2"
}
