Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package cakesolutions.kafka
import cakesolutions.kafka.KafkaTopicPartition.{Partition, Topic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.Header
import scala.collection.JavaConverters._

/**
* Helper functions for creating Kafka's `ProducerRecord`s.
Expand Down Expand Up @@ -59,6 +61,24 @@ object KafkaProducerRecord {
case None => new ProducerRecord(topic, value)
}

/**
* Create a producer record with an optional key.
*
* @param topic the topic where the record will be appended to
* @param key optional key that will be included in the record
* @param value the value that will be included in the record
* @param headers the record headers
* @tparam Key type of the key
* @tparam Value type of the value
* @return producer record
*/
def apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value,
headers: Seq[Header]): ProducerRecord[Key, Value] =
key match {
case Some(k) => apply(topic, k, value, headers)
case None => apply(topic, value, headers)
}

/**
* Create a producer record with topic, key, and value.
*
Expand All @@ -72,6 +92,21 @@ object KafkaProducerRecord {
def apply[Key, Value](topic: String, key: Key, value: Value): ProducerRecord[Key, Value] =
new ProducerRecord(topic, key, value)

/**
* Create a producer record with topic, key, and value.
*
* @param topic the topic where the record will be appended to
* @param key the key that will be included in the record
* @param value the value that will be included in the record
* @param headers the record headers
* @tparam Key type of the key
* @tparam Value type of the value
* @return producer record
*/
def apply[Key, Value](topic: String, key: Key, value: Value,
headers: Seq[Header]): ProducerRecord[Key, Value] =
new ProducerRecord(topic, null, key, value, headers.asJava)

/**
* Create a producer record without a key.
*
Expand All @@ -84,6 +119,20 @@ object KafkaProducerRecord {
def apply[Key, Value](topic: String, value: Value): ProducerRecord[Key, Value] =
new ProducerRecord(topic, value)

/**
* Create a producer record without a key.
*
* @param topic topic to which record is being sent
* @param value the value that will be included in the record
* @param headers the record headers
* @tparam Key type of the key
* @tparam Value type of the value
* @return producer record
*/
def apply[Key >: Null, Value](topic: String, value: Value,
headers: Seq[Header]): ProducerRecord[Key, Value] =
new ProducerRecord(topic, null, null, null, value, headers.asJava)

/**
* Create a producer record from a topic selection, optional key, value, and optional timestamp.
*
Expand All @@ -100,13 +149,34 @@ object KafkaProducerRecord {
key: Option[Key] = None,
value: Value,
timestamp: Option[Long] = None
): ProducerRecord[Key, Value] = {
): ProducerRecord[Key, Value] =
apply(topicPartitionSelection, key, value, timestamp, Seq.empty)

/**
* Create a producer record from a topic selection, optional key, value, and optional timestamp.
*
* @param topicPartitionSelection the topic (with optional partition) where the record will be appended to
* @param key the key that will be included in the record
* @param value the value that will be included in the record
* @param timestamp the timestamp of the record
* @param headers the record headers
* @tparam Key type of the key
* @tparam Value type of the value
* @return producer record
*/
def apply[Key >: Null, Value](
topicPartitionSelection: Destination,
key: Option[Key],
value: Value,
timestamp: Option[Long],
headers: Seq[Header]
): ProducerRecord[Key, Value] = {
val topic = topicPartitionSelection.topic
val partition = topicPartitionSelection.partition.map(i => i: java.lang.Integer).orNull
val nullableKey = key.orNull
val nullableTimestamp = timestamp.map(i => i: java.lang.Long).orNull

new ProducerRecord[Key, Value](topic, partition, nullableTimestamp, nullableKey, value)
new ProducerRecord[Key, Value](topic, partition, nullableTimestamp, nullableKey, value, headers.asJava)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cakesolutions.kafka

import java.nio.ByteBuffer
import java.util.UUID

import cakesolutions.kafka.KafkaProducerRecord.Destination
import org.apache.kafka.common.header.internals.RecordHeader
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._

class KafkaProducerRecordSpec extends AnyFlatSpecLike with Matchers {
private val headers = Seq(
new RecordHeader("foo", "kafka".getBytes),
new RecordHeader("bar", ByteBuffer.wrap("apache".getBytes))
)
type K = String
type V = UUID
private val topic = "scala-kafka-client"
private val k = "west"
private val v = UUID.randomUUID()
private val dest = Destination(topic = topic)

"apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value)" should "create the specified record" in {
val rec = KafkaProducerRecord(topic, Some(k), v)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe Seq.empty
}

"apply[Key >: Null, Value](topic: String, key: Option[Key], value: Value, headers: Seq[Header])" should "create the specified record" in {
val rec = KafkaProducerRecord(topic, Some(k), v, headers)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe headers
}

"apply[Key, Value](topic: String, key: Key, value: Value)" should "create the specified record" in {
val rec = KafkaProducerRecord(topic, k, v)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe Seq.empty
}

"apply[Key, Value](topic: String, key: Key, value: Value, headers: Seq[Header])" should "create the specified record" in {
val rec = KafkaProducerRecord(topic, k, v, headers)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe headers
}

"apply[Key >: Null, Value](topic: String, value: Value)" should "create the specified record" in {
val rec = KafkaProducerRecord[K, V](topic, v)
rec.topic() shouldBe topic
rec.key() shouldBe null
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe Seq.empty
}

"apply[Key >: Null, Value](topic: String, value: Value, headers: Seq[Header])" should "create the specified record" in {
val rec = KafkaProducerRecord[K, V](topic, v, headers)
rec.topic() shouldBe topic
rec.key() shouldBe null
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe headers
}

"apply[Key >: Null, Value](topicPartitionSelection: Destination, key: Option[Key] = None, value: Value, timestamp: Option[Long] = None)" should "create the specified record" in {
val rec = KafkaProducerRecord(dest, Some(k), v)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe Seq.empty
}

"apply[Key >: Null, Value](topicPartitionSelection: Destination, key: Option[Key] = None, value: Value, timestamp: Option[Long] = None, headers: Seq[Header])" should "create the specified record" in {
val rec = KafkaProducerRecord(dest, Some(k), v, timestamp = None, headers)
rec.topic() shouldBe topic
rec.key() shouldBe k
rec.value() shouldBe v
rec.headers().asScala.toSeq shouldBe headers
}
}