Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Add missing client tests for connection operations #45374

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions task_sdk/tests/api/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import uuid6

from airflow.sdk.api.client import Client, RemoteValidationError, ServerResponseError
from airflow.sdk.api.datamodels._generated import VariableResponse, XComResponse
from airflow.sdk.execution_time.comms import DeferTask, RescheduleTask
from airflow.sdk.api.datamodels._generated import ConnectionResponse, VariableResponse, XComResponse
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import DeferTask, ErrorResponse, RescheduleTask
from airflow.utils import timezone
from airflow.utils.state import TerminalTIState

Expand Down Expand Up @@ -538,3 +539,50 @@ def handle_request(request: httpx.Request) -> httpx.Response:
map_index=2,
)
assert result == {"ok": True}


class TestConnectionOperations:
"""
Test that the TestConnectionOperations class works as expected. While the operations are simple, it
still catches the basic functionality of the client for connections including endpoint and
response parsing.
"""

def test_connection_get_success(self):
# Simulate a successful response from the server with a connection
def handle_request(request: httpx.Request) -> httpx.Response:
if request.url.path == "/connections/test_conn":
return httpx.Response(
status_code=200,
json={
"conn_id": "test_conn",
"conn_type": "mysql",
},
)
return httpx.Response(status_code=400, json={"detail": "Bad Request"})

client = make_client(transport=httpx.MockTransport(handle_request))
result = client.connections.get(conn_id="test_conn")

assert isinstance(result, ConnectionResponse)
assert result.conn_id == "test_conn"
assert result.conn_type == "mysql"

def test_connection_get_404_not_found(self):
# Simulate a successful response from the server with a connection
def handle_request(request: httpx.Request) -> httpx.Response:
if request.url.path == "/connections/test_conn":
return httpx.Response(
status_code=404,
json={
"reason": "not_found",
"message": "Connection with ID test_conn not found",
},
)
return httpx.Response(status_code=400, json={"detail": "Bad Request"})

client = make_client(transport=httpx.MockTransport(handle_request))
result = client.connections.get(conn_id="test_conn")

assert isinstance(result, ErrorResponse)
assert result.error == ErrorType.CONNECTION_NOT_FOUND