Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Artemyev Vyacheslav authored and Artemyev Vyacheslav committed Nov 7, 2023
1 parent c818bd2 commit 19f8180
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,29 @@ class ConsumerFlow(
private val amqpChannel: Channel, private val amqpQueue: String
) {

//TODO exception handling
suspend fun consumerAutoAckFlow(prefetchSize: Int): Flow<Delivery> = callbackFlow {
amqpChannel.basicQos(prefetchSize, false)
val tag = amqpChannel.basicConsume(amqpQueue, true, { consumerTag, message ->
logger.debug { "Trying to send a message from the flow consumer to the channel" }
/**
* The consumerAutoAckFlow function establishes a cold Flow for consuming messages from an AMQP queue with automatic acknowledgment enabled.
* This flow is designed to emit messages to the downstream consumers as they arrive from the queue.
*/
suspend fun consumerAutoAckFlow(prefetchSize: Int = 0): Flow<Delivery> = callbackFlow {
if (prefetchSize != 0) {
amqpChannel.basicQos(prefetchSize, false)
}
val deliverCallback: (consumerTag: String, message: Delivery) -> Unit = { _, message ->
logger.debug { "Trying to send a message from the flow consumer to the flow" }
trySendBlocking(message)
logger.debug { "The message was successfully sent to the channel" }
}, { consumerTag ->
logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" }
channel.close()
})
logger.debug { "The message was successfully sent to the flow" }
}
val cancelCallback: (consumerTag: String) -> Unit = { _ -> channel.close() }
val tag = amqpChannel.basicConsume(amqpQueue, true, deliverCallback, cancelCallback)
awaitClose {
amqpChannel.basicCancel(tag)
try {
logger.debug { "Cancelling consumer#$tag" }
amqpChannel.basicCancel(tag)
} catch (e: Exception) {
logger.error(e) { "Can't cancel consumer#$tag" }
channel.close()
}
}
}

Expand All @@ -52,7 +62,7 @@ class ConsumerFlow(
close(e)
}
}
val cancelCallback: (consumerTag: String) -> Unit = { consumerTag -> channel.close() }
val cancelCallback: (consumerTag: String) -> Unit = { _ -> channel.close() }
val tag = amqpChannel.basicConsume(amqpQueue, false, deliverCallback, cancelCallback)
awaitClose {
try {
Expand Down

0 comments on commit 19f8180

Please sign in to comment.