Skip to content

Commit

Permalink
AirbyteLib: Improved progress print, especially in the terminal (#34973)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Feb 7, 2024
1 parent cc2a6e2 commit e06243b
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 251 deletions.
3 changes: 2 additions & 1 deletion airbyte-lib/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.venv*
.venv*
.env
4 changes: 4 additions & 0 deletions airbyte-lib/airbyte_lib/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ def ensure_installation(

# If the connector path does not exist, uninstall and re-install.
# This is sometimes caused by a failed or partial installation.
print(
"Connector executable not found within the virtual environment "
f"at {self._get_connector_path()!s}.\nReinstalling..."
)
self.uninstall()
self.install()
reinstalled = True
Expand Down
3 changes: 3 additions & 0 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def get_source(
raise FileNotFoundError(local_executable)
local_executable = Path(which_executable).absolute()

print(f"Using local `{name}` executable: {local_executable!s}")
return Source(
name=name,
config=config,
Expand All @@ -66,6 +67,8 @@ def get_source(
),
)

# else: we are installing a connector in a virtual environment:

metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
Expand Down
98 changes: 78 additions & 20 deletions airbyte-lib/airbyte_lib/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import sys
import time
from contextlib import suppress
from enum import Enum, auto
from typing import cast

from rich.errors import LiveError
from rich.live import Live as RichLive
from rich.markdown import Markdown as RichMarkdown


DEFAULT_REFRESHES_PER_SECOND = 2
IS_REPL = hasattr(sys, "ps1") # True if we're in a Python REPL, in which case we can use Rich.

try:
Expand All @@ -26,6 +28,25 @@
IS_NOTEBOOK = False


class ProgressStyle(Enum):
    """The ways in which read progress can be rendered."""

    AUTO = auto()
    """Pick a style based on the detected environment."""

    RICH = auto()
    """Render progress with a Rich live view."""

    IPYTHON = auto()
    """Render progress through the IPython display functions."""

    PLAIN = auto()
    """Render progress as plain text."""

    NONE = auto()
    """Do not render progress at all."""


MAX_UPDATE_FREQUENCY = 1000
"""The max number of records to read before updating the progress bar."""

Expand Down Expand Up @@ -71,7 +92,10 @@ def _get_elapsed_time_str(seconds: int) -> str:
class ReadProgress:
"""A simple progress bar for the command line and IPython notebooks."""

def __init__(self) -> None:
def __init__(
self,
style: ProgressStyle = ProgressStyle.AUTO,
) -> None:
"""Initialize the progress tracker."""
# Streams expected (for progress bar)
self.num_streams_expected = 0
Expand All @@ -95,20 +119,48 @@ def __init__(self) -> None:

self.last_update_time: float | None = None

self.rich_view: RichLive | None = None
if not IS_NOTEBOOK and not IS_REPL:
# If we're in a terminal, use a Rich view to display the progress updates.
self.rich_view = RichLive()
try:
self.rich_view.start()
except LiveError:
self.rich_view = None
self._rich_view: RichLive | None = None
self.style: ProgressStyle = style
if self.style == ProgressStyle.AUTO:
self.style = ProgressStyle.PLAIN
if IS_NOTEBOOK:
self.style = ProgressStyle.IPYTHON

elif IS_REPL:
self.style = ProgressStyle.PLAIN

else:
# Test for Rich availability:
self._rich_view = RichLive()
try:
self._rich_view.start()
self._rich_view.stop()
self._rich_view = None
self.style = ProgressStyle.RICH
except LiveError:
# Rich live view not available. Using plain text progress.
self._rich_view = None
self.style = ProgressStyle.PLAIN

def _start(self) -> None:
    """Open the Rich live view when the Rich style is active and no view is open yet."""
    if self.style != ProgressStyle.RICH or self._rich_view:
        # Only the Rich style owns a live view, and an already-open view is reused.
        return

    self._rich_view = RichLive(
        auto_refresh=True,
        refresh_per_second=DEFAULT_REFRESHES_PER_SECOND,
    )
    self._rich_view.start()

def _stop(self) -> None:
"""Stop the progress bar."""
if self._rich_view:
with suppress(Exception):
self._rich_view.stop()
self._rich_view = None

def __del__(self) -> None:
"""Close the Rich view."""
if self.rich_view:
with suppress(Exception):
self.rich_view.stop()
self._stop()

def log_success(self) -> None:
"""Log success and stop tracking progress."""
Expand All @@ -118,9 +170,7 @@ def log_success(self) -> None:
self.finalize_end_time = time.time()

self.update_display(force_refresh=True)
if self.rich_view:
with suppress(Exception):
self.rich_view.stop()
self._stop()

def reset(self, num_streams_expected: int) -> None:
"""Reset the progress tracker."""
Expand All @@ -144,6 +194,8 @@ def reset(self, num_streams_expected: int) -> None:
self.total_batches_finalized = 0
self.finalized_stream_names = set()

self._start()

@property
def elapsed_seconds(self) -> int:
"""Return the number of seconds elapsed since the read operation started."""
Expand Down Expand Up @@ -242,11 +294,10 @@ def log_batches_finalized(self, stream_name: str, num_batches: int) -> None:
def log_stream_finalized(self, stream_name: str) -> None:
"""Log that a stream has been finalized."""
self.finalized_stream_names.add(stream_name)
self.update_display(force_refresh=True)
if len(self.finalized_stream_names) == self.num_streams_expected:
self.log_success()

self.update_display(force_refresh=True)

def update_display(self, *, force_refresh: bool = False) -> None:
"""Update the display."""
# Don't update more than twice per second unless force_refresh is True.
Expand All @@ -259,13 +310,20 @@ def update_display(self, *, force_refresh: bool = False) -> None:

status_message = self._get_status_message()

if IS_NOTEBOOK:
if self.style == ProgressStyle.IPYTHON:
# We're in a notebook so use the IPython display.
ipy_display.clear_output(wait=True)
ipy_display.display(ipy_display.Markdown(status_message))

elif self.rich_view is not None:
self.rich_view.update(RichMarkdown(status_message))
elif self.style == ProgressStyle.RICH and self._rich_view is not None:
self._rich_view.update(RichMarkdown(status_message))

elif self.style == ProgressStyle.PLAIN:
# TODO: Add a plain text progress print option that isn't too noisy.
pass

elif self.style == ProgressStyle.NONE:
pass

self.last_update_time = time.time()

Expand Down
6 changes: 5 additions & 1 deletion airbyte-lib/airbyte_lib/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any

import jsonschema
import pendulum
import yaml
from rich import print

Expand Down Expand Up @@ -338,7 +339,8 @@ def check(self) -> None:
for msg in self._execute(["check", "--config", config_file]):
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
return # Success!
print(f"Connection check succeeded for `{self.name}`.")
return

raise exc.AirbyteConnectorCheckFailedError(
help_url=self.docs_url,
Expand Down Expand Up @@ -534,6 +536,7 @@ def read(
stream_names=set(self.get_selected_streams()),
)
state = cache.get_state() if not force_full_refresh else None
print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...")
cache.process_airbyte_messages(
self._tally_records(
self._read(
Expand All @@ -543,6 +546,7 @@ def read(
),
write_strategy=write_strategy,
)
print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")

return ReadResult(
processed_records=self._processed_records,
Expand Down
7 changes: 6 additions & 1 deletion airbyte-lib/examples/run_faker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import airbyte_lib as ab


SCALE = 5_000_000 # Number of records to generate between users and purchases.
SCALE = 500_000 # Number of records to generate between users and purchases.

# This is a dummy secret, just to test functionality.
DUMMY_SECRET = ab.get_secret("DUMMY_SECRET")


print("Installing Faker source...")
source = ab.get_source(
"source-faker",
config={"count": SCALE / 2},
install_if_missing=True,
)
print("Faker source installed.")
source.check()
source.set_streams(["products", "users", "purchases"])

Expand Down
10 changes: 7 additions & 3 deletions airbyte-lib/examples/run_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""A simple test of AirbyteLib, using the GitHub source connector.
Usage (from airbyte-lib root directory):
> poetry run python ./examples/run_faker.py
> poetry run python ./examples/run_github.py
A GitHub personal access token is required (see below), and you may need to delete the .venv-source-github folder
if your installation gets interrupted or corrupted.
Expand All @@ -12,15 +12,19 @@
import airbyte_lib as ab


# Create a token here: https://github.com/settings/tokens
GITHUB_TOKEN = ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")


source = ab.get_source("source-github")
source.set_config(
{"repositories": ["airbytehq/airbyte"], "credentials": {"personal_access_token": GITHUB_TOKEN}}
{
"repositories": ["airbytehq/quickstarts"],
"credentials": {"personal_access_token": GITHUB_TOKEN},
}
)
source.check()
source.set_streams(["products", "users", "purchases"])
source.set_streams(["issues", "pull_requests", "commits"])

result = source.read()

Expand Down
Loading

0 comments on commit e06243b

Please sign in to comment.