Skip to content

Commit

Permalink
AirbyteLib: add SQLCaches for DuckDB and Postgres (includes Ruff+Mypy…
Browse files Browse the repository at this point in the history
… cleanup) (#33607)
  • Loading branch information
aaronsteers authored Jan 12, 2024
1 parent f637e11 commit 99a23dc
Show file tree
Hide file tree
Showing 49 changed files with 4,925 additions and 371 deletions.
15 changes: 9 additions & 6 deletions airbyte-lib/airbyte_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from airbyte_lib._factories.cache_factories import get_default_cache, new_local_cache
from airbyte_lib._factories.connector_factories import get_connector
from airbyte_lib.datasets import CachedDataset
from airbyte_lib.results import ReadResult
from airbyte_lib.source import Source

from .factories import (get_connector, get_in_memory_cache)
from .sync_result import (Dataset, SyncResult)
from .source import (Source)

__all__ = [
"get_connector",
"get_in_memory_cache",
"Dataset",
"SyncResult",
"get_default_cache",
"new_local_cache",
"CachedDataset",
"ReadResult",
"Source",
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import subprocess
import sys
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable, Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Generator, Iterable, List
from typing import IO, Any, NoReturn

from airbyte_lib.registry import ConnectorMetadata


_LATEST_VERSION = "latest"


Expand All @@ -27,32 +29,32 @@ def __init__(
self.target_version = target_version

@abstractmethod
def execute(self, args: List[str]) -> Iterable[str]:
def execute(self, args: list[str]) -> Iterator[str]:
pass

@abstractmethod
def ensure_installation(self):
def ensure_installation(self) -> None:
pass

@abstractmethod
def install(self):
def install(self) -> None:
pass

@abstractmethod
def uninstall(self):
def uninstall(self) -> None:
pass


@contextmanager
def _stream_from_subprocess(args: List[str]) -> Generator[Iterable[str], None, None]:
def _stream_from_subprocess(args: list[str]) -> Generator[Iterable[str], None, None]:
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)

def _stream_from_file(file: IO[str]):
def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:
while True:
line = file.readline()
if not line:
Expand Down Expand Up @@ -102,31 +104,31 @@ def __init__(
# TODO: Replace with `f"airbyte-{self.metadata.name}"`
self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}"

def _get_venv_name(self):
def _get_venv_name(self) -> str:
return f".venv-{self.metadata.name}"

def _get_connector_path(self):
def _get_connector_path(self) -> Path:
return Path(self._get_venv_name(), "bin", self.metadata.name)

def _run_subprocess_and_raise_on_failure(self, args: List[str]):
result = subprocess.run(args)
def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
result = subprocess.run(args, check=False)
if result.returncode != 0:
raise Exception(f"Install process exited with code {result.returncode}")

def uninstall(self):
def uninstall(self) -> None:
venv_name = self._get_venv_name()
if os.path.exists(venv_name):
self._run_subprocess_and_raise_on_failure(["rm", "-rf", venv_name])

def install(self):
def install(self) -> None:
venv_name = self._get_venv_name()
self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])

pip_path = os.path.join(venv_name, "bin", "pip")

self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url])

def _get_installed_version(self):
def _get_installed_version(self) -> str:
"""
In the venv, run the following: python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""
Expand All @@ -143,7 +145,7 @@ def _get_installed_version(self):

def ensure_installation(
self,
):
) -> None:
"""
Ensure that the connector is installed in a virtual environment.
If not yet installed and if install_if_missing is True, then install.
Expand All @@ -157,13 +159,15 @@ def ensure_installation(
venv_path = Path(venv_name)
if not venv_path.exists():
if not self.install_if_missing:
raise Exception(f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist")
raise Exception(
f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist"
)
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise FileNotFoundError(
f"Could not find connector '{self.metadata.name}' " f"in venv '{venv_name}' with connector path '{connector_path}'."
f"Could not find connector '{self.metadata.name}' in venv '{venv_name}' with connector path '{connector_path}'.",
)

if self.enforce_version:
Expand All @@ -176,29 +180,33 @@ def ensure_installation(
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}",
)

def execute(self, args: List[str]) -> Iterable[str]:
def execute(self, args: list[str]) -> Iterator[str]:
connector_path = self._get_connector_path()

with _stream_from_subprocess([str(connector_path)] + args) as stream:
yield from stream


class PathExecutor(Executor):
def ensure_installation(self):
def ensure_installation(self) -> None:
try:
self.execute(["spec"])
except Exception as e:
raise Exception(f"Connector {self.metadata.name} is not available - executing it failed: {e}")
raise Exception(
f"Connector {self.metadata.name} is not available - executing it failed: {e}"
)

def install(self):
def install(self) -> NoReturn:
raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")

def uninstall(self):
raise Exception(f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib - please remove it manually")
def uninstall(self) -> NoReturn:
raise Exception(
f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib - please remove it manually"
)

def execute(self, args: List[str]) -> Iterable[str]:
def execute(self, args: list[str]) -> Iterator[str]:
with _stream_from_subprocess([self.metadata.name] + args) as stream:
yield from stream
Empty file.
58 changes: 58 additions & 0 deletions airbyte-lib/airbyte_lib/_factories/cache_factories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from pathlib import Path

import ulid

from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig


def get_default_cache() -> DuckDBCache:
    """Return a DuckDB cache using the default database path.

    The backing database file is stored under the `.cache` directory,
    relative to the current working directory.
    """
    return DuckDBCache(
        config=DuckDBCacheConfig(db_path="./.cache/default_cache_db.duckdb"),
    )


def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local DuckDB cache, using a name string to seed the path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory, unless `cache_dir` is provided.

    Args:
        cache_name: Name to use for the cache. May contain only alphanumeric
            characters and underscores. Defaults to a new ULID string.
        cache_dir: Directory to store the cache in. Defaults to
            `./.cache/{cache_name}`.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Raises:
        ValueError: If `cache_name` contains spaces or any character other
            than alphanumerics and underscores.
    """
    if cache_name:
        if " " in cache_name:
            raise ValueError(f"Cache name '{cache_name}' cannot contain spaces")

        if not cache_name.replace("_", "").isalnum():
            raise ValueError(
                f"Cache name '{cache_name}' can only contain alphanumeric "
                "characters and underscores."
            )

    # Fall back to a ULID so each unnamed cache gets a unique, sortable path.
    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)  # normalize str input to Path

    config = DuckDBCacheConfig(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
    return DuckDBCache(config=config)
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,19 @@

from typing import Any

from airbyte_lib.cache import InMemoryCache
from airbyte_lib.executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.source import Source


def get_in_memory_cache():
return InMemoryCache()


def get_connector(
name: str,
version: str | None = None,
pip_url: str | None = None,
config: dict[str, Any] | None = None,
use_local_install: bool = False,
install_if_missing: bool = True,
):
) -> Source:
"""
Get a connector by name and version.
:param name: connector name
Expand Down
11 changes: 11 additions & 0 deletions airbyte-lib/airbyte_lib/_file_writers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Public re-exports for the airbyte-lib file-writer implementations."""
from .base import FileWriterBase, FileWriterBatchHandle, FileWriterConfigBase
from .parquet import ParquetWriter, ParquetWriterConfig


__all__ = [
    "FileWriterBatchHandle",
    "FileWriterBase",
    "FileWriterConfigBase",
    "ParquetWriter",
    "ParquetWriterConfig",
]
112 changes: 112 additions & 0 deletions airbyte-lib/airbyte_lib/_file_writers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Define abstract base class for File Writers, which write and read from file storage."""

from __future__ import annotations

import abc
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, cast, final

from overrides import overrides

from airbyte_lib._processors import BatchHandle, RecordProcessor
from airbyte_lib.config import CacheConfigBase


if TYPE_CHECKING:
import pyarrow as pa


DEFAULT_BATCH_SIZE = 10000


# The batch handle for file writers is a list of Path objects.
@dataclass
class FileWriterBatchHandle(BatchHandle):
    """The file writer batch handle is a list of Path objects.

    Records the file(s) produced for one batch so they can later be deleted
    during batch cleanup.
    """

    # Paths of the files written for this batch; starts empty.
    files: list[Path] = field(default_factory=list)


class FileWriterConfigBase(CacheConfigBase):
    """Base configuration for file writers.

    NOTE(review): the original docstring said "Configuration for the Snowflake
    cache", which appears to be a copy-paste leftover — this class configures
    file-based writers.
    """

    cache_dir: Path = Path("./.cache/files/")
    """The directory to store cache files in."""
    cleanup: bool = True
    """Whether to clean up temporary files after processing a batch."""


class FileWriterBase(RecordProcessor, abc.ABC):
    """A generic base implementation for a file-based cache."""

    config_class = FileWriterConfigBase
    config: FileWriterConfigBase

    @abc.abstractmethod
    @overrides
    def _write_batch(
        self,
        stream_name: str,
        batch_id: str,
        record_batch: pa.Table | pa.RecordBatch,
    ) -> FileWriterBatchHandle:
        """Process a record batch.

        Returns a handle listing the cache file(s) that were written.
        """
        ...

    @final
    def write_batch(
        self,
        stream_name: str,
        batch_id: str,
        record_batch: pa.Table | pa.RecordBatch,
    ) -> FileWriterBatchHandle:
        """Write a batch of records to the cache.

        Final: subclasses customize behavior by overriding `_write_batch`
        instead of this method.
        """
        return self._write_batch(stream_name, batch_id, record_batch)

    @overrides
    def _cleanup_batch(
        self,
        stream_name: str,
        batch_id: str,
        batch_handle: BatchHandle,
    ) -> None:
        """Delete the files created and declared in the batch.

        No-op when the `cleanup` config option is set to False.
        """
        if not self.config.cleanup:
            return

        _ = stream_name, batch_id  # unused; kept for interface symmetry
        file_handle = cast(FileWriterBatchHandle, batch_handle)
        for path in file_handle.files:
            path.unlink()

    @final
    def cleanup_batch(
        self,
        stream_name: str,
        batch_id: str,
        batch_handle: BatchHandle,
    ) -> None:
        """Clean up the cache files for a batch.

        Final: subclasses customize behavior by overriding `_cleanup_batch`
        instead of this method.
        """
        self._cleanup_batch(stream_name, batch_id, batch_handle)
Loading

0 comments on commit 99a23dc

Please sign in to comment.