Skip to content

Commit

Permalink
Fix: Resolve Windows compatibility issues (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Mar 19, 2024
1 parent 225af9e commit ee47a3b
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 83 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/python_pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,23 @@ jobs:
run: poetry run pytest -m "not slow and not requires_creds" --durations=5 --exitfirst

pytest:
name: Pytest (All, Python ${{ matrix.python-version }})
name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
# Don't run on forks
if: github.repository_owner == 'airbytehq'

runs-on: ubuntu-latest
strategy:
matrix:
python-version: [
'3.9',
'3.10',
'3.11',
]
os: [
Ubuntu,
Windows,
]
fail-fast: false

runs-on: "${{ matrix.os }}-latest"
steps:
# Common steps:
- name: Checkout code
Expand Down
24 changes: 18 additions & 6 deletions airbyte/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from typing import IO, TYPE_CHECKING, Any, NoReturn, cast

from rich import print
from typing_extensions import Literal

from airbyte import exceptions as exc
from airbyte._util.meta import is_windows
from airbyte.sources.registry import ConnectorMetadata


Expand All @@ -23,6 +25,14 @@
_LATEST_VERSION = "latest"


def _get_bin_dir(venv_path: Path, /) -> Path:
"""Get the directory where executables are installed."""
if is_windows():
return venv_path / "Scripts"

return venv_path / "bin"


class Executor(ABC):
def __init__(
self,
Expand Down Expand Up @@ -164,7 +174,13 @@ def _get_venv_path(self) -> Path:
return self.install_root / self._get_venv_name()

def _get_connector_path(self) -> Path:
return self._get_venv_path() / "bin" / self.name
suffix: Literal[".exe", ""] = ".exe" if is_windows() else ""
return _get_bin_dir(self._get_venv_path()) / (self.name + suffix)

@property
def interpreter_path(self) -> Path:
suffix: Literal[".exe", ""] = ".exe" if is_windows() else ""
return _get_bin_dir(self._get_venv_path()) / ("python" + suffix)

def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
result = subprocess.run(
Expand Down Expand Up @@ -202,7 +218,7 @@ def install(self) -> None:
[sys.executable, "-m", "venv", str(self._get_venv_path())]
)

pip_path = str(self._get_venv_path() / "bin" / "pip")
pip_path = str(_get_bin_dir(self._get_venv_path()) / "pip")
print(
f"Installing '{self.name}' into virtual environment '{self._get_venv_path()!s}'.\n"
f"Running 'pip install {self.pip_url}'...\n"
Expand Down Expand Up @@ -281,10 +297,6 @@ def _get_installed_version(

return None

@property
def interpreter_path(self) -> Path:
return self._get_venv_path() / "bin" / "python"

def ensure_installation(
self,
*,
Expand Down
9 changes: 5 additions & 4 deletions airbyte/_processors/sql/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ def _write_files_to_new_table(
batch_id=batch_id,
)
internal_sf_stage_name = f"@%{temp_table_name}"

def path_str(path: Path) -> str:
return str(path.absolute()).replace("\\", "\\\\")

put_files_statements = "\n".join(
[
f"PUT 'file://{file_path.absolute()!s}' {internal_sf_stage_name};"
for file_path in files
]
[f"PUT 'file://{path_str(file_path)}' {internal_sf_stage_name};" for file_path in files]
)
self._execute_sql(put_files_statements)
properties_list: list[str] = list(self._get_stream_properties(stream_name).keys())
Expand Down
4 changes: 4 additions & 0 deletions airbyte/_util/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def is_langchain() -> bool:
return "langchain_airbyte" in sys.modules


def is_windows() -> bool:
return system() == "Windows"


@lru_cache
def is_colab() -> bool:
return bool(get_colab_release_version())
Expand Down
6 changes: 5 additions & 1 deletion airbyte/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Union

from overrides import overrides
from typing_extensions import Literal

from airbyte._processors.sql.duckdb import DuckDBSqlProcessor
from airbyte.caches.base import CacheBase
Expand Down Expand Up @@ -60,5 +61,8 @@ def get_database_name(self) -> str:
if self.db_path == ":memory:":
return "memory"

# Split the path on the appropriate separator ("/" or "\")
split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"

# Return the file name without the extension
return str(self.db_path).split("/")[-1].split(".")[0]
return str(self.db_path).split(sep=split_on)[-1].split(".")[0]
6 changes: 3 additions & 3 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@


@contextmanager
def as_temp_files(files_contents: list[Any]) -> Generator[list[str], Any, None]:
def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any, None]:
"""Write the given contents to temporary files and yield the file paths as strings."""
temp_files: list[Any] = []
try:
for content in files_contents:
temp_file = tempfile.NamedTemporaryFile(mode="w+t", delete=True)
temp_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
temp_file.write(
json.dumps(content) if isinstance(content, dict) else content,
)
Expand All @@ -61,7 +61,7 @@ def as_temp_files(files_contents: list[Any]) -> Generator[list[str], Any, None]:
finally:
for temp_file in temp_files:
with suppress(Exception):
temp_file.close()
temp_file.unlink()


class Source:
Expand Down
15 changes: 14 additions & 1 deletion airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

import shutil
import sys
import warnings
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -86,9 +87,21 @@ def get_source(
# Assume this is a path
local_executable = Path(local_executable).absolute()
else:
which_executable: str | None = None
which_executable = shutil.which(local_executable)
if not which_executable and sys.platform == "win32":
# Try with the .exe extension
local_executable = f"{local_executable}.exe"
which_executable = shutil.which(local_executable)

if which_executable is None:
raise FileNotFoundError(local_executable)
raise exc.AirbyteConnectorExecutableNotFoundError(
connector_name=name,
context={
"executable": local_executable,
"working_directory": Path.cwd().absolute(),
},
) from FileNotFoundError(local_executable)
local_executable = Path(which_executable).absolute()

print(f"Using local `{name}` executable: {local_executable!s}")
Expand Down
3 changes: 2 additions & 1 deletion airbyte/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import airbyte as ab
from airbyte import exceptions as exc
from airbyte._executor import _get_bin_dir


def _parse_args() -> argparse.Namespace:
Expand Down Expand Up @@ -128,7 +129,7 @@ def validate(connector_dir: str, sample_config: str, *, validate_install_only: b
if not venv_path.exists():
_run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])

pip_path = str(venv_path / "bin" / "pip")
pip_path = str(_get_bin_dir(Path(venv_path)) / "pip")

_run_subprocess_and_raise_on_failure([pip_path, "install", connector_dir])

Expand Down
58 changes: 39 additions & 19 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import json
import logging
import os
from pathlib import Path
import shutil
import socket
import subprocess
import time
from requests.exceptions import HTTPError

import ulid
from airbyte._util.google_secrets import get_gcp_secret
from airbyte._util.meta import is_windows
from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
Expand All @@ -22,11 +25,10 @@
import psycopg2 as psycopg
import pytest
from _pytest.nodes import Item
from google.cloud import secretmanager
from pytest_docker.plugin import get_docker_ip
from sqlalchemy import create_engine

from airbyte.caches import PostgresCache
from airbyte._executor import _get_bin_dir
from airbyte.caches.util import new_local_cache
from airbyte.sources.base import as_temp_files

Expand Down Expand Up @@ -84,6 +86,12 @@ def test_priority(item: Item) -> int:
# Sort the items list in-place based on the test_priority function
items.sort(key=test_priority)

for item in items:
# Skip tests that require Docker if Docker is not available (including on Windows).
if "new_postgres_cache" in item.fixturenames or "postgres_cache" in item.fixturenames:
if True or not is_docker_available():
item.add_marker(pytest.mark.skip(reason="Skipping tests (Docker not available)"))


def is_port_in_use(port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
Expand Down Expand Up @@ -121,12 +129,27 @@ def test_pg_connection(host) -> bool:
return False


def is_docker_available():
if is_windows():
# Linux containers are not supported on Windows CI runners
return False
try:
_ = docker.from_env()
return True
except docker.errors.DockerException:
return False


@pytest.fixture(scope="session")
def pg_dsn():
def new_postgres_cache():
"""Fixture to return a fresh Postgres cache.
Each test that uses this fixture will get a unique table prefix.
"""
client = docker.from_env()
try:
client.images.get(PYTEST_POSTGRES_IMAGE)
except docker.errors.ImageNotFound:
except (docker.errors.ImageNotFound, HTTPError):
# Pull the image if it doesn't exist, to avoid failing our sleep timer
# if the image needs to download on-demand.
client.images.pull(PYTEST_POSTGRES_IMAGE)
Expand Down Expand Up @@ -170,20 +193,8 @@ def pg_dsn():
if final_host is None:
raise Exception(f"Failed to connect to the PostgreSQL database on host {host}.")

yield final_host
# Stop and remove the container after the tests are done
postgres.stop()
postgres.remove()


@pytest.fixture
def new_pg_cache(pg_dsn):
"""Fixture to return a fresh cache.
Each test that uses this fixture will get a unique table prefix.
"""
config = PostgresCache(
host=pg_dsn,
host=final_host,
port=PYTEST_POSTGRES_PORT,
username="postgres",
password="postgres",
Expand All @@ -195,6 +206,10 @@ def new_pg_cache(pg_dsn):
)
yield config

# Stop and remove the container after the tests are done
postgres.stop()
postgres.remove()


@pytest.fixture
def new_snowflake_cache():
Expand Down Expand Up @@ -284,7 +299,8 @@ def source_test_installation():
shutil.rmtree(venv_dir)

subprocess.run(["python", "-m", "venv", venv_dir], check=True)
subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True)
pip_path = str(_get_bin_dir(Path(venv_dir)) / "pip")
subprocess.run([pip_path, "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True)

yield

Expand All @@ -311,9 +327,13 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
all_cache_type_fixtures: dict[str, str] = {
"BigQuery": "new_bigquery_cache",
"DuckDB": "new_duckdb_cache",
"Postgres": "new_pg_cache",
"Postgres": "new_postgres_cache",
"Snowflake": "new_snowflake_cache",
}
if is_windows():
# Postgres tests require Linux containers
all_cache_type_fixtures.pop("Postgres")

if "new_generic_cache" in metafunc.fixturenames:
metafunc.parametrize(
"new_generic_cache",
Expand Down
6 changes: 4 additions & 2 deletions tests/integration_tests/test_all_cache_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from __future__ import annotations

import os
from pathlib import Path
import sys

import pytest

import airbyte as ab
from airbyte._executor import _get_bin_dir


# Product count is always the same, regardless of faker scale.
Expand All @@ -32,10 +34,10 @@
@pytest.fixture(autouse=True)
def add_venv_bin_to_path(monkeypatch):
# Get the path to the bin directory of the virtual environment
venv_bin_path = os.path.join(sys.prefix, 'bin')
venv_bin_path = str(_get_bin_dir(Path(sys.prefix)))

# Add the bin directory to the PATH
new_path = f"{venv_bin_path}:{os.environ['PATH']}"
new_path = f"{venv_bin_path}{os.pathsep}{os.environ['PATH']}"
monkeypatch.setenv('PATH', new_path)


Expand Down
Loading

0 comments on commit ee47a3b

Please sign in to comment.