diff --git a/ydb/docs/en/core/reference/kafka-api/constraints.md b/ydb/docs/en/core/reference/kafka-api/constraints.md new file mode 100644 index 000000000000..e562b71cbe77 --- /dev/null +++ b/ydb/docs/en/core/reference/kafka-api/constraints.md @@ -0,0 +1,9 @@ +The Kafka protocol version 3.4.0 is supported with limitations: + +1. Only authenticated connections allowed. +2. Only SASL/PLAIN authentication supported. +3. Message compression not supported. +4. Only Manual Partition Assignment is supported in reading, [assign method](https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#assign(java.util.Collection)), without using consumer group partitions. +5. Transactions not supported. +6. DDL transactions not supported. To perform DDL operations, use the [YDB SDK](../ydb-sdk/index.md) or [YDB CLI](../ydb-cli/index.md). +7. Data schema validation not supported. \ No newline at end of file diff --git a/ydb/docs/en/core/reference/kafka-api/examples.md b/ydb/docs/en/core/reference/kafka-api/examples.md new file mode 100644 index 000000000000..22ef42e5827d --- /dev/null +++ b/ydb/docs/en/core/reference/kafka-api/examples.md @@ -0,0 +1,127 @@ +# Kafka API usage examples + +This article provides examples of usage the Kafka API to work with [topics](../../concepts/topic.md). + +Before executing the examples, [create a topic](../ydb-cli/topic-create.md) and [add a consumer](../ydb-cli/topic-consumer-add.md). + +## Examples of working with topics + +The examples use: + + * `ydb:9093` — host name. + * `/Root/Database` — database name. + * `/Root/Database/Topic` — topic name. + * `user@/Root/Database` — username. Includes the username and database name. + * `*****` — user password. + + +## Writing data to a topic + +### Writing via Kafka Java SDK + +This example includes a code snippet for writing data to a topic via Kafka Java SDK. + + ```java + String HOST = "ydb:9093"; + String TOPIC = "/Root/Database/Topic"; + String USER = "user@/Root/Database"; + String PASS = "*****"; + + Properties props = new Properties(); + props.put("bootstrap.servers", HOST); + props.put("acks", "all"); + + props.put("key.serializer", StringSerializer.class.getName()); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + props.put("security.protocol", "SASL_SSL"); + props.put("sasl.mechanism", "PLAIN"); + props.put("sasl.jaas.config", PlainLoginModule.class.getName() + " required username=\"" + USER + "\" password=\"" + PASS + "\";"); + + props.put("compression.type", "none"); + + Producer producer = new KafkaProducer<>(props); + producer.send(new ProducerRecord(TOPIC, "msg-key", "msg-body")); + producer.flush(); + producer.close(); + ``` + +### Writing via Logstash + +To configure [Logstash](https://github.com/elastic/logstash), use the following parameters: + + ``` + output { + kafka { + codec => json + topic_id => "/Root/Database/Topic" + bootstrap_servers => "ydb:9093" + compression_type => none + security_protocol => SASL_SSL + sasl_mechanism => PLAIN + sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='user@/Root/Database' password='*****';" + } + } + ``` + +### Writing via Fluent Bit + +To configure [Fluent Bit](https://github.com/fluent/fluent-bit), use the following parameters: + + ``` + [OUTPUT] + name kafka + match * + Brokers ydb:9093 + Topics /Root/Database/Topic + rdkafka.client.id Fluent-bit + rdkafka.request.required.acks 1 + rdkafka.log_level 7 + rdkafka.security.protocol SASL_SSL + rdkafka.sasl.mechanism PLAIN + rdkafka.sasl.username user@/Root/Database + rdkafka.sasl.password ***** + ``` + +## Reading data from a topic + +### Reading data from a topic via Kafka Java SDK + +This example includes a code snippet for reading data from a topic via Kafka Java SDK. + +```java + String HOST = "ydb:9093"; + String TOPIC = "/Root/Database/Topic"; + String USER = "user@/Root/Database"; + String PASS = "*****"; + + Properties props = new Properties(); + props.put("bootstrap.servers", HOST); + props.put("auto.offset.reset", "earliest"); // to read from start + props.put("check.crcs", false); + + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + props.put("security.protocol", "SASL_SSL"); + props.put("sasl.mechanism", "PLAIN"); + props.put("sasl.jaas.config", PlainLoginModule.class.getName() + " required username=\"" + USER + "\" password=\"" + PASS + "\";"); + + Consumer consumer = new KafkaConsumer<>(props); + + List partitionInfos = consumer.partitionsFor(TOPIC); + List topicPartitions = new ArrayList<>(); + + for (PartitionInfo partitionInfo : partitionInfos) { + topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + consumer.assign(topicPartitions); + + while (true) { + ConsumerRecords records = consumer.poll(1000); + for (ConsumerRecord record : records) { + System.out.println(record.key() + ":" + record.value()); + } + } \ No newline at end of file diff --git a/ydb/docs/en/core/reference/kafka-api/index.md b/ydb/docs/en/core/reference/kafka-api/index.md new file mode 100644 index 000000000000..4c014bf58a35 --- /dev/null +++ b/ydb/docs/en/core/reference/kafka-api/index.md @@ -0,0 +1,7 @@ +# Kafka API + +YDB supports working with topics using [the Kafka version 3.4.0](https://kafka.apache.org/34/documentation.html). + +[Kafka API usage limitations](constraints.md) + +[Kafka API usage examples](examples.md) \ No newline at end of file diff --git a/ydb/docs/en/core/reference/kafka-api/toc_i.yaml b/ydb/docs/en/core/reference/kafka-api/toc_i.yaml new file mode 100644 index 000000000000..9829816a871c --- /dev/null +++ b/ydb/docs/en/core/reference/kafka-api/toc_i.yaml @@ -0,0 +1,5 @@ +items: + - name: Limitations + href: constraints.md + - name: Usage examples + href: examples.md \ No newline at end of file diff --git a/ydb/docs/en/core/reference/kafka-api/toc_p.yaml b/ydb/docs/en/core/reference/kafka-api/toc_p.yaml new file mode 100644 index 000000000000..bb200f7ddbab --- /dev/null +++ b/ydb/docs/en/core/reference/kafka-api/toc_p.yaml @@ -0,0 +1,4 @@ +items: +- name: Overview + href: index.md +- include: { mode: link, path: toc_i.yaml } \ No newline at end of file