
Kafka Java Publisher

mrb-github edited this page Apr 21, 2020 · 2 revisions

The main goal is to start a publisher (sender) that sends a series of messages to Kafka.

  1. Getting started with Spring Boot was easy enough.
  2. I then modified the publisher; see the modified code below.
  3. Because my dev environment is on a Mac, I had to connect remotely to the Kafka broker to test directly from my IDE.
  4. So I opened port 9092 in the EC2 security group. The inbound rules look like this:

         Type        Protocol  Port range  Source     Description
         Custom TCP  TCP       9092        0.0.0.0/0  Kafka
         Custom TCP  TCP       9092        ::/0       Kafka

  5. Even so, I still couldn't connect from the Mac to Kafka on my EC2 instance. See the resolution below (https://www.confluent.io/blog/kafka-listeners-explained/).
  6. After the fix, I was able to send messages! I tested with a local console consumer (receiver) on the EC2 instance.
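
To separate a security group problem from a listener problem (steps 4 and 5), it can help to first check whether the port accepts TCP connections at all. The following is a minimal sketch using only the JDK; `PortCheck` and `isReachable` are hypothetical names, not part of the original project, and the host is a placeholder you would replace with your instance's public DNS name.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not from the original project): checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, and DNS failure are all treated as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host: pass the public DNS name of your own EC2 instance as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = 9092;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 3000));
    }
}
```

A successful check only shows that the security group and network path are fine. As step 5 shows, the producer can still fail after that if the broker advertises an address the client cannot resolve.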

To fix the access issue, I followed the configuration explained on the Confluent blog: https://www.confluent.io/blog/kafka-listeners-explained/

Configuration properties of the Kafka broker hosted on EC2, so that it can be accessed from my Mac:

    listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
    listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
    # INTERNAL uses the private DNS name (from the AWS console); EXTERNAL uses the public DNS name
    advertised.listeners=INTERNAL://ip-XXX-XX-XX-XXX.us-east-2.compute.internal:19092,EXTERNAL://ec2-X-XX-XXX-XXX.us-east-2.compute.amazonaws.com:9092
    inter.broker.listener.name=INTERNAL
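
The reason this works, as I understand it from the Confluent post: the client first connects to the bootstrap address, and the broker then replies with the advertised address of the listener the client came in on. The client uses that advertised address for all further traffic, so the EXTERNAL entry has to be a name the Mac can resolve. The sketch below only illustrates that lookup; `AdvertisedListeners` is a hypothetical class, this is not Kafka's own parsing code, and the host names are placeholders for the redacted values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only: this is not how Kafka itself parses the setting. */
final class AdvertisedListeners {

    /** Parses "NAME://host:port,NAME://host:port" into a map of listener name to "host:port". */
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected NAME://host:port but got: " + entry);
            }
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    public static void main(String[] args) {
        // Placeholder host names standing in for the private and public DNS names.
        String config = "INTERNAL://broker.internal.example:19092,EXTERNAL://broker.public.example:9092";
        Map<String, String> listeners = parse(config);
        // A client that bootstraps through the EXTERNAL listener is told to use this address.
        System.out.println("EXTERNAL -> " + listeners.get("EXTERNAL"));
        // Other brokers and clients inside the VPC are pointed at the private address.
        System.out.println("INTERNAL -> " + listeners.get("INTERNAL"));
    }
}
```

With a single advertised listener that points at the private DNS name, a client outside AWS would be handed an address it cannot reach, which matches the failure in step 5.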

My publisher modification:

```java
package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        // Spring Boot startup is disabled; this class only runs the plain Kafka producer.
        // SpringApplication.run(DemoApplication.class, args);
        System.out.println("Starting publisher");

        Properties props = new Properties();
        // Public DNS name of the EC2 broker; it must match the EXTERNAL advertised listener.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ec2-3-15-233-230.us-east-2.compute.amazonaws.com:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer, which also flushes any buffered records.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                System.out.println("Sending message:" + i);
                // Topic is "test"; key and value are both the loop counter.
                producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            }
        }
    }
}
```
