Skip to content

Commit

Permalink
Merge pull request #594 from fd4s/use-custom-blocking-context-in-prod…
Browse files Browse the repository at this point in the history
…ucer

Produce on custom blocking context when configured
  • Loading branch information
bplommer authored Apr 14, 2021
2 parents 41cbe67 + 6047755 commit 3d3a229
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
26 changes: 13 additions & 13 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ object KafkaProducer {
override def produce[P](
records: ProducerRecords[P, K, V]
): F[F[ProducerResult[P, K, V]]] =
withProducer { (producer, _) =>
withProducer { (producer, blocking) =>
records.records
.traverse(produceRecord(keySerializer, valueSerializer, producer))
.traverse(produceRecord(keySerializer, valueSerializer, producer, blocking))
.map(_.sequence.map(ProducerResult(_, records.passthrough)))
}

Expand Down Expand Up @@ -130,23 +130,23 @@ object KafkaProducer {
private[kafka] def produceRecord[F[_], K, V](
keySerializer: Serializer[F, K],
valueSerializer: Serializer[F, V],
producer: KafkaByteProducer
producer: KafkaByteProducer,
blocking: Blocking[F]
)(
implicit F: Async[F]
): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
record =>
asJavaRecord(keySerializer, valueSerializer, record).flatMap { javaRecord =>
F.delay(Promise[(ProducerRecord[K, V], RecordMetadata)]()).flatMap { promise =>
F.blocking {
producer.send(
javaRecord, { (metadata, exception) =>
if (exception == null)
promise.success((record, metadata))
else promise.failure(exception)
}
)
}
.as(F.fromFuture(F.delay(promise.future)))
blocking {
producer.send(
javaRecord, { (metadata, exception) =>
if (exception == null)
promise.success((record, metadata))
else promise.failure(exception)
}
)
}.as(F.fromFuture(F.delay(promise.future)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ object TransactionalKafkaProducer {
records.records
.flatMap(_.records)
.traverse(
KafkaProducer.produceRecord(keySerializer, valueSerializer, producer)
KafkaProducer
.produceRecord(keySerializer, valueSerializer, producer, blocking)
)
.map(_.sequence)
.flatTap { _ =>
Expand Down

0 comments on commit 3d3a229

Please sign in to comment.