diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java index 0ceac3a8a5e..7055113176a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -56,23 +56,36 @@ public class KafkaBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); - public static final String TOPIC = "topic"; + private static final String TOPIC = "topic"; + + private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_ACKS = "1"; private KafkaProducer producer; private OutputCollector collector; - private TupleToKafkaMapper mapper; + private TupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper<>(); private KafkaTopicSelector topicSelector; private Properties boltSpecfiedProperties = new Properties(); /** * With default setting for fireAndForget and async, the callback is called when the sending succeeds. * By setting fireAndForget true, the send will not wait at all for kafka to ack. * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. - * By setting async false, synchronous sending is used. + * By setting async false, synchronous sending is used. */ private boolean fireAndForget = false; private boolean async = true; - public KafkaBolt() {} + public KafkaBolt() { + boltSpecfiedProperties.setProperty("acks", DEFAULT_ACKS); + boltSpecfiedProperties.setProperty("key.serializer", DEFAULT_KEY_SERIALIZER); + boltSpecfiedProperties.setProperty("value.serializer", DEFAULT_VALUE_SERIALIZER); + } + + public KafkaBolt(String hosts) { + this(); + boltSpecfiedProperties.setProperty("bootstrap.servers", hosts); + } public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper mapper) { this.mapper = mapper; @@ -108,8 +121,8 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(final Tuple input) { if (TupleUtils.isTick(input)) { - collector.ack(input); - return; // Do not try to send ticks to Kafka + collector.ack(input); + return; // Do not try to send ticks to Kafka } K key = null; V message = null;