
[Doc] Add *Kafka sink connector guide* #5210

Merged 4 commits on Sep 22, 2019
Changed file: site2/docs/io-kafka-sink.md (66 additions, 0 deletions)
---
id: io-kafka-sink
title: Kafka sink connector
sidebar_label: Kafka sink connector
---

The Kafka sink connector pulls messages from Pulsar topics and persists them to Kafka topics.

**Member Author:** @tuteng are line7 and line8 accurate?

**Contributor:** @Anonymitaet yeah, it is correct.

**Member Author:** @murong00 thanks for your help.

This guide explains how to configure and use the Kafka sink connector.

## Configuration

The configuration of the Kafka sink connector has the following parameters.

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `bootstrapServers` | String | true | " " (empty string) | A comma-separated list of host and port pairs used to establish the initial connection to the Kafka cluster. |
| `acks` | String | true | " " (empty string) | The number of acknowledgments that the producer requires the leader to receive before a request completes. <br/>This controls the durability of the sent records. |
| `batchSize` | long | false | 16384L | The size (in bytes) into which the Kafka producer attempts to batch records before sending them to brokers. |
| `maxRequestSize` | long | false | 1048576L | The maximum size of a Kafka request in bytes. |
| `topic` | String | true | " " (empty string) | The Kafka topic that receives messages from Pulsar. |
| `keyDeserializationClass` | String | false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys. |
| `valueDeserializationClass` | String | false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values.<br/><br/>The serializer is set by a specific implementation of [`KafkaAbstractSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java). |
| `producerConfigProperties` | Map | false | " " (empty string) | The producer configuration properties to be passed to producers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |

**Member Author:** @tuteng Are `batchsize` (line 20), `maxRequestSize` (line 21) and `producerConfigProperties` (line 25) optional or required? I can't find this information in KafkaSinkConfig.java.

**Member:** Optional. If `Required` is true, the parameter is necessary; otherwise, it is optional.

**Member Author:** @tuteng thank you, I've added it.


### Example

Before using the Kafka sink connector, you need to create a configuration file in one of the following formats.

* JSON

```json
{
   "bootstrapServers": "localhost:6667",
   "topic": "test",
   "acks": "1",
   "batchSize": "16384",
   "maxRequestSize": "1048576",
   "producerConfigProperties": {
      "client.id": "test-pulsar-producer",
      "security.protocol": "SASL_PLAINTEXT",
      "sasl.mechanism": "GSSAPI",
      "sasl.kerberos.service.name": "kafka",
      "acks": "all"
   }
}
```

* YAML

```yaml
configs:
   bootstrapServers: "localhost:6667"
   topic: "test"
   acks: "1"
   batchSize: "16384"
   maxRequestSize: "1048576"
   producerConfigProperties:
      client.id: "test-pulsar-producer"
      security.protocol: "SASL_PLAINTEXT"
      sasl.mechanism: "GSSAPI"
      sasl.kerberos.service.name: "kafka"
      acks: "all"
```
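Once the configuration file is ready, the sink can be deployed with the `pulsar-admin sinks create` command. The sketch below assumes a running Pulsar cluster; the archive file name, tenant, namespace, sink name, and input topic are placeholder values not taken from this PR, so adjust them for your deployment.

```
# Deploy the Kafka sink using the YAML configuration above.
# Archive path, tenant/namespace, sink name, and input topic are assumptions.
bin/pulsar-admin sinks create \
  --archive connectors/pulsar-io-kafka.nar \
  --tenant public \
  --namespace default \
  --name kafka-sink \
  --inputs my-pulsar-topic \
  --sink-config-file kafka-sink-config.yaml
```

After the sink starts, messages published to the Pulsar input topic are forwarded to the configured Kafka topic.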