Skip to content

Commit

Permalink
refactor sql classes as SqlProcessors
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Feb 22, 2024
1 parent e34d494 commit eea527e
Show file tree
Hide file tree
Showing 28 changed files with 1,318 additions and 1,283 deletions.
5 changes: 3 additions & 2 deletions airbyte/_factories/cache_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ def get_default_cache() -> DuckDBCache:
Cache files are stored in the `.cache` directory, relative to the current
working directory.
"""

cache_dir = Path("./.cache/default_cache")
return DuckDBCache(
db_path="./.cache/default_cache_db.duckdb",
db_path=cache_dir / "default_cache.duckdb",
cache_dir=cache_dir,
)


Expand Down
16 changes: 0 additions & 16 deletions airbyte/_file_writers/__init__.py

This file was deleted.

Empty file added airbyte/_processors/__init__.py
Empty file.
31 changes: 15 additions & 16 deletions airbyte/_processors.py → airbyte/_processors/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Abstract base class for Processors, including SQL and File writers.
"""Define abstract base class for Processors, including Caches and File writers.
Processors can all take input from STDIN or a stream of Airbyte messages.
Processors can take input from STDIN or a stream of Airbyte messages.
Caches will pass their input to the File Writer. They share a common base class so certain
abstractions like "write" and "finalize" can be handled in either layer, or both.
Expand Down Expand Up @@ -34,7 +33,7 @@

from airbyte import exceptions as exc
from airbyte._util import protocol_util
from airbyte.config import CacheConfigBase
from airbyte.caches.base import CacheBase
from airbyte.progress import progress
from airbyte.strategies import WriteStrategy
from airbyte.types import _get_pyarrow_type
Expand Down Expand Up @@ -62,20 +61,20 @@ class RecordProcessor(abc.ABC):
"""Abstract base class for classes which can process input records."""

skip_finalize_step: bool = False
_expected_streams: set[str]

def __init__(
self,
config: CacheConfigBase,
cache: CacheBase,
*,
catalog_manager: CatalogManager | None = None,
) -> None:
self.config = config
if not isinstance(self.config, CacheConfigBase):
self._expected_streams: set[str] | None = None
self.cache: CacheBase = cache
if not isinstance(self.cache, CacheBase):
raise exc.AirbyteLibInputError(
message=(
f"Expected config class of type 'CacheConfigBase'. "
f"Instead received type '{type(self.config).__name__}'."
f"Expected config class of type 'CacheBase'. "
f"Instead received type '{type(self.cache).__name__}'."
),
)

Expand All @@ -94,6 +93,11 @@ def __init__(
self._catalog_manager: CatalogManager | None = catalog_manager
self._setup()

@property
def expected_streams(self) -> set[str]:
    """Names of the streams this processor expects to receive.

    Falls back to an empty set when no streams have been registered yet.
    """
    if self._expected_streams:
        return self._expected_streams
    return set()

def register_source(
self,
source_name: str,
Expand All @@ -112,11 +116,6 @@ def register_source(
)
self._expected_streams = stream_names

@property
def _streams_with_data(self) -> set[str]:
"""Return a list of known streams."""
return self._pending_batches.keys() | self._finalized_batches.keys()

@final
def process_stdin(
self,
Expand Down Expand Up @@ -213,7 +212,7 @@ def process_airbyte_messages(

all_streams = list(self._pending_batches.keys())
# Add empty streams to the streams list, so we create a destination table for it
for stream_name in self._expected_streams:
for stream_name in self.expected_streams:
if stream_name not in all_streams:
if DEBUG_MODE:
print(f"Stream {stream_name} has no data")
Expand Down
16 changes: 16 additions & 0 deletions airbyte/_processors/file/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""File processors.

Re-exports the file-writer base classes and the concrete JSONL and Parquet
writer implementations from their submodules, as listed in ``__all__``.
"""

from __future__ import annotations

from .base import FileWriterBase, FileWriterBatchHandle
from .jsonl import JsonlWriter
from .parquet import ParquetWriter


__all__ = [
"FileWriterBatchHandle",
"FileWriterBase",
"JsonlWriter",
"ParquetWriter",
]
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Define abstract base class for File Writers, which write and read from file storage."""
"""Abstract base class for File Writers, which write and read from file storage."""

from __future__ import annotations

Expand All @@ -11,8 +10,8 @@

from overrides import overrides

from airbyte._processors import BatchHandle, RecordProcessor
from airbyte.config import CacheConfigBase
from airbyte._processors.base import BatchHandle, RecordProcessor
from airbyte.caches.base import CacheBase


if TYPE_CHECKING:
Expand All @@ -34,7 +33,7 @@ class FileWriterBatchHandle(BatchHandle):
files: list[Path] = field(default_factory=list)


class FileWriterConfigBase(CacheConfigBase):
class FileWriterConfigBase(CacheBase):
"""Configuration for the Snowflake cache."""

cache_dir: Path = Path("./.cache/files/")
Expand All @@ -46,8 +45,6 @@ class FileWriterConfigBase(CacheConfigBase):
class FileWriterBase(RecordProcessor, abc.ABC):
"""A generic base implementation for a file-based cache."""

config: FileWriterConfigBase

@abc.abstractmethod
@overrides
def _write_batch(
Expand Down Expand Up @@ -90,7 +87,7 @@ def _cleanup_batch(
This method is a no-op if the `cleanup` config option is set to False.
"""
if self.config.cleanup:
if self.cache.cleanup:
batch_handle = cast(FileWriterBatchHandle, batch_handle)
_ = stream_name, batch_id
for file_path in batch_handle.files:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,47 +1,37 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""A Parquet cache implementation."""

from __future__ import annotations

import gzip
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING

import orjson
import ulid
from overrides import overrides

from airbyte._file_writers.base import (
from airbyte._processors.file.base import (
FileWriterBase,
FileWriterBatchHandle,
FileWriterConfigBase,
)


if TYPE_CHECKING:
import pyarrow as pa


class JsonlWriterConfig(FileWriterConfigBase):
"""Configuration for the Snowflake cache."""

# Inherits `cache_dir` from base class


class JsonlWriter(FileWriterBase):
"""A Jsonl cache implementation."""

config_class = JsonlWriterConfig

def get_new_cache_file_path(
self,
stream_name: str,
batch_id: str | None = None, # ULID of the batch
) -> Path:
"""Return a new cache file path for the given stream."""
batch_id = batch_id or str(ulid.ULID())
config: JsonlWriterConfig = cast(JsonlWriterConfig, self.config)
target_dir = Path(config.cache_dir)
target_dir = Path(self.cache.cache_dir)
target_dir.mkdir(parents=True, exist_ok=True)
return target_dir / f"{stream_name}_{batch_id}.jsonl.gz"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""A Parquet cache implementation.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved
"""A Parquet file writer implementation.
NOTE: Parquet is a strongly typed columnar storage format, which has known issues when applied to
variable schemas, schemas with indeterminate types, and schemas that have empty data nodes.
Expand All @@ -18,7 +17,7 @@
from pyarrow import parquet

from airbyte import exceptions as exc
from airbyte._file_writers.base import (
from airbyte._processors.file.base import (
FileWriterBase,
FileWriterBatchHandle,
FileWriterConfigBase,
Expand All @@ -42,7 +41,7 @@ def get_new_cache_file_path(
) -> Path:
"""Return a new cache file path for the given stream."""
batch_id = batch_id or str(ulid.ULID())
config: ParquetWriterConfig = cast(ParquetWriterConfig, self.config)
config: ParquetWriterConfig = cast(ParquetWriterConfig, self.cache)
target_dir = Path(config.cache_dir)
target_dir.mkdir(parents=True, exist_ok=True)
return target_dir / f"{stream_name}_{batch_id}.parquet"
Expand Down
2 changes: 2 additions & 0 deletions airbyte/_processors/sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL processors."""
Loading

0 comments on commit eea527e

Please sign in to comment.