diff --git a/.github/workflows/python_pytest.yml b/.github/workflows/python_pytest.yml
index 12c126e5..5787c344 100644
--- a/.github/workflows/python_pytest.yml
+++ b/.github/workflows/python_pytest.yml
@@ -43,11 +43,9 @@ jobs:
         run: poetry run pytest -m "not slow and not requires_creds" --durations=5 --exitfirst
 
   pytest:
-    name: Pytest (All, Python ${{ matrix.python-version }})
+    name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
     # Don't run on forks
    if: github.repository_owner == 'airbytehq'
-
-    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [
@@ -55,8 +53,13 @@
          '3.10',
          '3.11',
        ]
+        os: [
+          Ubuntu,
+          Windows,
+        ]
      fail-fast: false
+    runs-on: "${{ matrix.os }}-latest"
 
    steps:
      # Common steps:
      - name: Checkout code
diff --git a/airbyte/_executor.py b/airbyte/_executor.py
index 5bc36bbc..03f4087b 100644
--- a/airbyte/_executor.py
+++ b/airbyte/_executor.py
@@ -11,8 +11,10 @@
 from typing import IO, TYPE_CHECKING, Any, NoReturn, cast
 
 from rich import print
+from typing_extensions import Literal
 
 from airbyte import exceptions as exc
+from airbyte._util.meta import is_windows
 from airbyte.sources.registry import ConnectorMetadata
 
 
@@ -23,6 +25,14 @@
 _LATEST_VERSION = "latest"
 
 
+def _get_bin_dir(venv_path: Path, /) -> Path:
+    """Get the directory where executables are installed."""
+    if is_windows():
+        return venv_path / "Scripts"
+
+    return venv_path / "bin"
+
+
 class Executor(ABC):
     def __init__(
         self,
@@ -164,7 +174,13 @@ def _get_venv_path(self) -> Path:
         return self.install_root / self._get_venv_name()
 
     def _get_connector_path(self) -> Path:
-        return self._get_venv_path() / "bin" / self.name
+        suffix: Literal[".exe", ""] = ".exe" if is_windows() else ""
+        return _get_bin_dir(self._get_venv_path()) / (self.name + suffix)
+
+    @property
+    def interpreter_path(self) -> Path:
+        suffix: Literal[".exe", ""] = ".exe" if is_windows() else ""
+        return _get_bin_dir(self._get_venv_path()) / ("python" + suffix)
 
     def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
         result = subprocess.run(
@@ -202,7 +218,7 @@ def install(self) -> None:
             [sys.executable, "-m", "venv", str(self._get_venv_path())]
         )
 
-        pip_path = str(self._get_venv_path() / "bin" / "pip")
+        pip_path = str(_get_bin_dir(self._get_venv_path()) / "pip")
         print(
             f"Installing '{self.name}' into virtual environment '{self._get_venv_path()!s}'.\n"
             f"Running 'pip install {self.pip_url}'...\n"
@@ -281,10 +297,6 @@ def _get_installed_version(
 
         return None
 
-    @property
-    def interpreter_path(self) -> Path:
-        return self._get_venv_path() / "bin" / "python"
-
     def ensure_installation(
         self,
         *,
diff --git a/airbyte/_processors/sql/snowflake.py b/airbyte/_processors/sql/snowflake.py
index 7806d91f..9f0fa863 100644
--- a/airbyte/_processors/sql/snowflake.py
+++ b/airbyte/_processors/sql/snowflake.py
@@ -60,11 +60,12 @@ def _write_files_to_new_table(
             batch_id=batch_id,
         )
         internal_sf_stage_name = f"@%{temp_table_name}"
+
+        def path_str(path: Path) -> str:
+            return str(path.absolute()).replace("\\", "\\\\")
+
         put_files_statements = "\n".join(
-            [
-                f"PUT 'file://{file_path.absolute()!s}' {internal_sf_stage_name};"
-                for file_path in files
-            ]
+            [f"PUT 'file://{path_str(file_path)}' {internal_sf_stage_name};" for file_path in files]
         )
         self._execute_sql(put_files_statements)
         properties_list: list[str] = list(self._get_stream_properties(stream_name).keys())
diff --git a/airbyte/_util/meta.py b/airbyte/_util/meta.py
index ffd9ee55..bdd768e4 100644
--- a/airbyte/_util/meta.py
+++ b/airbyte/_util/meta.py
@@ -44,6 +44,10 @@ def is_langchain() -> bool:
     return "langchain_airbyte" in sys.modules
 
 
+def is_windows() -> bool:
+    return system() == "Windows"
+
+
 @lru_cache
 def is_colab() -> bool:
     return bool(get_colab_release_version())
diff --git a/airbyte/caches/duckdb.py b/airbyte/caches/duckdb.py
index 5ed2abc2..1bbaf550 100644
--- a/airbyte/caches/duckdb.py
+++ b/airbyte/caches/duckdb.py
@@ -20,6 +20,7 @@ from typing import Union
 
 from overrides import overrides
+from typing_extensions import Literal
 
 from airbyte._processors.sql.duckdb import DuckDBSqlProcessor
 from airbyte.caches.base import CacheBase
@@ -60,5 +61,8 @@ def get_database_name(self) -> str:
         if self.db_path == ":memory:":
             return "memory"
 
+        # Split the path on the appropriate separator ("/" or "\")
+        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"
+
         # Return the file name without the extension
-        return str(self.db_path).split("/")[-1].split(".")[0]
+        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]
diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py
index 26ed1a0f..7ce37070 100644
--- a/airbyte/sources/base.py
+++ b/airbyte/sources/base.py
@@ -46,12 +46,12 @@
 
 @contextmanager
-def as_temp_files(files_contents: list[Any]) -> Generator[list[str], Any, None]:
+def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any, None]:
     """Write the given contents to temporary files and yield the file paths as strings."""
     temp_files: list[Any] = []
     try:
         for content in files_contents:
-            temp_file = tempfile.NamedTemporaryFile(mode="w+t", delete=True)
+            temp_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False)
             temp_file.write(
                 json.dumps(content) if isinstance(content, dict) else content,
             )
@@ -61,7 +61,9 @@ def as_temp_files(files_contents: list[Any]) -> Generator[list[str], Any, None]:
     finally:
         for temp_file in temp_files:
             with suppress(Exception):
                 temp_file.close()
+                # NamedTemporaryFile wrappers have no unlink(); delete the closed file by path.
+                Path(temp_file.name).unlink()
 
 
 class Source:
diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py
index fb37cdf1..c21fb427 100644
--- a/airbyte/sources/util.py
+++ b/airbyte/sources/util.py
@@ -4,6 +4,7 @@
 from __future__ import annotations
 
 import shutil
+import sys
 import warnings
 from pathlib import Path
 from typing import Any
@@ -86,9 +87,21 @@ def get_source(
         # Assume this is a path
         local_executable = Path(local_executable).absolute()
     else:
+        which_executable: str | None = None
         which_executable = shutil.which(local_executable)
+        if not which_executable and sys.platform == "win32":
+            # Try with the .exe extension
+            local_executable = f"{local_executable}.exe"
+            which_executable = shutil.which(local_executable)
+
         if which_executable is None:
-            raise FileNotFoundError(local_executable)
+            raise exc.AirbyteConnectorExecutableNotFoundError(
+                connector_name=name,
+                context={
+                    "executable": local_executable,
+                    "working_directory": Path.cwd().absolute(),
+                },
+            ) from FileNotFoundError(local_executable)
         local_executable = Path(which_executable).absolute()
 
     print(f"Using local `{name}` executable: {local_executable!s}")
diff --git a/airbyte/validate.py b/airbyte/validate.py
index dea7d2f4..89789a80 100644
--- a/airbyte/validate.py
+++ b/airbyte/validate.py
@@ -18,6 +18,7 @@
 import airbyte as ab
 from airbyte import exceptions as exc
+from airbyte._executor import _get_bin_dir
 
 
 def _parse_args() -> argparse.Namespace:
@@ -128,7 +129,7 @@ def validate(connector_dir: str, sample_config: str, *, validate_install_only: b
     if not venv_path.exists():
         _run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])
 
-    pip_path = str(venv_path / "bin" / "pip")
+    pip_path = str(_get_bin_dir(Path(venv_path)) / "pip")
 
     _run_subprocess_and_raise_on_failure([pip_path, "install", connector_dir])
diff --git a/tests/conftest.py b/tests/conftest.py
index 21ab5825..88dcf215 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -6,13 +6,16 @@
 import json
 import logging
 import os
+from pathlib import Path
 import shutil
 import socket
 import subprocess
 import time
+from requests.exceptions import HTTPError
 
 import ulid
 from airbyte._util.google_secrets import get_gcp_secret
+from airbyte._util.meta import is_windows
 from airbyte.caches.base import CacheBase
 from airbyte.caches.bigquery import BigQueryCache
 from airbyte.caches.duckdb import DuckDBCache
@@ -22,11 +25,10 @@
 import psycopg2 as psycopg
 import pytest
 from _pytest.nodes import Item
-from google.cloud import secretmanager
-from pytest_docker.plugin import get_docker_ip
 from sqlalchemy import create_engine
 
 from airbyte.caches import PostgresCache
+from airbyte._executor import _get_bin_dir
 from airbyte.caches.util import new_local_cache
 from airbyte.sources.base import as_temp_files
 
@@ -84,6 +86,12 @@ def test_priority(item: Item) -> int:
     # Sort the items list in-place based on the test_priority function
     items.sort(key=test_priority)
 
+    for item in items:
+        # Skip tests that require Docker if Docker is not available (including on Windows).
+        if "new_postgres_cache" in item.fixturenames or "postgres_cache" in item.fixturenames:
+            if not is_docker_available():
+                item.add_marker(pytest.mark.skip(reason="Skipping tests (Docker not available)"))
+
 
 def is_port_in_use(port):
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
@@ -121,12 +129,27 @@ def test_pg_connection(host) -> bool:
     return False
 
 
+def is_docker_available() -> bool:
+    if is_windows():
+        # Linux containers are not supported on Windows CI runners
+        return False
+    try:
+        _ = docker.from_env()
+        return True
+    except docker.errors.DockerException:
+        return False
+
+
 @pytest.fixture(scope="session")
-def pg_dsn():
+def new_postgres_cache():
+    """Fixture to return a fresh Postgres cache.
+
+    Each test that uses this fixture will get a unique table prefix.
+    """
     client = docker.from_env()
     try:
         client.images.get(PYTEST_POSTGRES_IMAGE)
-    except docker.errors.ImageNotFound:
+    except (docker.errors.ImageNotFound, HTTPError):
         # Pull the image if it doesn't exist, to avoid failing our sleep timer
         # if the image needs to download on-demand.
         client.images.pull(PYTEST_POSTGRES_IMAGE)
@@ -170,20 +193,8 @@ def pg_dsn():
     if final_host is None:
         raise Exception(f"Failed to connect to the PostgreSQL database on host {host}.")
 
-    yield final_host
-    # Stop and remove the container after the tests are done
-    postgres.stop()
-    postgres.remove()
-
-
-@pytest.fixture
-def new_pg_cache(pg_dsn):
-    """Fixture to return a fresh cache.
-
-    Each test that uses this fixture will get a unique table prefix.
- """ config = PostgresCache( - host=pg_dsn, + host=final_host, port=PYTEST_POSTGRES_PORT, username="postgres", password="postgres", @@ -195,6 +206,10 @@ def new_pg_cache(pg_dsn): ) yield config + # Stop and remove the container after the tests are done + postgres.stop() + postgres.remove() + @pytest.fixture def new_snowflake_cache(): @@ -284,7 +299,8 @@ def source_test_installation(): shutil.rmtree(venv_dir) subprocess.run(["python", "-m", "venv", venv_dir], check=True) - subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) + pip_path = str(_get_bin_dir(Path(venv_dir)) / "pip") + subprocess.run([pip_path, "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) yield @@ -311,9 +327,13 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: all_cache_type_fixtures: dict[str, str] = { "BigQuery": "new_bigquery_cache", "DuckDB": "new_duckdb_cache", - "Postgres": "new_pg_cache", + "Postgres": "new_postgres_cache", "Snowflake": "new_snowflake_cache", } + if is_windows(): + # Postgres tests require Linux containers + all_cache_type_fixtures.pop("Postgres") + if "new_generic_cache" in metafunc.fixturenames: metafunc.parametrize( "new_generic_cache", diff --git a/tests/integration_tests/test_all_cache_types.py b/tests/integration_tests/test_all_cache_types.py index 7560002f..d2462186 100644 --- a/tests/integration_tests/test_all_cache_types.py +++ b/tests/integration_tests/test_all_cache_types.py @@ -8,11 +8,13 @@ from __future__ import annotations import os +from pathlib import Path import sys import pytest import airbyte as ab +from airbyte._executor import _get_bin_dir # Product count is always the same, regardless of faker scale. @@ -32,10 +34,10 @@ @pytest.fixture(autouse=True) def add_venv_bin_to_path(monkeypatch): # Get the path to the bin directory of the virtual environment - venv_bin_path = os.path.join(sys.prefix, 'bin') + venv_bin_path = str(_get_bin_dir(Path(sys.prefix))) # Add the bin directory to the PATH - new_path = f"{venv_bin_path}:{os.environ['PATH']}" + new_path = f"{venv_bin_path}{os.pathsep}{os.environ['PATH']}" monkeypatch.setenv('PATH', new_path) diff --git a/tests/integration_tests/test_duckdb_cache.py b/tests/integration_tests/test_duckdb_cache.py index a940f0ca..abdcaf11 100644 --- a/tests/integration_tests/test_duckdb_cache.py +++ b/tests/integration_tests/test_duckdb_cache.py @@ -9,6 +9,7 @@ from collections.abc import Generator import os +from pathlib import Path import sys import shutil @@ -17,6 +18,7 @@ import airbyte as ab +from airbyte._executor import _get_bin_dir from airbyte.caches.duckdb import DuckDBCache from airbyte.caches.util import new_local_cache, get_default_cache @@ -37,10 +39,10 @@ @pytest.fixture(autouse=True) def add_venv_bin_to_path(monkeypatch): # Get the path to the bin directory of the virtual environment - venv_bin_path = os.path.join(sys.prefix, 'bin') + venv_bin_path = str(_get_bin_dir(Path(sys.prefix))) # Add the bin directory to the PATH - new_path = f"{venv_bin_path}:{os.environ['PATH']}" + new_path = f"{venv_bin_path}{os.pathsep}{os.environ['PATH']}" monkeypatch.setenv('PATH', new_path) @@ -76,12 +78,6 @@ def duckdb_cache() -> Generator[DuckDBCache, None, None]: return -def test_which_source_faker() -> None: - """Test that source-faker is available on PATH.""" - assert shutil.which("source-faker") is not None, \ - f"Can't find source-faker on PATH: {os.environ['PATH']}" - - def test_duckdb_cache(duckdb_cache: DuckDBCache) -> None: """Test 
     assert duckdb_cache
diff --git a/tests/integration_tests/test_source_faker_integration.py b/tests/integration_tests/test_source_faker_integration.py
index da23d83f..93ff3e41 100644
--- a/tests/integration_tests/test_source_faker_integration.py
+++ b/tests/integration_tests/test_source_faker_integration.py
@@ -18,6 +18,7 @@
 
 import airbyte as ab
+from airbyte._executor import _get_bin_dir
 from airbyte.caches.base import CacheBase
 from airbyte.caches.duckdb import DuckDBCache
 from airbyte.caches.postgres import PostgresCache
@@ -40,10 +41,10 @@
 @pytest.fixture(autouse=True)
 def add_venv_bin_to_path(monkeypatch):
     # Get the path to the bin directory of the virtual environment
-    venv_bin_path = os.path.join(sys.prefix, 'bin')
+    venv_bin_path = str(_get_bin_dir(Path(sys.prefix)))
 
     # Add the bin directory to the PATH
-    new_path = f"{venv_bin_path}:{os.environ['PATH']}"
+    new_path = f"{venv_bin_path}{os.pathsep}{os.environ['PATH']}"
     monkeypatch.setenv('PATH', new_path)
 
@@ -101,9 +102,9 @@ def duckdb_cache() -> Generator[DuckDBCache, None, None]:
 
 
 @pytest.fixture(scope="function")
-def postgres_cache(new_pg_cache) -> Generator[PostgresCache, None, None]:
+def postgres_cache(new_postgres_cache) -> Generator[PostgresCache, None, None]:
     """Fixture to return a fresh cache."""
-    yield new_pg_cache
+    yield new_postgres_cache
     # TODO: Delete cache DB file after test is complete.
     return
 
@@ -122,7 +123,7 @@ def all_cache_types(
 
 def test_which_source_faker() -> None:
     """Test that source-faker is available on PATH."""
-    assert shutil.which("source-faker") is not None, \
+    assert shutil.which("source-faker"), \
         f"Can't find source-faker on PATH: {os.environ['PATH']}"
 
diff --git a/tests/integration_tests/test_source_test_fixture.py b/tests/integration_tests/test_source_test_fixture.py
index 116ce871..04c3d8e6 100644
--- a/tests/integration_tests/test_source_test_fixture.py
+++ b/tests/integration_tests/test_source_test_fixture.py
@@ -5,7 +5,7 @@
 import os
 import shutil
 import itertools
-from contextlib import nullcontext as does_not_raise
+from contextlib import nullcontext as does_not_raise, suppress
 from typing import Any
 from unittest.mock import Mock, call, patch
 import tempfile
@@ -24,6 +24,7 @@
 from airbyte.version import get_version
 from airbyte.results import ReadResult
 from airbyte.datasets import CachedDataset, LazyDataset, SQLDataset
+from airbyte._executor import _get_bin_dir
 
 import airbyte as ab
 from airbyte.results import ReadResult
@@ -186,22 +187,27 @@ def test_check_fail():
 
 def test_file_write_and_cleanup() -> None:
     """Ensure files are written to the correct location and cleaned up afterwards."""
-    with tempfile.TemporaryDirectory() as temp_dir_1, tempfile.TemporaryDirectory() as temp_dir_2:
-        cache_w_cleanup = ab.new_local_cache(cache_dir=temp_dir_1, cleanup=True)
-        cache_wo_cleanup = ab.new_local_cache(cache_dir=temp_dir_2, cleanup=False)
+    temp_dir_root = Path(tempfile.mkdtemp())
+    temp_dir_1 = temp_dir_root / "cache_1"
+    temp_dir_2 = temp_dir_root / "cache_2"
 
-        source = ab.get_source("source-test", config={"apiKey": "test"})
-        source.select_all_streams()
+    cache_w_cleanup = ab.new_local_cache(cache_dir=temp_dir_1, cleanup=True)
+    cache_wo_cleanup = ab.new_local_cache(cache_dir=temp_dir_2, cleanup=False)
 
-        _ = source.read(cache_w_cleanup)
-        _ = source.read(cache_wo_cleanup)
+    source = ab.get_source("source-test", config={"apiKey": "test"})
+    source.select_all_streams()
 
-        # We expect all files to be cleaned up:
-        assert len(list(Path(temp_dir_1).glob("*.jsonl.gz"))) == 0, "Expected files to be cleaned up"
+    _ = source.read(cache_w_cleanup)
+    _ = source.read(cache_wo_cleanup)
 
-        # There are three streams, but only two of them have data:
-        assert len(list(Path(temp_dir_2).glob("*.jsonl.gz"))) == 2, "Expected files to exist"
+    # We expect all files to be cleaned up:
+    assert len(list(Path(temp_dir_1).glob("*.jsonl.gz"))) == 0, "Expected files to be cleaned up"
+    # There are three streams, but only two of them have data:
+    assert len(list(Path(temp_dir_2).glob("*.jsonl.gz"))) == 2, "Expected files to exist"
+
+    with suppress(Exception):
+        shutil.rmtree(str(temp_dir_root))
 
 
 def assert_cache_data(expected_test_stream_data: dict[str, list[dict[str, str | int]]], cache: SqlProcessorBase, streams: list[str] = None):
     for stream_name in streams or expected_test_stream_data.keys():
@@ -580,7 +586,7 @@ def test_check_fail_on_missing_config(method_call):
     with pytest.raises(exc.AirbyteConnectorConfigurationMissingError):
         method_call(source)
 
-def test_sync_with_merge_to_postgres(new_pg_cache: PostgresCache, expected_test_stream_data: dict[str, list[dict[str, str | int]]]):
+def test_sync_with_merge_to_postgres(new_postgres_cache: PostgresCache, expected_test_stream_data: dict[str, list[dict[str, str | int]]]):
     """Test that the merge strategy works as expected.
 
     In this test, we sync the same data twice. If the data is not duplicated, we assume
@@ -592,12 +598,12 @@
     source.select_all_streams()
 
     # Read twice to test merge strategy
-    result: ReadResult = source.read(new_pg_cache)
-    result: ReadResult = source.read(new_pg_cache)
+    result: ReadResult = source.read(new_postgres_cache)
+    result: ReadResult = source.read(new_postgres_cache)
 
     assert result.processed_records == 3
     for stream_name, expected_data in expected_test_stream_data.items():
-        if len(new_pg_cache[stream_name]) > 0:
+        if len(new_postgres_cache[stream_name]) > 0:
             pd.testing.assert_frame_equal(
                 result[stream_name].to_pandas(),
                 pd.DataFrame(expected_data),
@@ -617,17 +623,17 @@ def test_airbyte_version() -> None:
 
 
 def test_sync_to_postgres(
-    new_pg_cache: PostgresCache,
+    new_postgres_cache: PostgresCache,
     expected_test_stream_data: dict[str, list[dict[str, str | int]]],
 ) -> None:
     source = ab.get_source("source-test", config={"apiKey": "test"})
     source.select_all_streams()
 
-    result: ReadResult = source.read(new_pg_cache)
+    result: ReadResult = source.read(new_postgres_cache)
 
     assert result.processed_records == 3
     for stream_name, expected_data in expected_test_stream_data.items():
-        if len(new_pg_cache[stream_name]) > 0:
+        if len(new_postgres_cache[stream_name]) > 0:
             pd.testing.assert_frame_equal(
                 result[stream_name].to_pandas(),
                 pd.DataFrame(expected_data),
@@ -690,17 +696,21 @@ def test_failing_path_connector():
     with pytest.raises(Exception):
         ab.get_source("source-test", config={"apiKey": "test"}, use_local_install=True)
 
-def test_succeeding_path_connector():
-    new_path = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}"
+def test_succeeding_path_connector(monkeypatch):
+    venv_bin_path = str(_get_bin_dir(Path(".venv-source-test")))
+
+    # Add the bin directory to the PATH
+    new_path = f"{venv_bin_path}{os.pathsep}{os.environ['PATH']}"
 
     # Patch the PATH env var to include the test venv bin folder
-    with patch.dict(os.environ, {"PATH": new_path}):
-        source = ab.get_source(
-            "source-test",
-            config={"apiKey": "test"},
-            local_executable="source-test",
-        )
-        source.check()
+    monkeypatch.setenv('PATH', new_path)
+
+    source = ab.get_source(
+        "source-test",
+        config={"apiKey": "test"},
+        local_executable="source-test",
+    )
+    source.check()
 
 def test_install_uninstall():
     with tempfile.TemporaryDirectory() as temp_dir:
@@ -728,11 +738,11 @@
         source.install()
 
         assert os.path.exists(install_root / ".venv-source-test")
-        assert os.path.exists(install_root / ".venv-source-test/bin/source-test")
+        assert os.path.exists(_get_bin_dir(install_root / ".venv-source-test"))
 
         source.check()
 
         source.uninstall()
 
         assert not os.path.exists(install_root / ".venv-source-test")
-        assert not os.path.exists(install_root / ".venv-source-test/bin/source-test")
+        assert not os.path.exists(_get_bin_dir(install_root / ".venv-source-test"))