From 26adb83aaf950f27393d4800716351c81bc48f3a Mon Sep 17 00:00:00 2001
From: Anonymitaet
Date: Wed, 18 Sep 2019 11:07:06 +0800
Subject: [PATCH 1/4] Add *Kafka sink connector guide*

---
 site2/docs/io-kafka-sink.md | 49 +++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)
 create mode 100644 site2/docs/io-kafka-sink.md

diff --git a/site2/docs/io-kafka-sink.md b/site2/docs/io-kafka-sink.md
new file mode 100644
index 0000000000000..0cb5b6d77be99
--- /dev/null
+++ b/site2/docs/io-kafka-sink.md
@@ -0,0 +1,49 @@
+
+---
+id: io-kafka-sink
+title: Kafka sink connector
+sidebar_label: Kafka sink connector
+---
+
+The Kafka sink connector pulls messages from Pulsar topics and persists them
+to Kafka topics.
+
+This guide explains how to configure and use the Kafka sink connector.
+
+## Configuration
+
+The configuration of the Kafka sink connector has the following parameters.
+
+| Name | Type| Required | Default | Description
+|------|----------|---------|-------------|-------------|
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+|`acks`|String|true|" " (empty string) |The number of acknowledgments that the producer requires the leader to receive before a request completes.<br/>This controls the durability of the sent records.
+|`batchSize`|
+|`maxRequestSize`|
+|`topic`|
+| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.
+| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
+|`producerConfigProperties`
+
+
+
+
+
+
+### Example
+
+Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
+
+* JSON
+
+"bootstrapServers": "localhost:6667"
+"topic": "test"
+"acks": "1"
+"batchSize": "16384"
+"maxRequestSize": "1048576"
+"producerConfigProperties":
+  "client.id": "test-pulsar-producer"
+  "security.protocol": "SASL_PLAINTEXT"
+  "sasl.mechanism": "GSSAPI"
+  "sasl.kerberos.service.name": "kafka"
+  "acks": "all"
\ No newline at end of file

From 68993b3b6222bd7a3b5120160b2a4b524ccdb164 Mon Sep 17 00:00:00 2001
From: Anonymitaet
Date: Wed, 18 Sep 2019 11:14:10 +0800
Subject: [PATCH 2/4] update

---
 site2/docs/io-kafka-sink.md | 64 ++++++++++++++++++++++++--------------
 1 file changed, 41 insertions(+), 23 deletions(-)

diff --git a/site2/docs/io-kafka-sink.md b/site2/docs/io-kafka-sink.md
index 0cb5b6d77be99..c92601e3a9992 100644
--- a/site2/docs/io-kafka-sink.md
+++ b/site2/docs/io-kafka-sink.md
@@ -1,6 +1,5 @@
-
 ---
 id: io-kafka-sink
 title: Kafka sink connector
 sidebar_label: Kafka sink connector
 ---
@@ -18,32 +17,51 @@ The configuration of the Kafka sink connector has the following parameters.
 |------|----------|---------|-------------|-------------|
 | `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 |`acks`|String|true|" " (empty string) |The number of acknowledgments that the producer requires the leader to receive before a request completes.<br/>This controls the durability of the sent records.
-|`batchSize`|
-|`maxRequestSize`|
-|`topic`|
-| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.
-| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-|`producerConfigProperties`
-
-
-
-
+|`batchSize`|long||16384L|The batch size that a Kafka producer attempts to batch records together before sending them to brokers.
+|`maxRequestSize`|long||1048576L|The maximum size of a Kafka request in bytes.
+|`topic`|String|true|" " (empty string) |The Kafka topic which receives messages from Pulsar.
+| `keySerializerClass` | String|false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys.
+| `valueSerializerClass` | String|false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values.<br/><br/>The serializer is set by a specific implementation of [`KafkaAbstractSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java).
+|`producerConfigProperties`|Map||" " (empty string)|The producer configuration properties to be passed to producers.<br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**.
 
 
 ### Example
 
-Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
+Before using the Kafka sink connector, you need to create a configuration file through one of the following methods.
 
 * JSON
+
+    ```json
+    {
+        "bootstrapServers": "localhost:6667",
+        "topic": "test",
+        "acks": "1",
+        "batchSize": "16384",
+        "maxRequestSize": "1048576",
+        "producerConfigProperties":
+      {
+        "client.id": "test-pulsar-producer",
+        "security.protocol": "SASL_PLAINTEXT",
+        "sasl.mechanism": "GSSAPI",
+        "sasl.kerberos.service.name": "kafka",
+        "acks": "all"
+      }
+    }
+    ```
+
+* YAML
 
-"bootstrapServers": "localhost:6667"
-"topic": "test"
-"acks": "1"
-"batchSize": "16384"
-"maxRequestSize": "1048576"
-"producerConfigProperties":
-  "client.id": "test-pulsar-producer"
-  "security.protocol": "SASL_PLAINTEXT"
-  "sasl.mechanism": "GSSAPI"
-  "sasl.kerberos.service.name": "kafka"
-  "acks": "all"
\ No newline at end of file
+    ```yaml
+    configs:
+        bootstrapServers: "localhost:6667"
+        topic: "test"
+        acks: "1"
+        batchSize: "16384"
+        maxRequestSize: "1048576"
+        producerConfigProperties:
+            client.id: "test-pulsar-producer"
+            security.protocol: "SASL_PLAINTEXT"
+            sasl.mechanism: "GSSAPI"
+            sasl.kerberos.service.name: "kafka"
+            acks: "all"
+    ```
\ No newline at end of file

From 9215c4a7e09cbf667ad4805c3a0c800cc32a86d3 Mon Sep 17 00:00:00 2001
From: Anonymitaet
Date: Wed, 18 Sep 2019 11:16:29 +0800
Subject: [PATCH 3/4] update

---
 site2/docs/io-kafka-sink.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/site2/docs/io-kafka-sink.md b/site2/docs/io-kafka-sink.md
index c92601e3a9992..4341f4624ebdd 100644
--- a/site2/docs/io-kafka-sink.md
+++ b/site2/docs/io-kafka-sink.md
@@ -39,14 +39,14 @@ Before using the Kafka sink connector, you need to create a configuration file t
         "batchSize": "16384",
         "maxRequestSize": "1048576",
         "producerConfigProperties":
-      {
+    {
         "client.id": "test-pulsar-producer",
         "security.protocol": "SASL_PLAINTEXT",
         "sasl.mechanism": "GSSAPI",
         "sasl.kerberos.service.name": "kafka",
         "acks": "all"
-      }
+    }
     }
     ```
 
 * YAML

From 590e46318d137ae13ff20c4c55080885c43bb58d Mon Sep 17 00:00:00 2001
From: Anonymitaet <50226895+Anonymitaet@users.noreply.github.com>
Date: Wed, 18 Sep 2019 14:24:46 +0800
Subject: [PATCH 4/4] Update

---
 site2/docs/io-kafka-sink.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/site2/docs/io-kafka-sink.md b/site2/docs/io-kafka-sink.md
index 4341f4624ebdd..e4744ab59ea69 100644
--- a/site2/docs/io-kafka-sink.md
+++ b/site2/docs/io-kafka-sink.md
@@ -17,12 +17,12 @@ The configuration of the Kafka sink connector has the following parameters.
 |------|----------|---------|-------------|-------------|
 | `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 |`acks`|String|true|" " (empty string) |The number of acknowledgments that the producer requires the leader to receive before a request completes.<br/>This controls the durability of the sent records.
-|`batchSize`|long||16384L|The batch size that a Kafka producer attempts to batch records together before sending them to brokers.
-|`maxRequestSize`|long||1048576L|The maximum size of a Kafka request in bytes.
+|`batchSize`|long|false|16384L|The batch size that a Kafka producer attempts to batch records together before sending them to brokers.
+|`maxRequestSize`|long|false|1048576L|The maximum size of a Kafka request in bytes.
 |`topic`|String|true|" " (empty string) |The Kafka topic which receives messages from Pulsar.
 | `keySerializerClass` | String|false | org.apache.kafka.common.serialization.StringSerializer | The serializer class for Kafka producers to serialize keys.
 | `valueSerializerClass` | String|false | org.apache.kafka.common.serialization.ByteArraySerializer | The serializer class for Kafka producers to serialize values.<br/><br/>The serializer is set by a specific implementation of [`KafkaAbstractSink`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java).
-|`producerConfigProperties`|Map||" " (empty string)|The producer configuration properties to be passed to producers.<br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**.
+|`producerConfigProperties`|Map|false|" " (empty string)|The producer configuration properties to be passed to producers.<br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**.
 
 
 ### Example
@@ -64,4 +64,4 @@ Before using the Kafka sink connector, you need to create a configuration file t
             sasl.mechanism: "GSSAPI"
             sasl.kerberos.service.name: "kafka"
             acks: "all"
-    ```
\ No newline at end of file
+    ```
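The guide added by this patch series documents which parameters are required but never shows a way to catch a malformed config before handing it to the connector. As a small illustration (a sketch, not part of the patches: `validate_sink_config` is a hypothetical helper, and the required set of `bootstrapServers`, `acks`, and `topic` follows the table and the JSON example above):

```python
# Sketch only: a pre-deployment sanity check for a Kafka sink connector
# config file. Parameter names follow the JSON/YAML example in the guide;
# treating bootstrapServers, acks, and topic as required follows the table.
import json

REQUIRED = ("bootstrapServers", "acks", "topic")

def validate_sink_config(config):
    """Return a list of problems; an empty list means the config looks usable."""
    problems = [f"missing required parameter: {key}"
                for key in REQUIRED if not config.get(key)]
    # producerConfigProperties, when present, must be a map of producer properties.
    props = config.get("producerConfigProperties")
    if props is not None and not isinstance(props, dict):
        problems.append("producerConfigProperties must be a map of producer properties")
    return problems

if __name__ == "__main__":
    # The JSON example from the guide passes the check unchanged.
    cfg = json.loads("""
    {
        "bootstrapServers": "localhost:6667",
        "topic": "test",
        "acks": "1",
        "batchSize": "16384",
        "maxRequestSize": "1048576",
        "producerConfigProperties": {"client.id": "test-pulsar-producer"}
    }
    """)
    print(validate_sink_config(cfg))  # prints []
```

Note that the guide's example deliberately passes `batchSize` and `maxRequestSize` as strings, so the check above does not enforce numeric types for them.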