Merge branch 'airbytehq:main' into main
Udit107710 authored Sep 3, 2024
2 parents fcce6bf + 1775cb4 commit 0975d3d
Showing 8 changed files with 129 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pydoc_preview.yml
@@ -31,7 +31,7 @@ jobs:

       - name: Generate documentation
         run: |
-          poetry run generate-docs
+          poetry run poe docs-generate

       - name: Upload artifact
         uses: actions/upload-pages-artifact@v3
2 changes: 1 addition & 1 deletion .github/workflows/pydoc_publish.yml
@@ -50,7 +50,7 @@ jobs:

       - name: Generate documentation
         run: |
-          poetry run generate-docs
+          poetry run poe docs-generate

       - name: Upload artifact
         uses: actions/upload-pages-artifact@v3
8 changes: 8 additions & 0 deletions airbyte/_connector_base.py
@@ -38,6 +38,7 @@

 from airbyte._executors.base import Executor
 from airbyte._message_iterators import AirbyteMessageIterator
+from airbyte.progress import ProgressTracker


 MAX_LOG_LINES = 20
@@ -352,6 +353,8 @@ def _execute(
         self,
         args: list[str],
         stdin: IO[str] | AirbyteMessageIterator | None = None,
+        *,
+        progress_tracker: ProgressTracker | None = None,
     ) -> Generator[AirbyteMessage, None, None]:
         """Execute the connector with the given arguments.

@@ -371,6 +374,11 @@
         for line in self.executor.execute(args, stdin=stdin):
             try:
                 message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
+                if progress_tracker and message.record:
+                    progress_tracker.tally_bytes_read(
+                        len(line),
+                        stream_name=message.record.stream,
+                    )
                 self._peek_airbyte_message(message)
                 yield message
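The hook above counts the raw JSON line length for each record message, keyed by stream. Below is a minimal, runnable sketch of the same accounting, using a toy tracker in place of `ProgressTracker` (the `MiniTracker` class and `read_lines` helper are hypothetical; only the `tally_bytes_read` call mirrors the real API):

```python
import json
from collections import defaultdict
from collections.abc import Iterable, Iterator


class MiniTracker:
    """Toy stand-in for airbyte.progress.ProgressTracker (byte tallying only)."""

    def __init__(self) -> None:
        self.stream_bytes_read: dict[str, int] = defaultdict(int)

    def tally_bytes_read(self, bytes_read: int, stream_name: str) -> None:
        self.stream_bytes_read[stream_name] += bytes_read


def read_lines(lines: Iterable[str], tracker: MiniTracker) -> Iterator[dict]:
    """Condensed analogue of _execute: tally bytes from each raw line, then yield."""
    for line in lines:
        message = json.loads(line)  # the real code validates into an AirbyteMessage
        record = message.get("record")
        if record is not None:
            tracker.tally_bytes_read(len(line), stream_name=record["stream"])
        yield message


tracker = MiniTracker()
lines = [
    '{"type": "RECORD", "record": {"stream": "users", "data": {"id": 1}}}',
    '{"type": "RECORD", "record": {"stream": "users", "data": {"id": 2}}}',
]
for _ in read_lines(lines, tracker):
    pass
print(dict(tracker.stream_bytes_read))  # {'users': 138}
```

Counting `len(line)` measures the serialized message size (characters of the JSON line, a close proxy for bytes with ASCII-dominant payloads) rather than the parsed record size, so the tally adds no parsing overhead.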
105 changes: 103 additions & 2 deletions airbyte/progress.py
@@ -18,6 +18,7 @@

 import datetime
 import importlib
+import json
 import math
 import os
 import sys
@@ -207,6 +208,7 @@ def __init__(
         self.stream_read_counts: dict[str, int] = defaultdict(int)
         self.stream_read_start_times: dict[str, float] = {}
         self.stream_read_end_times: dict[str, float] = {}
+        self.stream_bytes_read: dict[str, int] = defaultdict(int)

         # Cache Writes
         self.total_records_written = 0
@@ -238,6 +240,27 @@ def _print_info_message(
         if self._file_logger:
             self._file_logger.info(message)

+    @property
+    def bytes_tracking_enabled(self) -> bool:
+        """Return True if bytes are being tracked."""
+        return bool(self.stream_bytes_read)
+
+    @property
+    def total_bytes_read(self) -> int:
+        """Return the total number of bytes read.
+
+        Returns 0 if bytes are not being tracked.
+        """
+        return sum(self.stream_bytes_read.values())
+
+    @property
+    def total_megabytes_read(self) -> float:
+        """Return the total number of megabytes read.
+
+        Returns 0.0 if no bytes have been read, generally because bytes are not being tracked.
+        """
+        return self.total_bytes_read / 1_000_000
+
     def tally_records_read(
         self,
         messages: Iterable[AirbyteMessage],
@@ -351,6 +374,13 @@ def tally_confirmed_writes(

         self._update_display(force_refresh=True)

+    def tally_bytes_read(self, bytes_read: int, stream_name: str) -> None:
+        """Tally the number of bytes read.
+
+        Unlike the other tally methods, this method does not yield messages.
+        """
+        self.stream_bytes_read[stream_name] += bytes_read
+
     # Logging methods

     @property
@@ -393,6 +423,72 @@ def _log_stream_read_end(self, stream_name: str) -> None:
         )
         self.stream_read_end_times[stream_name] = time.time()

+    def _log_read_metrics(self) -> None:
+        """Log read performance metrics."""
+        # Source performance metrics
+        if not self.total_records_read or not self._file_logger:
+            return
+
+        perf_metrics: dict[str, Any] = {
+            "job_description": {
+                "description": self.job_description,
+            }
+        }
+        if self._source:
+            perf_metrics["job_description"]["source"] = self._source.name
+        if self._cache:
+            perf_metrics["job_description"]["cache"] = type(self._cache).__name__
+        if self._destination:
+            perf_metrics["job_description"]["destination"] = self._destination.name
+
+        perf_metrics["records_read"] = self.total_records_read
+        perf_metrics["read_time_seconds"] = self.elapsed_read_seconds
+        perf_metrics["read_start_time"] = self.read_start_time
+        perf_metrics["read_end_time"] = self.read_end_time
+        if self.elapsed_read_seconds > 0:
+            perf_metrics["records_per_second"] = round(
+                self.total_records_read / self.elapsed_read_seconds, 4
+            )
+            if self.bytes_tracking_enabled:
+                mb_read = self.total_megabytes_read
+                perf_metrics["mb_read"] = mb_read
+                perf_metrics["mb_per_second"] = round(mb_read / self.elapsed_read_seconds, 4)
+
+        stream_metrics = {}
+        for stream_name, count in self.stream_read_counts.items():
+            stream_metrics[stream_name] = {
+                "records_read": count,
+                "read_start_time": self.stream_read_start_times.get(stream_name),
+                "read_end_time": self.stream_read_end_times.get(stream_name),
+            }
+            if (
+                stream_name in self.stream_read_end_times
+                and stream_name in self.stream_read_start_times
+                and count > 0
+            ):
+                duration: float = (
+                    self.stream_read_end_times[stream_name]
+                    - self.stream_read_start_times[stream_name]
+                )
+                stream_metrics[stream_name]["read_time_seconds"] = duration
+                if duration > 0:
+                    stream_metrics[stream_name]["records_per_second"] = round(
+                        count / duration,
+                        4,
+                    )
+                    if self.bytes_tracking_enabled:
+                        mb_read = self.stream_bytes_read[stream_name] / 1_000_000
+                        stream_metrics[stream_name]["mb_read"] = mb_read
+                        stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4)
+
+        perf_metrics["stream_metrics"] = stream_metrics
+
+        self._file_logger.info(json.dumps({"read_performance_metrics": perf_metrics}))
+
     @property
     def _unclosed_stream_names(self) -> list[str]:
         """Return a list of streams that have not yet been fully read."""
@@ -416,6 +512,7 @@ def log_success(
         self._print_info_message(
             f"Completed `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`."
         )
+        self._log_read_metrics()
         send_telemetry(
             source=self._source,
             cache=self._cache,
@@ -663,8 +760,12 @@ def _get_status_message(self) -> str:
         # Format start time as a friendly string in local timezone:
         start_time_str = _to_time_str(self.read_start_time)
         records_per_second: float = 0.0
+        mb_per_second_str = ""
         if self.elapsed_read_seconds > 0:
             records_per_second = self.total_records_read / self.elapsed_read_seconds
+            if self.bytes_tracking_enabled:
+                mb_per_second = self.total_megabytes_read / self.elapsed_read_seconds
+                mb_per_second_str = f", {mb_per_second:,.2f} MB/s"

         status_message = HORIZONTAL_LINE + f"\n### Sync Progress: `{self.job_description}`\n\n"

@@ -680,7 +781,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
             f"**Started reading from source at `{start_time_str}`:**\n\n"
             f"- Read **{self.total_records_read:,}** records "
             f"over **{self.elapsed_read_time_string}** "
-            f"({records_per_second:,.1f} records / second).\n\n"
+            f"({records_per_second:,.1f} records/s{mb_per_second_str}).\n\n"
         )

         if self.stream_read_counts:
@@ -747,7 +848,7 @@ def join_streams_strings(streams_list: list[str]) -> str:
         status_message += (
             f"- Sent **{self.total_destination_records_delivered:,} records** "
             f"to destination over **{self.total_destination_write_time_str}** "
-            f"({self.destination_records_delivered_per_second:,.1f} records per second)."
+            f"({self.destination_records_delivered_per_second:,.1f} records/s)."
             "\n\n"
         )
         status_message += (
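For reference, `_log_read_metrics` writes a single JSON object under a `read_performance_metrics` key. The snippet below builds a hypothetical example of that payload (all values are illustrative, the `description` string format is assumed, and the `read_start_time`/`read_end_time` fields are omitted); note that `mb_read` uses decimal megabytes, bytes / 1,000,000:

```python
import json

# Illustrative payload only -- values are made up, not from a real sync.
example = {
    "read_performance_metrics": {
        "job_description": {
            "description": "source-faker -> duckdb",
            "source": "source-faker",
            "cache": "DuckDBCache",
        },
        "records_read": 100_000,
        "read_time_seconds": 25.0,
        "records_per_second": 4000.0,  # 100_000 records / 25.0 s
        "mb_read": 50.0,               # 50_000_000 bytes / 1_000_000
        "mb_per_second": 2.0,          # 50.0 MB / 25.0 s
        "stream_metrics": {
            "users": {
                "records_read": 100_000,
                "read_time_seconds": 25.0,
                "records_per_second": 4000.0,
                "mb_read": 50.0,
                "mb_per_second": 2.0,
            }
        },
    }
}
print(json.dumps(example, indent=2))
```

Per-stream rates are computed against each stream's own read duration, so they can differ from the job-level rates when streams run for different lengths of time.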
1 change: 1 addition & 0 deletions airbyte/sources/base.py
@@ -552,6 +552,7 @@ def _read_with_catalog(
                 "--state",
                 state_file,
             ],
+            progress_tracker=progress_tracker,
         )
         yield from progress_tracker.tally_records_read(message_generator)
         progress_tracker.log_read_complete()
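Because the new `progress_tracker` parameter in `_execute` is declared after a bare `*`, call sites such as this one must pass it by name, which keeps the existing positional interface stable. A small illustration of the signature pattern (a demo function, not the real method):

```python
from collections.abc import Iterator


def _execute_demo(args: list[str], *, progress_tracker: object | None = None) -> Iterator[str]:
    """Keyword-only parameter after `*`: callers must write progress_tracker=..."""
    yield from args


list(_execute_demo(["read"], progress_tracker=None))  # OK: named keyword argument
# list(_execute_demo(["read"], None))  # TypeError: rejected as a positional argument
```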
14 changes: 11 additions & 3 deletions docs/CONTRIBUTING.md
@@ -16,12 +16,20 @@ Regular documentation lives in the `/docs` folder. Based on the doc strings of p
 To generate the documentation, run:

 ```console
-poe generate-docs
+poe docs-generate
 ```

-or `poetry run poe generate-docs` if you don't have [Poe](https://poethepoet.natn.io/index.html) installed.
+Or to build and open in one step:
+
-The `generate-docs` CLI command is mapped to the `run()` function of `docs/generate.py`.
+```console
+poe docs-preview
+```
+
+
+or `poetry run poe docs-preview` if you don't have [Poe](https://poethepoet.natn.io/index.html) installed.
+
+The `docs-generate` Poe task is mapped to the `run()` function of `docs/generate.py`.

 Documentation pages will be generated in the `docs/generated` folder. The `test_docs.py` test in pytest will automatically update generated content. These updates must be committed manually before the docs tests will pass.
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -307,7 +307,6 @@ venvPath = "./" # Assuming .venv is at the root of your project
 venv = ".venv"

 [tool.poetry.scripts]
-generate-docs = "docs.generate:run"
 airbyte-lib-validate-source = "airbyte.validate:run"

 [tool.poe.tasks]
@@ -321,7 +320,8 @@ coverage-reset = { shell = "coverage erase" }

 check = { shell = "ruff check . && mypy . && pytest --collect-only -qq" }

-docs-generate = {env = {PDOC_ALLOW_EXEC = "1"}, shell = "generate-docs && open docs/generated/index.html" }
+docs-generate = {env = {PDOC_ALLOW_EXEC = "1"}, cmd = "python -m docs.generate run"}
+docs-preview = {shell = "poe docs-generate && open docs/generated/index.html"}

 fix = { shell = "ruff format . && ruff check --fix -s || ruff format ." }
 fix-unsafe = { shell = "ruff format . && ruff check --fix --unsafe-fixes . && ruff format ." }
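With the `generate-docs` console script removed, the Poe task now invokes the module directly via `python -m docs.generate run`, and `docs-preview` layers the browser-open step on top. A plausible minimal shape for that module, assuming it wraps pdoc (illustrative sketch only; the repo's actual `docs/generate.py` may differ):

```python
# docs/generate.py -- hypothetical sketch; the real module may differ.
from pathlib import Path

import pdoc


def run() -> None:
    """Render HTML API docs for the `airbyte` package into docs/generated/."""
    pdoc.pdoc("airbyte", output_directory=Path("docs/generated"))


if __name__ == "__main__":
    # `python -m docs.generate run` lands here; the trailing "run" argument
    # names the task and is ignored in this sketch.
    run()
```

The `PDOC_ALLOW_EXEC = "1"` environment setting on the task tells pdoc to proceed even when importing a documented module executes code.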
4 changes: 2 additions & 2 deletions tests/docs_tests/test_docs_checked_in.py
@@ -9,7 +9,7 @@
 @pytest.mark.filterwarnings("ignore")
 def test_docs_generation():
     """
-    Docs need to be able to be generated via `poetry run generate-docs`.
+    Docs need to be able to be generated via `poetry run poe docs-generate`.

     This test runs the docs generation and ensures that it can complete successfully.
@@ -24,4 +24,4 @@ def test_docs_generation():
     # if there is a diff, fail the test
     assert (
         diff == 0
-    ), "Docs are out of date. Please run `poetry run generate-docs` and commit the changes."
+    ), "Docs are out of date. Please run `poetry run poe docs-generate` and commit the changes."
