Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration test for start_message_move_task and fix reflect config #1046

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions integration-tests/conf/queue-storage.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ queues {
}
}

queueName2 {
defaultVisibilityTimeout = 1 second
delay = 0 seconds
receiveMessageWait = 0 seconds
deadLettersQueue {
name = "myDLQ"
maxReceiveCount = 3
}
}

myDLQ { }

fifoQueue {
Expand Down
51 changes: 43 additions & 8 deletions integration-tests/python/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ def stop(self):
def get_elasticmq_port(self):
return self.get_exposed_port(9321)

def create_sqs_client(self):
def create_sqs_resource(self):
port = self.get_elasticmq_port()
return boto3.resource('sqs', region_name='radom', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')
return boto3.resource('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')

def create_sqs_client(self):
port = self.get_elasticmq_port()
return boto3.client('sqs', region_name='random', endpoint_url=f'http://localhost:{port}', aws_access_key_id='test', aws_secret_access_key='test')

@pytest.fixture(scope="function")
def message_storage_container():
Expand All @@ -67,7 +70,7 @@ def queue_storage_container():
container.stop()

def test_messages_storage(message_storage_container):
sqs = message_storage_container.create_sqs_client()
sqs = message_storage_container.create_sqs_resource()
queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'})
assert queue is not None
queue.send_message(MessageBody='Hello 1')
Expand All @@ -85,7 +88,7 @@ def test_messages_storage(message_storage_container):

message_storage_container.start()

sqs = message_storage_container.create_sqs_client()
sqs = message_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='simpleQueue')
assert queue is not None
queue.send_message(MessageBody='Hello 4')
Expand All @@ -98,7 +101,7 @@ def test_messages_storage(message_storage_container):
assert not os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf"))

def test_queue_storage(queue_storage_container):
sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.create_queue(QueueName='simpleQueue', Attributes={'VisibilityTimeout': '1'})
assert queue is not None
queue.send_message(MessageBody='Hello 1')
Expand All @@ -117,7 +120,7 @@ def test_queue_storage(queue_storage_container):

queue_storage_container.start()

sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='simpleQueue')
assert queue is not None
queue.send_message(MessageBody='Hello 4')
Expand All @@ -130,8 +133,40 @@ def test_queue_storage(queue_storage_container):
assert os.path.exists(os.path.join(os.getcwd(), ".data", "queues.conf"))

def test_list_dead_letter_source_queues(queue_storage_container):
sqs = queue_storage_container.create_sqs_client()
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='myDLQ')
queues = list(queue.dead_letter_source_queues.all())
print(queues)
assert len(queues) == 2
assert len(queues) == 3

def test_message_move_task(queue_storage_container):
sqs = queue_storage_container.create_sqs_resource()
queue = sqs.get_queue_by_name(QueueName='queueName2')
dlq = sqs.get_queue_by_name(QueueName='myDLQ')

# populate the queue with 3 messages
queue.send_message(MessageBody='Hello 1')
queue.send_message(MessageBody='Hello 2')
queue.send_message(MessageBody='Hello 3')

# receive from queue maxReceiveCount + 1 to make it move to DLQ
messages1 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages1) == 3
time.sleep(1.5)
messages2 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages2) == 3
time.sleep(1.5)
messages3 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages3) == 3
time.sleep(1.5)
messages4 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages4) == 0

# start the message move task
client = queue_storage_container.create_sqs_client()
client.start_message_move_task(SourceArn=dlq.attributes['QueueArn'])
time.sleep(1)

# receive again
messages5 = queue.receive_messages(MaxNumberOfMessages=10)
assert len(messages5) == 3
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,16 @@
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.StartMessageMoveTaskActionRequest",
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.StartMessageMoveTaskResponse",
"allDeclaredFields":true,
"queryAllPublicMethods":true
},
{
"name":"org.elasticmq.rest.sqs.TagQueueActionRequest",
"allDeclaredFields":true,
Expand Down
Loading