Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add previously experimental Cloud "deploy" functionality (DRAFT - DO NOT MERGE) #419

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 151 additions & 39 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from __future__ import annotations

import json
from typing import Any
from typing import TYPE_CHECKING, Any

import airbyte_api
from airbyte_api import api, models
Expand All @@ -24,9 +24,14 @@
AirbyteError,
AirbyteMissingResourceError,
AirbyteMultipleResourcesError,
PyAirbyteInputError,
)


if TYPE_CHECKING:
from collections.abc import Callable


JOB_WAIT_INTERVAL_SECS = 2.0
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
Expand All @@ -43,7 +48,7 @@ def get_airbyte_server_instance(
*,
api_key: str,
api_root: str,
) -> airbyte_api.Airbyte:
) -> airbyte_api.AirbyteAPI:
"""Get an Airbyte instance."""
return airbyte_api.AirbyteAPI(
security=models.Security(
Expand Down Expand Up @@ -84,36 +89,126 @@ def get_workspace(
)


# List, get, and run connections
# List resources


def list_connections(
workspace_id: str,
*,
api_root: str,
api_key: str,
) -> list[api.ConnectionResponse]:
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.ConnectionResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest()(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
),
)

if status_ok(response.status_code) and response.connections_response:
return response.connections_response.data
if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.connections_response is not None
return [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]


raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
def list_sources(
workspace_id: str,
*,
api_root: str,
api_key: str,
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.SourceResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.sources_response is not None
return [source for source in response.sources_response.data if name_filter(source.name)]


def list_destinations(
workspace_id: str,
*,
api_root: str,
api_key: str,
name: str | None = None,
name_filter: Callable[[str], bool] | None = None,
) -> list[models.DestinationResponse]:
"""Get a connection."""
if name and name_filter:
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")

name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)

_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.destinations_response is not None
return [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]


# Get and run connections


def get_connection(
Expand All @@ -122,7 +217,7 @@ def get_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.ConnectionResponse:
"""Get a connection."""
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
Expand All @@ -137,7 +232,11 @@ def get_connection(
if status_ok(response.status_code) and response.connection_response:
return response.connection_response

raise AirbyteMissingResourceError(connection_id, "connection", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=connection_id,
resource_type="connection",
log_text=response.raw_response.text,
)


def run_connection(
Expand All @@ -146,7 +245,7 @@ def run_connection(
*,
api_root: str,
api_key: str,
) -> api.ConnectionResponse:
) -> models.JobResponse:
"""Get a connection.

If block is True, this will block until the connection is finished running.
Expand Down Expand Up @@ -186,7 +285,7 @@ def get_job_logs(
*,
api_root: str,
api_key: str,
) -> list[api.JobResponse]:
) -> list[models.JobResponse]:
"""Get a job's logs."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -213,11 +312,11 @@ def get_job_logs(


def get_job_info(
job_id: str,
job_id: int,
*,
api_root: str,
api_key: str,
) -> api.JobResponse:
) -> models.JobResponse:
"""Get a job."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -231,7 +330,11 @@ def get_job_info(
if status_ok(response.status_code) and response.job_response:
return response.job_response

raise AirbyteMissingResourceError(job_id, "job", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=str(job_id),
resource_type="job",
log_text=response.raw_response.text,
)


# Create, get, and delete sources
Expand All @@ -244,7 +347,7 @@ def create_source(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand Down Expand Up @@ -273,7 +376,7 @@ def get_source(
*,
api_root: str,
api_key: str,
) -> api.SourceResponse:
) -> models.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -284,10 +387,14 @@ def get_source(
source_id=source_id,
),
)
if status_ok(response.status_code) and response.connection_response:
return response.connection_response
if status_ok(response.status_code) and response.source_response:
return response.source_response

raise AirbyteMissingResourceError(source_id, "source", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=source_id,
resource_type="source",
log_text=response.raw_response.text,
)


def delete_source(
Expand Down Expand Up @@ -327,7 +434,7 @@ def create_destination(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -354,7 +461,7 @@ def get_destination(
*,
api_root: str,
api_key: str,
) -> api.DestinationResponse:
) -> models.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
Expand All @@ -365,33 +472,38 @@ def get_destination(
destination_id=destination_id,
),
)
if status_ok(response.status_code):
if status_ok(response.status_code) and response.destination_response:
# TODO: This is a temporary workaround to resolve an issue where
# the destination API response is of the wrong type.
# https://github.com/airbytehq/pyairbyte/issues/320
raw_response: dict[str, Any] = json.loads(response.raw_response.text)
raw_configuration: dict[str, Any] = raw_response["configuration"]

destination_type = raw_response.get("destinationType")
if destination_type == "snowflake":
response.destination_response.configuration = models.DestinationSnowflake.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationSnowflake(
**raw_configuration,
)
if destination_type == "bigquery":
response.destination_response.configuration = models.DestinationBigquery.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationBigquery(
**raw_configuration,
)
if destination_type == "postgres":
response.destination_response.configuration = models.DestinationPostgres.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationPostgres(
**raw_configuration,
)
if destination_type == "duckdb":
response.destination_response.configuration = models.DestinationDuckdb.from_dict(
raw_configuration,
response.destination_response.configuration = models.DestinationDuckdb(
**raw_configuration,
Comment on lines +475 to +497
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Question: Simplify Destination Type Handling

Currently, the code checks for each destination type individually. Would it be beneficial to use a mapping or factory pattern to reduce repetition and improve scalability? Wdyt?

)

return response.destination_response

raise AirbyteMissingResourceError(destination_id, "destination", response.text)
raise AirbyteMissingResourceError(
resource_name_or_id=destination_id,
resource_type="destination",
log_text=response.raw_response.text,
)


def delete_destination(
Expand Down Expand Up @@ -448,17 +560,17 @@ def create_connection(
)
stream_configurations.append(stream_configuration)

stream_configurations = models.StreamConfigurations(stream_configurations)
stream_configurations_obj = models.StreamConfigurations(stream_configurations)
response = airbyte_instance.connections.create_connection(
models.ConnectionCreateRequest(
name=name,
source_id=source_id,
destination_id=destination_id,
configurations=stream_configurations,
configurations=stream_configurations_obj,
prefix=prefix,
),
)
if not status_ok(response.status_code):
if not status_ok(response.status_code) or response.connection_response is None:
raise AirbyteError(
context={
"source_id": source_id,
Expand Down
22 changes: 22 additions & 0 deletions airbyte/_util/text_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Utility functions for working with text."""

from __future__ import annotations

import ulid


def generate_ulid() -> str:
"""Generate a new ULID."""
return str(ulid.ULID())


def generate_random_suffix() -> str:
"""Generate a random suffix for use in temporary names.

By default, this function generates a ULID and returns a 9-character string
which will be monotonically sortable. It is not guaranteed to be unique but
is sufficient for small-scale and medium-scale use cases.
"""
ulid_str = generate_ulid()
return ulid_str[:6] + ulid_str[-3:]
4 changes: 0 additions & 4 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class CacheBase(SqlConfig, AirbyteWriterInterface):

_name: str = PrivateAttr()

_deployed_api_root: str | None = PrivateAttr(default=None)
_deployed_workspace_id: str | None = PrivateAttr(default=None)
_deployed_destination_id: str | None = PrivateAttr(default=None)

_sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
_read_processor: SqlProcessorBase = PrivateAttr()

Expand Down
Loading
Loading