diff --git a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt index bb78842..dc40b3d 100644 --- a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt +++ b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumer.kt @@ -5,8 +5,11 @@ import com.rabbitmq.client.Delivery import com.viartemev.thewhiterabbit.exception.AcknowledgeException import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import mu.KotlinLogging @@ -20,7 +23,7 @@ private val logger = KotlinLogging.logger {} * */ class ConfirmConsumer internal constructor( - private val amqpChannel: Channel, amqpQueue: String, private val prefetchSize: Int + private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int ) : Closeable { private val deliveries = KChannel(prefetchSize) @@ -38,7 +41,6 @@ class ConfirmConsumer internal constructor( } }, { consumerTag -> logger.info { "Consumer $consumerTag has been cancelled for reasons other than by a call to Channel#basicCancel" } - //FIXME do we need to cancel the channel? deliveries.cancel() }) } @@ -85,8 +87,9 @@ class ConfirmConsumer internal constructor( } override fun close() { - logger.debug { "closing ConfirmConsumer" } + logger.debug { "Shutting down consumer" } amqpChannel.basicCancel(consTag) + //FIXME Additional cancellation? deliveries.cancel() } } diff --git a/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt new file mode 100644 index 0000000..0667897 --- /dev/null +++ b/src/main/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlow.kt @@ -0,0 +1,46 @@ +package com.viartemev.thewhiterabbit.consumer.flow + +import com.rabbitmq.client.Channel +import com.rabbitmq.client.Delivery +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import mu.KotlinLogging + +private val logger = KotlinLogging.logger {} + +class ConsumerFlow( + private val amqpChannel: Channel, private val amqpQueue: String, private val prefetchSize: Int +) { + + //TODO exception handling + suspend fun consumerAutoAckFlow(): Flow = callbackFlow { + amqpChannel.basicQos(prefetchSize, false) + val tag = amqpChannel.basicConsume(amqpQueue, true, { consumerTag, message -> + trySendBlocking(message) + }, { consumerTag -> + channel.close() + }) + awaitClose { + amqpChannel.basicCancel(tag) + amqpChannel.close() + } + } + + //TODO exception handling + suspend fun consumerConfirmAckFlow(): Flow = callbackFlow { + amqpChannel.basicQos(prefetchSize, false) + val tag = amqpChannel.basicConsume(amqpQueue, false, { consumerTag, message -> + trySendBlocking(message) + amqpChannel.basicAck(message.envelope.deliveryTag, false) + }, { consumerTag -> + channel.close() + }) + awaitClose { + amqpChannel.basicCancel(tag) + amqpChannel.close() + } + } + +} diff --git a/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumerTest.kt b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumerTest.kt index c3ed696..3de5a64 100644 --- a/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumerTest.kt +++ b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/ConfirmConsumerTest.kt @@ -65,21 +65,24 @@ class ConfirmConsumerTest : AbstractTestContainersTest() { val sender = launch { publish { while (isActive) { - delay(100) + delay(1000) publishWithConfirm(message) } } } - consume(QUEUE_NAME, 2) { - consumeMessagesWithConfirm { - println("Consuming message: ${it.body}") - delay(5000) - counter.getAndAdd(String(it.body).toInt()) + val consumer = launch { + consume(QUEUE_NAME, 2) { + consumeMessagesWithConfirm { + println("Consuming message: ${it.body}") + delay(1000) + counter.getAndAdd(String(it.body).toInt()) + } } } - delay(50000) + delay(5000) println("Shouting down...") sender.cancel() + consumer.cancel() } } } diff --git a/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt new file mode 100644 index 0000000..bd12855 --- /dev/null +++ b/src/test/kotlin/com/viartemev/thewhiterabbit/consumer/flow/ConsumerFlowTest.kt @@ -0,0 +1,51 @@ +package com.viartemev.thewhiterabbit.consumer.flow + +import com.viartemev.thewhiterabbit.AbstractTestContainersTest +import com.viartemev.thewhiterabbit.channel.confirmChannel +import com.viartemev.thewhiterabbit.channel.publish +import com.viartemev.thewhiterabbit.queue.QueueSpecification +import com.viartemev.thewhiterabbit.queue.declareQueue +import com.viartemev.thewhiterabbit.utils.createMessage +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +class ConsumerFlowTest : AbstractTestContainersTest() { + private val QUEUE_NAME = "test_queue" + + @Test + fun testAutoAckFlow(): Unit = runBlocking { + factory.newConnection().use { connection -> + connection.confirmChannel { + declareQueue(QueueSpecification(QUEUE_NAME)) + publish { + (1..10).map { createMessage(queue = QUEUE_NAME, body = "1") } + .map { m -> async { publishWithConfirm(m) } }.awaitAll() + } + ConsumerFlow(this, QUEUE_NAME, 2) + .consumerAutoAckFlow() + .take(10) + .collect { delivery -> println(String(delivery.body)) } + } + } + } + + @Test + fun testConfirmAckFlow(): Unit = runBlocking { + factory.newConnection().use { connection -> + connection.confirmChannel { + declareQueue(QueueSpecification(QUEUE_NAME)) + publish { + (1..10).map { createMessage(queue = QUEUE_NAME, body = "1") } + .map { m -> async { publishWithConfirm(m) } }.awaitAll() + } + ConsumerFlow(this, QUEUE_NAME, 2) + .consumerConfirmAckFlow() + .take(10) + .collect { delivery -> println(String(delivery.body)) } + } + } + } +} diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..e136222 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n + + + + + + + + +