diff --git a/integration-tests/conf/queue-storage.conf b/integration-tests/conf/queue-storage.conf index 1e9e5e92..0f8ba615 100644 --- a/integration-tests/conf/queue-storage.conf +++ b/integration-tests/conf/queue-storage.conf @@ -24,6 +24,16 @@ queues { } } + queueName2 { + defaultVisibilityTimeout = 1 second + delay = 0 seconds + receiveMessageWait = 0 seconds + deadLettersQueue { + name = "myDLQ" + maxReceiveCount = 3 + } + } + myDLQ { } fifoQueue { diff --git a/integration-tests/python/test_integration.py b/integration-tests/python/test_integration.py index eef3ce3d..218392e7 100644 --- a/integration-tests/python/test_integration.py +++ b/integration-tests/python/test_integration.py @@ -47,10 +47,13 @@ def stop(self): def get_elasticmq_port(self): return self.get_exposed_port(9321) - def create_sqs_client(self): + def create_sqs_resource(self): port = self.get_elasticmq_port() - return boto3.resource('sqs', region_name='radom', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test') + return boto3.resource('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test') + def create_sqs_client(self): + port = self.get_elasticmq_port() + return boto3.client('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test') @pytest.fixture(scope="function") def message_storage_container(): @@ -67,7 +70,7 @@ def queue_storage_container(): container.stop() def test_messages_storage(message_storage_container): - sqs = message_storage_container.create_sqs_client() + sqs = message_storage_container.create_sqs_resource() queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'}) assert queue is not None queue.send_message(MessageBody='Hello 1') @@ -85,7 +88,7 @@ def test_messages_storage(message_storage_container): message_storage_container.start() - sqs = message_storage_container.create_sqs_client() + sqs = message_storage_container.create_sqs_resource() queue = sqs.get_queue_by_name(QueueName='simpleQueue') assert queue is not None queue.send_message(MessageBody='Hello 4') @@ -98,7 +101,7 @@ def test_messages_storage(message_storage_container): assert not os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf")) def test_queue_storage(queue_storage_container): - sqs = queue_storage_container.create_sqs_client() + sqs = queue_storage_container.create_sqs_resource() queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'}) assert queue is not None queue.send_message(MessageBody='Hello 1') @@ -117,7 +120,7 @@ def test_queue_storage(queue_storage_container): queue_storage_container.start() - sqs = queue_storage_container.create_sqs_client() + sqs = queue_storage_container.create_sqs_resource() queue = sqs.get_queue_by_name(QueueName='simpleQueue') assert queue is not None queue.send_message(MessageBody='Hello 4') @@ -130,8 +133,40 @@ def test_queue_storage(queue_storage_container): assert os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf")) def test_list_dead_letter_source_queues(queue_storage_container): - sqs = queue_storage_container.create_sqs_client() + sqs = queue_storage_container.create_sqs_resource() queue = sqs.get_queue_by_name(QueueName='myDLQ') queues = list(queue.dead_letter_source_queues.all()) print(queues) - assert len(queues) == 2 + assert len(queues) == 3 + +def test_message_move_task(queue_storage_container): + sqs = queue_storage_container.create_sqs_resource() + queue = sqs.get_queue_by_name(QueueName='queueName2') + dlq = sqs.get_queue_by_name(QueueName='myDLQ') + + # populate the queue with 3 messages + queue.send_message(MessageBody='Hello 1') + queue.send_message(MessageBody='Hello 2') + queue.send_message(MessageBody='Hello 3') + + # receive from queue maxReceiveCount + 1 to make it move to DLQ + messages1 = queue.receive_messages(MaxNumberOfMessages=10) + assert len(messages1) == 3 + time.sleep(1.5) + messages2 = queue.receive_messages(MaxNumberOfMessages=10) + assert len(messages2) == 3 + time.sleep(1.5) + messages3 = queue.receive_messages(MaxNumberOfMessages=10) + assert len(messages3) == 3 + time.sleep(1.5) + messages4 = queue.receive_messages(MaxNumberOfMessages=10) + assert len(messages4) == 0 + + # start the message move task + client = queue_storage_container.create_sqs_client() + client.start_message_move_task(SourceArn=dlq.attributes['QueueArn']) + time.sleep(1) + + # receive again + messages5 = queue.receive_messages(MaxNumberOfMessages=10) + assert len(messages5) == 3 diff --git a/native-server/src/main/resources/META-INF/native-image/reflect-config.json b/native-server/src/main/resources/META-INF/native-image/reflect-config.json index d37c0cbc..7e3f06e2 100644 --- a/native-server/src/main/resources/META-INF/native-image/reflect-config.json +++ b/native-server/src/main/resources/META-INF/native-image/reflect-config.json @@ -919,6 +919,16 @@ "allDeclaredFields":true, "queryAllPublicMethods":true }, +{ + "name":"org.elasticmq.rest.sqs.StartMessageMoveTaskActionRequest", + "allDeclaredFields":true, + "queryAllPublicMethods":true +}, +{ + "name":"org.elasticmq.rest.sqs.StartMessageMoveTaskResponse", + "allDeclaredFields":true, + "queryAllPublicMethods":true +}, { "name":"org.elasticmq.rest.sqs.TagQueueActionRequest", "allDeclaredFields":true,