Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Additional config options, upgrade SDK, fix tests, run pip check #23

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ jobs:
build:
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10"]
py_version: ["3.9", "3.10"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python ${{ matrix.py_version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.py_version }}
- name: Install Tap Airbyte Wrapper
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/project_add.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
if: ${{ github.actor != 'dependabot[bot]' }}
steps:
- uses: actions/add-to-project@v0.5.0
- uses: actions/add-to-project@v1.0.2
with:
project-url: https://github.com/orgs/MeltanoLabs/projects/3
github-token: ${{ secrets.MELTYBOT_PROJECT_ADD_PAT }}
1,568 changes: 530 additions & 1,038 deletions poetry.lock

Large diffs are not rendered by default.

75 changes: 48 additions & 27 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,44 +1,65 @@
[tool.poetry]
name = "tap-airbyte"
version = "0.6.0"
description = "`tap-airbyte` is a Singer tap for Airbyte, built with the Meltano Singer SDK."
version = "0.7.0"
description = "Singer tap for Airbyte, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Alex Butler"]
keywords = ["ELT", "Airbyte"]
license = "Apache 2.0"
keywords = [
"ELT",
"Airbyte",
]
classifiers = [
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
]
license = "Apache-2.0"

[tool.poetry.dependencies]
python = "<3.12,>=3.8" # airbyte cdk is broken on 3.11 due to mutable defaults in dataclasses, go figure
requests = "^2.25.1"
singer-sdk = { version = "^0.36.0" }
fs-s3fs = { version = "^1.1.1", optional = true }
orjson = "^3.8.3"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
tox = "^3.24.4"
flake8 = "^3.9.2"
black = ">=21"
pydocstyle = "^6.1.1"
mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"
python = ">=3.9,<3.11"
singer-sdk = { version="~=0.39.0", extras = [] }
fs-s3fs = { version = "~=1.1.1", optional = true }
orjson = "^3.10.6"
virtualenv = "^20.26.3"

[tool.poetry.group.dev.dependencies]
pytest = ">=8"
singer-sdk = { version="~=0.39.0", extras = ["testing"] }

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.mypy]
python_version = "3.10"
warn_unused_configs = true

[tool.ruff]
src = ["tap_airbyte"]
target-version = "py39"

[tool.ruff.lint]
ignore = [
"ANN101", # missing-type-self
"ANN102", # missing-type-cls
"COM812", # missing-trailing-comma
"ISC001", # single-line-implicit-string-concatenation
]
select = ["ALL"]

[tool.ruff.lint.flake8-annotations]
allow-star-arg-any = true

[tool.ruff.lint.isort]
known-first-party = ["tap_airbyte"]

[tool.black] # https://black.readthedocs.io/en/stable/usage_and_configuration/the_basics.html#configuration-via-a-file
line-length = 100
target-version = ["py39"]
target-version = ["py310"]
preview = true

[tool.isort] # https://pycqa.github.io/isort/docs/configuration/options.html
color_output = true
line_length = 100
profile = "black"
src_paths = "tap_airbyte"

[build-system]
requires = ["poetry-core>=1.0.8"]
requires = ["poetry-core==1.9.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
Expand Down
139 changes: 98 additions & 41 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import orjson
import requests
import singer_sdk._singerlib as singer
import virtualenv
from singer_sdk import Stream, Tap
from singer_sdk import typing as th
from singer_sdk.cli import common_options
Expand Down Expand Up @@ -163,10 +164,26 @@ class TapAirbyte(Tap):
" documentation"
),
),
th.Property(
"skip_native_check",
th.BooleanType,
required=False,
default=False,
description="Disables the check for natively executable sources. By default, AirByte sources are checked "
"to see if they are able to be executed natively without using containers. This disables that "
"check and forces them to run in containers.",
),
th.Property(
"native_source_python",
th.StringType,
required=False,
description="Path to Python executable to use.",
)

).to_dict()
airbyte_mount_dir: str = os.getenv("AIRBYTE_MOUNT_DIR", "/tmp")
pipe_status = None

eof_received = None
# Airbyte image to run
_image: t.Optional[str] = None # type: ignore
_tag: t.Optional[str] = None # type: ignore
Expand Down Expand Up @@ -212,14 +229,14 @@ def cli(cls) -> t.Callable:
context_settings={"help_option_names": ["--help"]},
)
def cli(
version: bool = False,
about: bool = False,
discover: bool = False,
test: bool = False,
config: tuple[str, ...] = (),
state: t.Optional[str] = None,
catalog: t.Optional[str] = None,
format: t.Optional[str] = None,
version: bool = False,
about: bool = False,
discover: bool = False,
test: bool = False,
config: tuple[str, ...] = (),
state: t.Optional[str] = None,
catalog: t.Optional[str] = None,
cli_format: t.Optional[str] = None,
) -> None:
if version:
cls.print_version()
Expand Down Expand Up @@ -255,13 +272,13 @@ def cli(
spec = tap.run_spec()["connectionSpecification"]
except Exception:
cls.logger.info("Tap-Airbyte instantiation failed. Printing basic about info.")
cls.print_about(output_format=format)
cls.print_about(output_format=cli_format)
else:
cls.logger.info(
"Tap-Airbyte instantiation succeeded. Printing spec-enriched about info."
)
cls.config_jsonschema["properties"]["airbyte_config"] = spec
cls.print_about(output_format=format)
cls.print_about(output_format=cli_format)
cls.print_spec_as_config(spec)
return
# End modification
Expand Down Expand Up @@ -310,20 +327,59 @@ def _ensure_oci(self) -> None:
sys.exit(1)
self.logger.info("Successfully executed %s version.", self.container_runtime)

def _ensure_installed(self) -> None:
"""Install the source connector from PyPI if necessary."""
if not self.venv.exists():
subprocess.run(
[sys.executable, "-m", "venv", self.venv],
check=True,
stdout=subprocess.PIPE,
)
if not (self.venv / "bin" / self.source_name).exists():
subprocess.run(
[self.venv / "bin" / "pip", "install", self._get_requirement_string()],
check=True,
stdout=subprocess.PIPE,
)
@property
def native_venv_path(self) -> Path:
"""Get the path to the virtual environment for the connector."""
return Path(__file__).parent.resolve() / f".venv-airbyte-{self.source_name}"

@property
def native_venv_bin_path(self) -> Path:
"""Get the path to the virtual environment for the connector bin."""
return self.native_venv_path / ("Scripts" if sys.platform == "win32" else "bin")

def setup_native_connector_venv(self) -> None:
"""Creates a virtual environment and installs the source connector via PyPI"""
if self.native_venv_path.exists():
self.logger.info("Virtual environment for source already exists.")
return

self.logger.info(
"Creating virtual environment at %s, using %s Python.",
self.native_venv_path,
self.config.get("native_source_python", "default")
)

# Construct the arguments list for virtualenv
args = []

if self.config.get("native_source_python"):
args.append(["-p", self.config["native_source_python"]])
args.append(str(self.native_venv_path))

# Run the virtualenv command
virtualenv.cli_run(args)

self.logger.info(
"Installing %s in the virtual environment..",
self._get_requirement_string()
)

subprocess.run(
[self.native_venv_bin_path / "pip", "install",
self._get_requirement_string()],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)

def _run_pip_check(self) -> str:
process = subprocess.run(
[self.native_venv_bin_path / "pip", "check"],
capture_output=True,
text=True
)

return process.stdout

def _get_requirement_string(self) -> str:
"""Get the requirement string for the source connector."""
Expand All @@ -336,6 +392,8 @@ def _get_requirement_string(self) -> str:
def is_native(self) -> bool:
"""Check if the connector is available on PyPI and can be managed natively without Docker."""
is_native = False
if self.config.get("skip_native_check", False):
return is_native
try:
response = requests.get(
"https://connectors.airbyte.com/files/registries/v0/oss_registry.json",
Expand All @@ -352,13 +410,15 @@ def is_native(self) -> bool:
except Exception:
pass
if is_native:
self._ensure_installed()
self.setup_native_connector_venv()
pip_result = self._run_pip_check()
self.logger.info(f"pip check results: {pip_result}")
else:
self._ensure_oci()
return is_native

def to_command(
self, *airbyte_cmd: str, docker_args: t.Optional[t.List[str]] = None
self, *airbyte_cmd: str, docker_args: t.Optional[t.List[str]] = None
) -> t.List[t.Union[str, Path]]:
"""Construct the command to run the Airbyte connector."""
return (
Expand All @@ -369,7 +429,6 @@ def to_command(
"run",
*(docker_args or []),
f"{self.image}:{self.tag}",
"--",
*airbyte_cmd,
]
)
Expand Down Expand Up @@ -505,10 +564,8 @@ def run_connection_test(self) -> bool:
def run_read(self) -> t.Iterator[subprocess.Popen]:
"""Run the read command for the Airbyte connector."""
with TemporaryDirectory() as host_tmpdir:
with (
open(f"{host_tmpdir}/config.json", "wb") as config,
open(f"{host_tmpdir}/catalog.json", "wb") as catalog,
):
with open(f"{host_tmpdir}/config.json", "wb") as config, open(f"{host_tmpdir}/catalog.json",
"wb") as catalog:
config.write(orjson.dumps(self.config.get("airbyte_config", {})))
catalog.write(orjson.dumps(self.configured_airbyte_catalog))
if self.airbyte_state:
Expand Down Expand Up @@ -727,8 +784,8 @@ def sync_all(self) -> None:
)
stream_buffer.put_nowait(airbyte_message["record"]["data"])
elif airbyte_message["type"] in (
AirbyteMessage.LOG,
AirbyteMessage.TRACE,
AirbyteMessage.LOG,
AirbyteMessage.TRACE,
):
self._process_log_message(airbyte_message)
elif airbyte_message["type"] == AirbyteMessage.STATE:
Expand All @@ -747,7 +804,7 @@ def sync_all(self) -> None:
singer.write_message(singer.StateMessage(self.airbyte_state))
else:
self.logger.warning("Unhandled message: %s", airbyte_message)
# Daemon threads will be terminated when the main thread exits
# Daemon threads will be terminated when the main thread exits,
# so we do not need to wait on them to join after SIGPIPE
if TapAirbyte.pipe_status is not PIPE_CLOSED:
self.logger.info("Waiting for sync threads to finish...")
Expand Down Expand Up @@ -777,13 +834,13 @@ def discover_streams(self) -> t.List["AirbyteStream"]:
if "cursor_field" in stream and isinstance(stream["cursor_field"][0], str):
airbyte_stream.replication_key = stream["cursor_field"][0]
elif (
"source_defined_cursor" in stream
and isinstance(stream["source_defined_cursor"], bool)
and stream["source_defined_cursor"]
"source_defined_cursor" in stream
and isinstance(stream["source_defined_cursor"], bool)
and stream["source_defined_cursor"]
):
# The stream has a source defined cursor. Try using that
if "default_cursor_field" in stream and isinstance(
stream["default_cursor_field"][0], str
stream["default_cursor_field"][0], str
):
airbyte_stream.replication_key = stream["default_cursor_field"][0]
else:
Expand All @@ -797,7 +854,7 @@ def discover_streams(self) -> t.List["AirbyteStream"]:
if "primary_key" in stream and isinstance(stream["primary_key"][0], t.List):
airbyte_stream.primary_keys = stream["primary_key"][0]
elif "source_defined_primary_key" in stream and isinstance(
stream["source_defined_primary_key"][0], t.List
stream["source_defined_primary_key"][0], t.List
):
airbyte_stream.primary_keys = stream["source_defined_primary_key"][0]
except IndexError:
Expand Down Expand Up @@ -843,7 +900,7 @@ def buffer(self) -> Queue:
def get_records(self, context: t.Optional[dict]) -> t.Iterable[dict]:
"""Get records from the stream."""
while (
self.parent.eof_received is False or not self.buffer.empty()
self.parent.eof_received is False or not self.buffer.empty()
) and TapAirbyte.pipe_status is not PIPE_CLOSED:
try:
# The timeout permits the consumer to re-check the producer is alive
Expand Down
Loading
Loading