Skip to content

Commit

Permalink
Final Integration Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhavatlan committed Nov 22, 2024
1 parent fb5fad1 commit f2bd60f
Showing 1 changed file with 35 additions and 30 deletions.
65 changes: 35 additions & 30 deletions tests/integration/test_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from pyatlan.model.workflow import WorkflowResponse, WorkflowSchedule
from tests.integration.client import TestId, delete_asset
from tests.integration.connection_test import create_connection
from pyatlan.client.atlan import AtlanClient

MODULE_NAME = TestId.make_unique("WorfklowClient")
WORKFLOW_TEMPLATE_REF = "workflowTemplateRef"
Expand Down Expand Up @@ -271,75 +270,81 @@ def test_workflow_add_remove_schedule(client: AtlanClient, workflow: WorkflowRes
response = client.workflow.remove_schedule(workflow)
_assert_remove_schedule(response, workflow)


def test_get_all_credentials(client: AtlanClient):

credentials = client.credentials.get_all()
assert credentials, "Expected credentials but found None"
assert credentials.records is not None, "Expected records but found None"
assert len(credentials.records) > 0, "Expected at least one record but found none"
assert (
len(credentials.records or []) > 0
), "Expected at least one record but found none"


def test_get_all_credentials_with_limit_and_offset(client: AtlanClient):
limit = 5
offset = 2
credentials = client.credentials.get_all(limit=limit, offset=offset)
assert credentials.records is not None, "Expected records but found None"
assert len(credentials.records) <= limit, (
f"Expected at most {limit} records, got {len(credentials.records)}"
)
assert (
len(credentials.records or []) <= limit
), f"Expected at most {limit} records, got {len(credentials.records or [])}"


def test_get_all_credentials_with_filter_limit_offset(client: AtlanClient):
filter_criteria = {"connectorType": "rest"}
limit = 1
offset = 1
credentials = client.credentials.get_all(filter=filter_criteria, limit=limit, offset=offset)
assert len(credentials.records) <= limit, "Exceeded limit in results"
for cred in credentials.records:
assert cred.connector_type == "rest"
credentials = client.credentials.get_all(
filter=filter_criteria, limit=limit, offset=offset
)
assert len(credentials.records or []) <= limit, "Exceeded limit in results"
for cred in credentials.records or []:
assert (
cred.connector_type == "rest"
), f"Expected 'rest', got {cred.connector_type}"


def test_get_all_credentials_with_multiple_filters(client: AtlanClient):
filter_criteria = {
"connectorType": "jdbc",
"isActive": True
}
filter_criteria = {"connectorType": "jdbc", "isActive": True}

credentials = client.credentials.get_all(filter=filter_criteria)
assert credentials, "Expected credentials but found None"
assert credentials.records is not None, "Expected records but found None"
assert len(credentials.records) > 0, "Expected at least one record but found none"
assert (
len(credentials.records or []) > 0
), "Expected at least one record but found none"

for record in credentials.records:
assert record.connector_type == "jdbc", f"Expected 'jdbc', got {record.connectorType}"
for record in credentials.records or []:
assert (
record.connector_type == "jdbc"
), f"Expected 'jdbc', got {record.connector_type}"
assert record.is_active, f"Expected active record, but got inactive: {record}"


def test_get_all_credentials_with_invalid_filter_key(client: AtlanClient):
filter_criteria = {
"invalidKey": "someValue"
}
filter_criteria = {"invalidKey": "someValue"}
try:
credentials = client.credentials.get_all(filter=filter_criteria)
client.credentials.get_all(filter=filter_criteria)
pytest.fail("Expected an error due to invalid filter key, but none occurred.")
except Exception as e:
assert "400" in str(e), f"Expected a 400 error, but got: {e}"

def test_get_all_credentials_with_invalid_filter_value(client: AtlanClient):

filter_criteria = {
"connectorType": 123 # Invalid type (should be a string)
}
def test_get_all_credentials_with_invalid_filter_value(client: AtlanClient):
filter_criteria = {"connector_type": 123}

try:
credentials = client.credentials.get_all(filter=filter_criteria)
client.credentials.get_all(filter=filter_criteria)
pytest.fail("Expected an error due to invalid filter value, but none occurred.")
except Exception as e:
assert "400" in str(e), f"Expected a 400 error, but got: {e}"

def test_get_all_credentials_with_large_limit(client: AtlanClient):

limit = 100 # Larger than the total number of records
def test_get_all_credentials_with_large_limit(client: AtlanClient):
limit = 100
credentials = client.credentials.get_all(limit=limit)

assert credentials, "Expected credentials but found None"
assert credentials.records is not None, "Expected records but found None"
assert len(credentials.records) <= limit, f"Expected at most {limit} records, but got {len(credentials.records)}"
assert (
len(credentials.records or []) <= limit
), f"Expected at most {limit} records, but got {len(credentials.records or [])}"

0 comments on commit f2bd60f

Please sign in to comment.