forked from Azure/azure-sdk-for-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request Azure#7 from yunhaoling/sbt2-mgmt-test
[ServiceBus] Add ListQueueRuntimeInfo, GetQueueRuntimeInfo Test And Generalize "list" related tests
- Loading branch information
Showing
4 changed files
with
439 additions
and
138 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
124 changes: 124 additions & 0 deletions
124
sdk/servicebus/azure-servicebus/tests/async_tests/mgmt_tests/utilities.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
#------------------------------------------------------------------------- | ||
# Copyright (c) Microsoft Corporation. All rights reserved. | ||
# Licensed under the MIT License. See License.txt in the project root for | ||
# license information. | ||
#------------------------------------------------------------------------- | ||
import pytest | ||
|
||
from azure.core.exceptions import HttpResponseError | ||
|
||
|
||
class AsyncMgmtListTestHelperInterface(object): | ||
def __init__(self, mgmt_client): | ||
self.sb_mgmt_client = mgmt_client | ||
|
||
async def list_resource_method(self, skip=0, max_count=100): | ||
pass | ||
|
||
async def create_resource_method(self, name): | ||
pass | ||
|
||
async def delete_resource_by_name_method(self, name): | ||
pass | ||
|
||
async def get_resource_name(self, resource): | ||
pass | ||
|
||
|
||
class AsyncMgmtQueueListTestHelper(AsyncMgmtListTestHelperInterface): | ||
async def list_resource_method(self, skip=0, max_count=100): | ||
return await self.sb_mgmt_client.list_queues(skip=skip, max_count=max_count) | ||
|
||
async def create_resource_method(self, name): | ||
await self.sb_mgmt_client.create_queue(name) | ||
|
||
async def delete_resource_by_name_method(self, name): | ||
await self.sb_mgmt_client.delete_queue(name) | ||
|
||
async def get_resource_name(self, queue): | ||
return queue.queue_name | ||
|
||
|
||
class AsyncMgmtQueueListRuntimeInfoTestHelper(AsyncMgmtListTestHelperInterface): | ||
async def list_resource_method(self, skip=0, max_count=100): | ||
return await self.sb_mgmt_client.list_queues_runtime_info(skip=skip, max_count=max_count) | ||
|
||
async def create_resource_method(self, name): | ||
await self.sb_mgmt_client.create_queue(name) | ||
|
||
async def delete_resource_by_name_method(self, name): | ||
await self.sb_mgmt_client.delete_queue(name) | ||
|
||
async def get_resource_name(self, queue_info): | ||
return queue_info.queue_name | ||
|
||
|
||
async def run_test_async_mgmt_list_with_parameters(test_helper): | ||
result = await test_helper.list_resource_method() | ||
assert len(result) == 0 | ||
|
||
resources_names = [] | ||
for i in range(20): | ||
await test_helper.create_resource_method("test_resource{}".format(i)) | ||
resources_names.append("test_resource{}".format(i)) | ||
|
||
result = await test_helper.list_resource_method() | ||
assert len(result) == 20 | ||
|
||
sorted_resources_names = sorted(resources_names) | ||
|
||
result = await test_helper.list_resource_method(skip=5, max_count=10) | ||
expected_result = sorted_resources_names[5:15] | ||
assert len(result) == 10 | ||
for item in result: | ||
expected_result.remove(await test_helper.get_resource_name(item)) | ||
assert len(expected_result) == 0 | ||
|
||
result = await test_helper.list_resource_method(max_count=0) | ||
assert len(result) == 0 | ||
|
||
queues = await test_helper.list_resource_method(skip=0, max_count=0) | ||
assert len(queues) == 0 | ||
|
||
cnt = 20 | ||
for name in resources_names: | ||
await test_helper.delete_resource_by_name_method(name) | ||
cnt -= 1 | ||
assert len(await test_helper.list_resource_method()) == cnt | ||
|
||
assert cnt == 0 | ||
|
||
result = await test_helper.list_resource_method() | ||
assert len(result) == 0 | ||
|
||
|
||
async def run_test_async_mgmt_list_with_negative_parameters(test_helper): | ||
result = await test_helper.list_resource_method() | ||
assert len(result) == 0 | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(skip=-1) | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(max_count=-1) | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(skip=-1, max_count=-1) | ||
|
||
await test_helper.create_resource_method("test_resource") | ||
result = await test_helper.list_resource_method() | ||
assert len(result) == 1 and (await test_helper.get_resource_name(result[0])) == "test_resource" | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(skip=-1) | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(max_count=-1) | ||
|
||
with pytest.raises(HttpResponseError): | ||
await test_helper.list_resource_method(skip=-1, max_count=-1) | ||
|
||
await test_helper.delete_resource_by_name_method("test_resource") | ||
result = await test_helper.list_resource_method() | ||
assert len(result) == 0 | ||
|
Oops, something went wrong.