Kafka source add #64

Merged · 12 commits · Sep 7, 2022
4 changes: 4 additions & 0 deletions connectors/source-kafka/Dockerfile
@@ -0,0 +1,4 @@
# Needs a Java 17+ runtime: the sources use records and the pom targets Java 17
# (openjdk:8-jre-alpine cannot run this bytecode)
FROM eclipse-temurin:17-jre-alpine
WORKDIR /vance
COPY target/Source-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar /vance
CMD ["java", "-jar", "./Source-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar"]
80 changes: 80 additions & 0 deletions connectors/source-kafka/README.md
@@ -0,0 +1,80 @@
# Kafka Source

## Overview

A [Vance Connector][vc] that transforms Kafka messages from topics into CloudEvents and delivers them to the target URL.

## User Guidelines

### Connector Introduction

The Kafka Source is a [Vance Connector][vc] that generates CloudEvents by wrapping the body of the
original Kafka message into the `data` field of a new CloudEvent.

### The ideal message

The ideal message for the Kafka Source is a JSON-formatted string, but the source can handle any other type of data a Kafka topic delivers.

For example, if an original message looks like:

``` json
{ "name": "Jason", "age": "30" }
```

A Kafka message transformed into a CloudEvent looks like:

``` json
{
  "id" : "4ad0b59fc-3e1f-484d-8925-bd78aab15123",
  "source" : "kafka.localhost.topic2",
  "type" : "kafka.message",
  "datacontenttype" : "application/json or text/plain",
  "time" : "2022-09-07T10:21:49.668Z",
  "data" : {
    "name": "Jason",
    "age": "30"
  }
}
```

## Kafka Source Configs

Users can specify their configs by either setting environment variables or mounting a `config.json` to
`/vance/config/config.json` when they run the connector. Find examples of setting configs [here][config].
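
For reference, here is a minimal sketch of how the connector resolves these values internally, using the `ConfigUtil` helpers that appear in this PR's code; the `main` wrapper and printed output are illustrative only:

``` java
// Sketch only: ConfigUtil.getString and ConfigUtil.getVanceSink are the calls
// used by KafkaWorker in this PR; this class is not part of the connector.
import com.linkall.vance.common.config.ConfigUtil;

public class ConfigSketch {
    public static void main(String[] args) {
        String bootstrap = ConfigUtil.getString("KAFKA_SERVER_URL")
                + ":" + ConfigUtil.getString("KAFKA_SERVER_PORT"); // e.g. "localhost:9092"
        String target = ConfigUtil.getVanceSink(); // resolves v_target, e.g. "http://localhost:8081"
        System.out.println("bootstrap=" + bootstrap + ", target=" + target);
    }
}
```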

### Config Fields of the Kafka Source

| Configs | Description | Example |
|:----------|:--------------------------------------------------------------------------------|:------------------------|
| v_target | The target URL the Kafka Source will send CloudEvents to | "http://localhost:8081" |
| KAFKA_SERVER_URL | The host address of the Kafka cluster the Kafka Source connects to | "localhost" |
| KAFKA_SERVER_PORT | The port of the Kafka cluster the Kafka Source connects to | "9092" |
| CLIENT_ID | An identifier for the Kafka Source, passed to the Kafka broker with every request and used as the consumer group id | "kafkaSource" |
| TOPIC_LIST | The topic or comma-separated topics the source will listen to | "topic1" or "topic1, topic2, topic3" |
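
TOPIC_LIST accepts a single topic or a comma-separated list, with or without spaces after the commas. A small sketch of the parsing, mirroring `KafkaWorker` (the topic string here is made up):

``` java
// Sketch: how TOPIC_LIST becomes a subscription list (mirrors KafkaWorker's parsing).
import java.util.Arrays;

public class TopicListSketch {
    public static void main(String[] args) {
        String topicList = "topic1, topic2, topic3"; // value from TOPIC_LIST
        String[] topics = topicList.split(",\\s*");  // comma with optional space
        System.out.println(Arrays.asList(topics));   // [topic1, topic2, topic3]
    }
}
```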

## Kafka Source Image



## Local Development

You can also run the source code of the Kafka Source locally.

### Building via Maven

```shell
$ cd connectors/source-kafka
$ mvn clean package
```

### Running via Maven

```shell
$ mvn exec:java -Dexec.mainClass="com.linkall.source.kafka.Entrance"
```

⚠️ NOTE: For easier local development and testing, the connector can also read configs from `src/main/resources/config.json`, so you don't need to
declare any environment variables or mount a config file to `/vance/config/config.json`.

[vc]: https://github.com/linkall-labs/vance-docs/blob/main/docs/concept.md
[config]: https://github.com/linkall-labs/vance-docs/blob/main/docs/connector.md
71 changes: 71 additions & 0 deletions connectors/source-kafka/pom.xml
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.linkall</groupId>
    <artifactId>Source-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Java 17 matches the runtime image in the Dockerfile; records require 16+ -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.linkall</groupId>
            <artifactId>cdk-java</artifactId>
            <version>0.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- maven-assembly-plugin is a build plugin, not a dependency;
                 its version is declared here rather than in <dependencies> -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.4.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>com.linkall.source.kafka.Entrance</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
10 changes: 10 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/Entrance.java
@@ -0,0 +1,10 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.VanceApplication;

public class Entrance {
    public static void main(String[] args) {
        VanceApplication.run(KafkaSource.class);
    }
}
34 changes: 34 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaAdapter.java
@@ -0,0 +1,34 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.Adapter1;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.vertx.core.json.JsonObject;

import java.net.URI;
import java.util.UUID;

public class KafkaAdapter implements Adapter1<KafkaData> {

    public CloudEvent adapt(KafkaData kafkaData) {
        // Build a fresh builder per event; a shared mutable template is not thread-safe.
        CloudEventBuilder builder = CloudEventBuilder.v1();
        builder.withId(UUID.randomUUID().toString());
        URI uri = URI.create("kafka." + kafkaData.KAFKA_SERVER_URL() + "." + kafkaData.topic());
        builder.withSource(uri);
        builder.withType("kafka.message");
        builder.withTime(kafkaData.timeStamp());
        try {
            // Probe the payload: if it parses as JSON, mark it as such.
            new JsonObject(new String(kafkaData.value()));
            builder.withDataContentType("application/json");
        } catch (Exception e) {
            // "text/plain" is the standard MIME type ("plain/text" is not valid).
            builder.withDataContentType("text/plain");
        }
        builder.withData(kafkaData.value());
        return builder.build();
    }
}
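
A minimal usage sketch of the adapter, assuming it runs in the same package as the connector classes; the topic, payload, and timestamp are made-up values:

``` java
// Hypothetical values, just to show adapt() in action; not part of the connector.
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import io.cloudevents.CloudEvent;

public class AdapterSketch {
    public static void main(String[] args) {
        byte[] payload = "{ \"name\": \"Jason\", \"age\": \"30\" }".getBytes(StandardCharsets.UTF_8);
        KafkaData data = new KafkaData("topic2", null, payload, "localhost", OffsetDateTime.now());
        CloudEvent event = new KafkaAdapter().adapt(data);
        System.out.println(event.getSource());          // kafka.localhost.topic2
        System.out.println(event.getDataContentType()); // application/json
    }
}
```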

8 changes: 8 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java
@@ -0,0 +1,8 @@
package com.linkall.source.kafka;

import java.time.OffsetDateTime;

// Immutable carrier for one Kafka record plus the metadata the adapter needs.
public record KafkaData(String topic, String key, byte[] value, String KAFKA_SERVER_URL, OffsetDateTime timeStamp) {
}
28 changes: 28 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaSource.java
@@ -0,0 +1,28 @@
package com.linkall.source.kafka;

import com.linkall.vance.core.Adapter;
import com.linkall.vance.core.Source;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public class KafkaSource implements Source {

    // Shared Vert.x instance and HTTP client, used by KafkaWorker to deliver events.
    public final static Vertx vertx = Vertx.vertx();
    public final static WebClient webClient = WebClient.create(vertx);

    public void start() {
        KafkaWorker worker = new KafkaWorker("kafkawork", false);
        worker.start();
    }

    public Adapter getAdapter() {
        return new KafkaAdapter();
    }
}
137 changes: 137 additions & 0 deletions connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java
@@ -0,0 +1,137 @@
package com.linkall.source.kafka;

import com.linkall.vance.common.config.ConfigUtil;
import io.cloudevents.CloudEvent;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.web.client.HttpResponse;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

public class KafkaWorker extends ShutdownableThread {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaWorker.class);
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final String topicList;
    private final KafkaAdapter adapter;
    private final String KAFKA_SERVER_URL;
    // One circuit breaker per worker; creating one per record (and closing it
    // immediately) would defeat its failure tracking.
    private final CircuitBreaker breaker;

    public KafkaWorker(String name, boolean isInterruptible) {
        super(name, isInterruptible);

        KAFKA_SERVER_URL = ConfigUtil.getString("KAFKA_SERVER_URL");
        String KAFKA_SERVER_PORT = ConfigUtil.getString("KAFKA_SERVER_PORT");
        String CLIENT_ID = ConfigUtil.getString("CLIENT_ID");
        topicList = ConfigUtil.getString("TOPIC_LIST");

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CLIENT_ID);
        // Offsets are committed manually in doWork() once events are delivered.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        consumer = new KafkaConsumer<>(properties);
        // TOPIC_LIST accepts "topic1,topic2" as well as "topic1, topic2".
        String[] topicsListArray = topicList.split(",\\s*");
        consumer.subscribe(Arrays.asList(topicsListArray));

        breaker = CircuitBreaker.create("kafka-source-breaker", KafkaSource.vertx,
                new CircuitBreakerOptions()
                        .setMaxRetries(5)
                        .setTimeout(30000));

        adapter = new KafkaAdapter();
    }

    @Override
    public void doWork() {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(25));
        LOGGER.info("records.partitions() size: " + records.partitions().size());
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
            LOGGER.info("partitionRecords size: " + partitionRecords.size());
            // Tracks delivery success per offset so we only commit up to the
            // first failed record.
            ConcurrentHashMap<Long, Boolean> cm = new ConcurrentHashMap<>();

            for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                OffsetDateTime timeStamp = Instant.ofEpochMilli(record.timestamp()).atOffset(ZoneOffset.UTC);
                // Keys may be absent; guard against NPE before Base64-encoding.
                String key64 = record.key() == null
                        ? null
                        : Base64.getEncoder().encodeToString(record.key());
                KafkaData kafkaData = new KafkaData(record.topic(), key64, record.value(), KAFKA_SERVER_URL, timeStamp);
                CloudEvent event = adapter.adapt(kafkaData);
                String sink = ConfigUtil.getVanceSink();

                breaker.<String>execute(promise -> {
                    LOGGER.info("try to send request");
                    Future<HttpResponse<Buffer>> responseFuture =
                            VertxMessageFactory.createWriter(KafkaSource.webClient.postAbs(sink))
                                    .writeStructured(event, JsonFormat.CONTENT_TYPE);
                    responseFuture.onSuccess(resp -> {
                        LOGGER.info("send task success");
                        cm.put(record.offset(), true);
                        promise.complete();
                    });
                    // Fail the promise so the breaker retries instead of waiting for the timeout.
                    responseFuture.onFailure(promise::fail);
                }).onFailure(t -> {
                    LOGGER.error("send task failed, the targeted server is down!", t);
                    cm.put(record.offset(), false);
                    System.exit(1);
                });
            }

            // Block until every record of this partition has been acknowledged
            // (successfully or not) before committing offsets.
            while (cm.size() != partitionRecords.size()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            // Commit up to (and excluding) the first offset that failed;
            // by default commit one past the last record.
            TreeMap<Long, Boolean> tm = new TreeMap<>(cm);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
            for (Map.Entry<Long, Boolean> entry : tm.entrySet()) {
                if (!entry.getValue()) {
                    lastOffset = entry.getKey();
                    break;
                }
            }
            LOGGER.info("lastOffset: " + lastOffset);
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset)));
        }
    }
}
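
For clarity, a standalone sketch of the offset-commit rule used in `doWork()` above; the offsets and ack flags here are hypothetical:

``` java
// Illustration of the commit rule: commit one past the last record,
// unless some earlier offset failed, in which case commit stops there.
import java.util.Map;
import java.util.TreeMap;

public class OffsetRuleSketch {
    public static void main(String[] args) {
        TreeMap<Long, Boolean> acked = new TreeMap<>(Map.of(
                10L, true, 11L, false, 12L, true)); // offset -> delivered?
        long commit = 12L + 1;                      // default: last offset + 1
        for (Map.Entry<Long, Boolean> e : acked.entrySet()) {
            if (!e.getValue()) { commit = e.getKey(); break; } // first failure wins
        }
        System.out.println(commit); // prints 11 -> redelivery starts there
    }
}
```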
8 changes: 8 additions & 0 deletions connectors/source-kafka/src/main/resources/config.json
@@ -0,0 +1,8 @@
{
  "v_target": "http://localhost:8081",
  "KAFKA_SERVER_URL": "localhost",
  "KAFKA_SERVER_PORT": "9092",
  "CLIENT_ID": "Source",
  "TOPIC_LIST": "topic1,topic2"
}