Fix: Add "Destinations 1.0" handling to PyAirbyte: sync IDs, generation IDs, and stream success statuses #330

Merged

Changes from 3 commits
21 changes: 20 additions & 1 deletion airbyte/_future_cdk/catalog_providers.py
@@ -10,6 +10,8 @@

from typing import TYPE_CHECKING, Any, final

import ulid

from airbyte_protocol.models import (
    ConfiguredAirbyteCatalog,
)
@@ -44,7 +46,24 @@ def __init__(
        Since the catalog is passed by reference, the catalog manager may be updated with new
        streams as they are discovered.
        """
        self._catalog: ConfiguredAirbyteCatalog = configured_catalog
        self._catalog: ConfiguredAirbyteCatalog = self.validate_catalog(configured_catalog)

    @staticmethod
    def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> ConfiguredAirbyteCatalog:
        """Validate the catalog and apply default values where needed.

        This ensures that `generation_id`, `minimum_generation_id`, and `sync_id` are all
        set on each stream. Any missing value defaults to `1`.
        """
        for stream in catalog.streams:
            if stream.generation_id is None:
                stream.generation_id = 1
            if stream.minimum_generation_id is None:
                stream.minimum_generation_id = 1
            if stream.sync_id is None:
                stream.sync_id = 1  # This should ideally increment monotonically with each sync.

        return catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
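For reference, a minimal sketch of the new defaulting behavior (the `CatalogProvider` class name and the stream configuration below are assumptions for illustration, not part of the diff):

```python
# Sketch only: exercises validate_catalog() with a stream that omits the new
# Destinations 1.0 fields. The stream name and sync modes are illustrative.
from airbyte_protocol.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

from airbyte._future_cdk.catalog_providers import CatalogProvider  # assumed class name

catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="users",
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
            # sync_id, generation_id, and minimum_generation_id left unset (None)
        )
    ]
)

validated = CatalogProvider.validate_catalog(catalog)
assert validated.streams[0].generation_id == 1
assert validated.streams[0].minimum_generation_id == 1
assert validated.streams[0].sync_id == 1
```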
3 changes: 3 additions & 0 deletions airbyte/_message_iterators.py
@@ -18,6 +18,7 @@
)

from airbyte.constants import AB_EXTRACTED_AT_COLUMN
from airbyte.progress import _new_stream_success_message


if TYPE_CHECKING:
@@ -90,6 +91,8 @@ def generator() -> Generator[AirbyteMessage, None, None]:
                    state=state_provider.get_stream_state(stream_name),
                )

                yield _new_stream_success_message(stream_name)

        return cls(generator())

    @classmethod
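The shape of the appended message can be sanity-checked against the helper imported above (a small sketch; the stream name is illustrative):

```python
# Sketch: verify the per-stream success message yielded after each stream's state.
from airbyte.progress import _new_stream_success_message
from airbyte_protocol.models import AirbyteStreamStatus, TraceType, Type

msg = _new_stream_success_message("users")
assert msg.type == Type.TRACE
assert msg.trace.type == TraceType.STREAM_STATUS
assert msg.trace.stream_status.status == AirbyteStreamStatus.COMPLETE
assert msg.trace.stream_status.stream_descriptor.name == "users"
```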
46 changes: 43 additions & 3 deletions airbyte/progress.py
@@ -30,7 +30,15 @@
from rich.markdown import Markdown as RichMarkdown
from typing_extensions import Literal

from airbyte_protocol.models import AirbyteStreamStatus, Type
from airbyte_protocol.models import (
    AirbyteMessage,
    AirbyteStreamStatus,
    AirbyteStreamStatusTraceMessage,
    AirbyteTraceMessage,
    StreamDescriptor,
    TraceType,
    Type,
)

from airbyte._util import meta
from airbyte._util.telemetry import EventState, EventType, send_telemetry
@@ -40,8 +48,6 @@
    from collections.abc import Generator, Iterable
    from types import ModuleType

    from airbyte_protocol.models import AirbyteMessage

    from airbyte._message_iterators import AirbyteMessageIterator
    from airbyte.caches.base import CacheBase
    from airbyte.destinations.base import Destination
@@ -68,6 +74,24 @@
IS_NOTEBOOK = False


def _new_stream_success_message(stream_name: str) -> AirbyteMessage:
    """Return a new stream success message."""
    return AirbyteMessage(
        type=Type.TRACE,
        trace=AirbyteTraceMessage(
            type=TraceType.STREAM_STATUS,
            stream=stream_name,
            emitted_at=pendulum.now().float_timestamp,
            stream_status=AirbyteStreamStatusTraceMessage(
                stream_descriptor=StreamDescriptor(
                    name=stream_name,
                ),
                status=AirbyteStreamStatus.COMPLETE,
            ),
        ),
    )


class ProgressStyle(Enum):
"""An enum of progress bar styles."""

@@ -201,6 +225,8 @@ def __init__(
    def tally_records_read(
        self,
        messages: Iterable[AirbyteMessage],
        *,
        auto_close_streams: bool = False,
    ) -> Generator[AirbyteMessage, Any, None]:
        """This method simply tallies the number of records processed and yields the messages."""
        # Update the display before we start.
@@ -247,6 +273,11 @@ def tally_records_read(
            # Update the display.
            self._update_display()

        if auto_close_streams:
            for stream_name in self._unclosed_stream_names:
                yield _new_stream_success_message(stream_name)
                self._log_stream_read_end(stream_name)

    def tally_pending_writes(
        self,
        messages: IO[str] | AirbyteMessageIterator,
@@ -342,6 +373,15 @@ def _log_stream_read_end(self, stream_name: str) -> None:
        )
        self.stream_read_end_times[stream_name] = time.time()

    @property
    def _unclosed_stream_names(self) -> list[str]:
        """Return a list of streams that have not yet been fully read."""
        return [
            stream_name
            for stream_name in self.stream_read_counts
            if stream_name not in self.stream_read_end_times
        ]

    def log_success(
        self,
    ) -> None:
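To illustrate the bookkeeping behind `auto_close_streams`: a stream counts as unclosed when it has recorded reads but no recorded end time. A self-contained sketch, with plain dicts standing in for the tracker's attributes:

```python
# Mirrors stream_read_counts / stream_read_end_times from the diff above.
stream_read_counts = {"users": 1000, "purchases": 500}
stream_read_end_times = {"users": 1721000000.0}  # "purchases" was never closed

# Same logic as the _unclosed_stream_names property:
unclosed = [name for name in stream_read_counts if name not in stream_read_end_times]
assert unclosed == ["purchases"]
# With auto_close_streams=True, tally_records_read() yields a COMPLETE trace
# message for "purchases" and logs its end time once the input is exhausted.
```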
59 changes: 59 additions & 0 deletions examples/run_bigquery_destination.py
@@ -0,0 +1,59 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""
Usage:
poetry install
poetry run python examples/run_bigquery_destination.py
"""

from __future__ import annotations

import tempfile
import warnings
from typing import cast

import airbyte as ab
from airbyte.secrets.base import SecretString
from airbyte.secrets.google_gsm import GoogleGSMSecretManager

warnings.filterwarnings("ignore", message="Cannot create BigQuery Storage client")


AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
SECRET_NAME = "SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS"

bigquery_destination_secret: dict = cast(
    SecretString,
    GoogleGSMSecretManager(
        project=AIRBYTE_INTERNAL_GCP_PROJECT,
        credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
    ).get_secret(SECRET_NAME),
).parse_json()


def main() -> None:
    source = ab.get_source(
        "source-faker",
        config={"count": 1000, "seed": 0, "parallelism": 1, "always_updated": False},
        install_if_missing=True,
    )
    source.check()
    source.select_all_streams()

    with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") as temp:
        # Write credentials to the temp file
        temp.write(bigquery_destination_secret["credentials_json"])
        temp.flush()
        temp.close()

        destination = ab.get_destination(
            "destination-bigquery",
            config={**bigquery_destination_secret, "dataset_id": "pyairbyte_tests"},
        )
        write_result = destination.write(
            source,
            # cache=False,  # Toggle comment to test with/without caching
        )


if __name__ == "__main__":
    main()
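A possible variant of this example stages records in a local cache before writing, exercising the same sync-ID/generation-ID defaulting path (a sketch to drop into `main()`; whether `destination.write()` accepts a `ReadResult` here is an assumption):

```python
# Hypothetical variant, inside main() after `destination` is created:
cache = ab.get_default_cache()           # local DuckDB cache
read_result = source.read(cache=cache)   # materialize records locally first
write_result = destination.write(read_result)  # assumed ReadResult support
```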