From a7a91233e6ccf9c53a54cea0427361cb1478d38e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 25 Feb 2024 08:56:46 -0800 Subject: [PATCH 01/13] add TODOs --- airbyte/_processors/base.py | 12 ++++++++---- airbyte/_processors/file/base.py | 4 ++-- airbyte/_processors/file/jsonl.py | 2 +- airbyte/_processors/file/parquet.py | 4 ++-- airbyte/_processors/sql/base.py | 2 +- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index bde69494..d31ba9f9 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -184,7 +184,9 @@ def process_airbyte_messages( stream_batch.append(protocol_util.airbyte_record_message_to_dict(record_msg)) if len(stream_batch) >= max_batch_size: batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas(batch_df) + record_batch = pa.Table.from_pandas( + batch_df + ) # TODO: Refactor to remove dependency on pyarrow self._process_batch(stream_name, record_batch) progress.log_batch_written(stream_name, len(stream_batch)) stream_batch.clear() @@ -206,7 +208,9 @@ def process_airbyte_messages( # We are at the end of the stream. Process whatever else is queued. for stream_name, stream_batch in stream_batches.items(): batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas(batch_df) + record_batch = pa.Table.from_pandas( + batch_df + ) # TODO: Refactor to remove dependency on pyarrow self._process_batch(stream_name, record_batch) progress.log_batch_written(stream_name, len(stream_batch)) @@ -227,7 +231,7 @@ def process_airbyte_messages( def _process_batch( self, stream_name: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> tuple[str, Any, Exception | None]: """Process a single batch. @@ -252,7 +256,7 @@ def _write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> BatchHandle: """Process a single batch. diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 9954008f..c1b364ca 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -42,7 +42,7 @@ def _write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> FileWriterBatchHandle: """Process a record batch. @@ -55,7 +55,7 @@ def write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> FileWriterBatchHandle: """Write a batch of records to the cache. diff --git a/airbyte/_processors/file/jsonl.py b/airbyte/_processors/file/jsonl.py index 1ea66bde..615c14c6 100644 --- a/airbyte/_processors/file/jsonl.py +++ b/airbyte/_processors/file/jsonl.py @@ -40,7 +40,7 @@ def _write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> FileWriterBatchHandle: """Process a record batch. 
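The `# TODO: Refactor to remove dependency on pyarrow` markers added in this patch point at where the rest of the series heads: later commits drop the pandas/pyarrow batch conversion and write record dicts straight to gzipped JSONL files via a per-record write method. As a rough illustration of that pyarrow-free write path — a minimal sketch only, using `gzip` and `orjson` as the later `JsonlWriter` changes do; the helper name `write_records_jsonl_gz` is hypothetical and not part of any patch here:

```python
import gzip
from pathlib import Path

import orjson  # fast JSON serializer already used by the JSONL writer in this series


def write_records_jsonl_gz(records: list[dict], file_path: Path) -> None:
    """Write record dicts directly to a gzipped JSONL file, with no pandas/pyarrow step."""
    with gzip.open(file_path, "wb") as jsonl_file:
        for record in records:
            # orjson.dumps() returns bytes, so the newline is appended as bytes as well.
            jsonl_file.write(orjson.dumps(record) + b"\n")


# Example usage (hypothetical file name):
# write_records_jsonl_gz([{"id": 1, "name": "demo"}], Path("users_batch01.jsonl.gz"))
```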
diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py index 5d1f83c0..1961aa53 100644 --- a/airbyte/_processors/file/parquet.py +++ b/airbyte/_processors/file/parquet.py @@ -41,7 +41,7 @@ def get_new_cache_file_path( def _get_missing_columns( self, stream_name: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> list[str]: """Return a list of columns that are missing in the batch. @@ -62,7 +62,7 @@ def _write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> FileWriterBatchHandle: """Process a record batch. diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index cb2e3f01..716ffa18 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -490,7 +490,7 @@ def _write_batch( self, stream_name: str, batch_id: str, - record_batch: pa.Table, + record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow ) -> FileWriterBatchHandle: """Process a record batch. From f86b28821b050d744561651422aa002a57c7f4c3 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 26 Feb 2024 13:22:08 -0800 Subject: [PATCH 02/13] wip updates (doesn't run) --- airbyte/_processors/base.py | 147 +++++++++------------ airbyte/_processors/file/base.py | 191 +++++++++++++++++++++++----- airbyte/_processors/file/jsonl.py | 52 +++----- airbyte/_processors/file/parquet.py | 82 ++++++------ airbyte/_processors/sql/base.py | 36 +++--- 5 files changed, 289 insertions(+), 219 deletions(-) diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index d31ba9f9..3031ea36 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -16,9 +16,9 @@ from collections import defaultdict from typing import TYPE_CHECKING, Any, cast, final -import pandas as pd import pyarrow as pa import ulid +from pydantic import BaseModel from airbyte_protocol.models import ( AirbyteMessage, @@ -32,7 +32,6 @@ ) from airbyte import exceptions as exc -from airbyte._util import protocol_util from airbyte.caches.base import CacheBase from airbyte.progress import progress from airbyte.strategies import WriteStrategy @@ -49,8 +48,11 @@ DEBUG_MODE = False # Set to True to enable additional debug logging. 
-class BatchHandle: - pass +class BatchHandle(BaseModel): + """A handle for a batch of records.""" + + stream_name: str + batch_id: str class AirbyteMessageParsingError(Exception): @@ -60,6 +62,8 @@ class AirbyteMessageParsingError(Exception): class RecordProcessor(abc.ABC): """Abstract base class for classes which can process input records.""" + MAX_BATCH_SIZE: int = DEFAULT_BATCH_SIZE + skip_finalize_step: bool = False def __init__( @@ -81,8 +85,9 @@ def __init__( self.source_catalog: ConfiguredAirbyteCatalog | None = None self._source_name: str | None = None - self._pending_batches: dict[str, dict[str, Any]] = defaultdict(dict, {}) - self._finalized_batches: dict[str, dict[str, Any]] = defaultdict(dict, {}) + self._active_batches: dict[str, BatchHandle] = {} + self._pending_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) + self._finalized_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) self._pending_state_messages: dict[str, list[AirbyteStateMessage]] = defaultdict(list, {}) self._finalized_state_messages: dict[ @@ -120,17 +125,13 @@ def register_source( def process_stdin( self, write_strategy: WriteStrategy = WriteStrategy.AUTO, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Process the input stream from stdin. Return a list of summaries for testing. """ input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8") - self.process_input_stream( - input_stream, write_strategy=write_strategy, max_batch_size=max_batch_size - ) + self.process_input_stream(input_stream, write_strategy=write_strategy) @final def _airbyte_messages_from_buffer( @@ -145,8 +146,6 @@ def process_input_stream( self, input_stream: io.TextIOBase, write_strategy: WriteStrategy = WriteStrategy.AUTO, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Parse the input stream and process data in batches. @@ -156,7 +155,18 @@ def process_input_stream( self.process_airbyte_messages( messages, write_strategy=write_strategy, - max_batch_size=max_batch_size, + ) + + def _process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> tuple[str, BatchHandle]: + """Write a record to the cache. + + This method is called for each record message, before the batch is written. 
+ """ + raise NotImplementedError( + "Subclasses must implement the _process_record_message() method.", ) @final @@ -164,8 +174,6 @@ def process_airbyte_messages( self, messages: Iterable[AirbyteMessage], write_strategy: WriteStrategy, - *, - max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Process a stream of Airbyte messages.""" if not isinstance(write_strategy, WriteStrategy): @@ -174,22 +182,11 @@ def process_airbyte_messages( context={"write_strategy": write_strategy}, ) - stream_batches: dict[str, list[dict]] = defaultdict(list, {}) # Process messages, writing to batches as we go for message in messages: if message.type is Type.RECORD: record_msg = cast(AirbyteRecordMessage, message.record) - stream_name = record_msg.stream - stream_batch = stream_batches[stream_name] - stream_batch.append(protocol_util.airbyte_record_message_to_dict(record_msg)) - if len(stream_batch) >= max_batch_size: - batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas( - batch_df - ) # TODO: Refactor to remove dependency on pyarrow - self._process_batch(stream_name, record_batch) - progress.log_batch_written(stream_name, len(stream_batch)) - stream_batch.clear() + self._process_record_message(record_msg) elif message.type is Type.STATE: state_msg = cast(AirbyteStateMessage, message.state) @@ -206,15 +203,9 @@ def process_airbyte_messages( pass # We are at the end of the stream. Process whatever else is queued. - for stream_name, stream_batch in stream_batches.items(): - batch_df = pd.DataFrame(stream_batch) - record_batch = pa.Table.from_pandas( - batch_df - ) # TODO: Refactor to remove dependency on pyarrow - self._process_batch(stream_name, record_batch) - progress.log_batch_written(stream_name, len(stream_batch)) - - all_streams = list(self._pending_batches.keys()) + self._flush_active_batches() + + all_streams = list(set(self._pending_batches.keys()) | set(self._finalized_batches.keys())) # Add empty streams to the streams list, so we create a destination table for it for stream_name in self.expected_streams: if stream_name not in all_streams: @@ -227,46 +218,28 @@ def process_airbyte_messages( self._finalize_batches(stream_name, write_strategy=write_strategy) progress.log_stream_finalized(stream_name) - @final - def _process_batch( + def _flush_active_batches( self, - stream_name: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> tuple[str, Any, Exception | None]: - """Process a single batch. - - Returns a tuple of the batch ID, batch handle, and an exception if one occurred. - """ - batch_id = self._new_batch_id() - batch_handle = self._write_batch( - stream_name, - batch_id, - record_batch, - ) or self._get_batch_handle(stream_name, batch_id) - - if self.skip_finalize_step: - self._finalized_batches[stream_name][batch_id] = batch_handle - else: - self._pending_batches[stream_name][batch_id] = batch_handle - - return batch_id, batch_handle, None + ) -> None: + """Flush active batches for all streams.""" + for stream_name in self._active_batches: + self._flush_active_batch(stream_name) - @abc.abstractmethod - def _write_batch( + def _flush_active_batch( self, stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> BatchHandle: - """Process a single batch. + ) -> None: + """Flush the active batch for the given stream. - Returns a batch handle, such as a path or any other custom reference. 
+ This entails moving the active batch to the pending batches, closing any open files, and + logging the batch as written. """ + raise NotImplementedError( + "Subclasses must implement the _flush_active_batch() method.", + ) def _cleanup_batch( # noqa: B027 # Intentionally empty, not abstract self, - stream_name: str, - batch_id: str, batch_handle: BatchHandle, ) -> None: """Clean up the cache. @@ -282,24 +255,23 @@ def _new_batch_id(self) -> str: """Return a new batch handle.""" return str(ulid.ULID()) - def _get_batch_handle( + def _new_batch( self, stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> str: - """Return a new batch handle. + ) -> BatchHandle: + """Create and return a new batch handle. By default this is a concatenation of the stream name and batch ID. However, any Python object can be returned, such as a Path object. """ - batch_id = batch_id or self._new_batch_id() - return f"{stream_name}_{batch_id}" + batch_id = self._new_batch_id() + return BatchHandle(stream_name=stream_name, batch_id=batch_id) def _finalize_batches( self, stream_name: str, write_strategy: WriteStrategy, - ) -> dict[str, BatchHandle]: + ) -> list[BatchHandle]: """Finalize all uncommitted batches. Returns a mapping of batch IDs to batch handles, for processed batches. @@ -331,13 +303,15 @@ def _finalize_state_messages( def _finalizing_batches( self, stream_name: str, - ) -> Generator[dict[str, BatchHandle], str, None]: + ) -> Generator[list[BatchHandle], str, None]: """Context manager to use for finalizing batches, if applicable. Returns a mapping of batch IDs to batch handles, for those processed batches. """ - batches_to_finalize = self._pending_batches[stream_name].copy() - state_messages_to_finalize = self._pending_state_messages[stream_name].copy() + batches_to_finalize: list[BatchHandle] = self._pending_batches[stream_name].copy() + state_messages_to_finalize: list[AirbyteStateMessage] = self._pending_state_messages[ + stream_name + ].copy() self._pending_batches[stream_name].clear() self._pending_state_messages[stream_name].clear() @@ -346,11 +320,11 @@ def _finalizing_batches( self._finalize_state_messages(stream_name, state_messages_to_finalize) progress.log_batches_finalized(stream_name, len(batches_to_finalize)) - self._finalized_batches[stream_name].update(batches_to_finalize) + self._finalized_batches[stream_name] += batches_to_finalize self._finalized_state_messages[stream_name] += state_messages_to_finalize - for batch_id, batch_handle in batches_to_finalize.items(): - self._cleanup_batch(stream_name, batch_id, batch_handle) + for batch_handle in batches_to_finalize: + self._cleanup_batch(batch_handle) def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """Create the database. @@ -365,11 +339,14 @@ def _teardown(self) -> None: By default, the base implementation simply calls _cleanup_batch() for all pending batches. 
""" - for stream_name, pending_batches in self._pending_batches.items(): - for batch_id, batch_handle in pending_batches.items(): + batch_lists: list[list[BatchHandle]] = list(self._pending_batches.values()) + list( + self._finalized_batches.values() + ) + + # TODO: flatten lists and remove nested 'for' + for batch_list in batch_lists: + for batch_handle in batch_list: self._cleanup_batch( - stream_name=stream_name, - batch_id=batch_id, batch_handle=batch_handle, ) diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index c1b364ca..009b7509 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -4,20 +4,26 @@ from __future__ import annotations import abc -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, cast, final +from contextlib import suppress +from dataclasses import field +from pathlib import Path +from typing import IO, TYPE_CHECKING, Any, cast, final +import ulid from overrides import overrides +from pydantic import Field, PrivateAttr +from airbyte import exceptions as exc from airbyte._processors.base import BatchHandle, RecordProcessor +from airbyte._util.protocol_util import airbyte_record_message_to_dict +from airbyte.progress import progress if TYPE_CHECKING: - from pathlib import Path - - import pyarrow as pa + from io import BufferedWriter from airbyte_protocol.models import ( + AirbyteRecordMessage, AirbyteStateMessage, ) @@ -26,51 +32,107 @@ # The batch handle for file writers is a list of Path objects. -@dataclass class FileWriterBatchHandle(BatchHandle): """The file writer batch handle is a list of Path objects.""" - files: list[Path] = field(default_factory=list) + files: list[Path] = Field(default_factory=list) + _open_file_writer: Any | None = PrivateAttr(default=None) + _record_count: int = PrivateAttr(default=0) + + # TODO: Handle pydantic error: Fields of type "" are not supported. + # open_file_writer: IO[bytes] | None = PrivateAttr(default=None) + + @property + def record_count(self) -> int: + return self._record_count + + @property + def open_file_writer(self) -> IO[bytes] | None: + return self._open_file_writer class FileWriterBase(RecordProcessor, abc.ABC): """A generic base implementation for a file-based cache.""" - @abc.abstractmethod - @overrides - def _write_batch( + default_cache_file_suffix: str = ".batch" + + _active_batches: dict[str, FileWriterBatchHandle] + + def _get_new_cache_file_path( self, stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> FileWriterBatchHandle: - """Process a record batch. + batch_id: str | None = None, # ULID of the batch + ) -> Path: + """Return a new cache file path for the given stream.""" + batch_id = batch_id or str(ulid.ULID()) + target_dir = Path(self.cache.cache_dir) + target_dir.mkdir(parents=True, exist_ok=True) + return target_dir / f"{stream_name}_{batch_id}.{self.default_cache_file_suffix}" + + def _open_new_file( + self, + stream_name: str, + ) -> tuple[Path, IO[bytes]]: + """Open a new file for writing.""" + file_path: Path = self._get_new_cache_file_path(stream_name) + file_handle: BufferedWriter = file_path.open("wb") + return file_path, file_handle - Return a list of paths to one or more cache files. - """ - ... 
+ def _flush_active_batches( + self, + ) -> None: + """Flush active batches for all streams.""" + for stream_name in self._active_batches: + self._flush_active_batch(stream_name) - @final - def write_batch( + def _flush_active_batch( self, stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> FileWriterBatchHandle: - """Write a batch of records to the cache. + ) -> None: + """Flush the active batch for the given stream. - This method is final because it should not be overridden. + This entails moving the active batch to the pending batches, closing any open files, and + logging the batch as written. + """ + if stream_name not in self._active_batches: + return + + batch_handle: FileWriterBatchHandle = self._active_batches[stream_name] + del self._active_batches[stream_name] + + if self.skip_finalize_step: + self._finalized_batches[stream_name].append(batch_handle) + else: + self._pending_batches[stream_name].append(batch_handle) + progress.log_batch_written( + stream_name=stream_name, + batch_size=batch_handle.record_count, + ) + + def _new_batch( + self, + stream_name: str, + ) -> FileWriterBatchHandle: + """Create and return a new batch handle. - Subclasses should override `_write_batch` instead. + The base implementation creates and opens a new file for writing so it is ready to receive + records. """ - return self._write_batch(stream_name, batch_id, record_batch) + batch_id = self._new_batch_id() + new_file_path, new_file_handle = self._open_new_file(stream_name=stream_name) + batch_handle = FileWriterBatchHandle( + stream_name=stream_name, + batch_id=batch_id, + files=[new_file_path], + open_file_writer=new_file_handle, + ) + self._active_batches[stream_name] = batch_handle + return batch_handle @overrides def _cleanup_batch( self, - stream_name: str, - batch_id: str, - batch_handle: BatchHandle, + batch_handle: FileWriterBatchHandle, ) -> None: """Clean up the cache. @@ -78,18 +140,29 @@ def _cleanup_batch( This method is a no-op if the `cleanup` config option is set to False. """ + self._close_batch_files(batch_handle) + if self.cache.cleanup: - batch_handle = cast(FileWriterBatchHandle, batch_handle) - _ = stream_name, batch_id for file_path in batch_handle.files: file_path.unlink() + def _close_batch_files( + self, + batch_handle: FileWriterBatchHandle, + ) -> None: + """Close the current batch.""" + if not batch_handle.open_file_writer: + return + + with suppress(Exception): + batch_handle.open_file_writer.close() + + batch_handle.open_file_writer = None + @final def cleanup_batch( self, - stream_name: str, - batch_id: str, - batch_handle: BatchHandle, + batch_handle: FileWriterBatchHandle, ) -> None: """Clean up the cache. @@ -99,7 +172,7 @@ def cleanup_batch( Subclasses should override `_cleanup_batch` instead. """ - self._cleanup_batch(stream_name, batch_id, batch_handle) + self._cleanup_batch(batch_handle) @overrides def _finalize_state_messages( @@ -111,3 +184,51 @@ def _finalize_state_messages( State messages are not used in file writers, so this method is a no-op. """ pass + + @overrides + def _process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> tuple[str, FileWriterBatchHandle]: + """Write a record to the cache. + + This method is called for each record message, before the batch is written. + + Returns: + A tuple of the stream name and the batch handle. 
+ """ + stream_name = record_msg.stream + + batch_handle: FileWriterBatchHandle + if not self._pending_batches[stream_name]: + batch_handle = self._new_batch(stream_name=stream_name) + + else: + batch_handle = cast(FileWriterBatchHandle, self._pending_batches[stream_name][-1]) + + if batch_handle.record_count + 1 > self.MAX_BATCH_SIZE: + # Already at max batch size, so write the batch and start a new one + self._close_batch_files(batch_handle) + progress.log_batch_written( + stream_name=batch_handle.stream_name, + batch_size=batch_handle.record_count, + ) + batch_handle = self._new_batch(stream_name=stream_name) + + if not batch_handle.open_file_writer: + raise exc.AirbyteLibInternalError(message="Expected open file writer.") + + self._write_record_dict( + record_dict=airbyte_record_message_to_dict(record_message=record_msg), + open_file_writer=batch_handle.open_file_writer, + ) + batch_handle.record_count += 1 + return stream_name, batch_handle + + def _write_record_dict( + self, + record_dict: dict, + open_file_writer: IO[bytes], + ) -> None: + """Write one record to a file.""" + raise NotImplementedError("No default implementation.") diff --git a/airbyte/_processors/file/jsonl.py b/airbyte/_processors/file/jsonl.py index 615c14c6..8a238abb 100644 --- a/airbyte/_processors/file/jsonl.py +++ b/airbyte/_processors/file/jsonl.py @@ -4,55 +4,37 @@ from __future__ import annotations import gzip -from pathlib import Path -from typing import TYPE_CHECKING +from typing import IO, TYPE_CHECKING, cast import orjson -import ulid -from overrides import overrides from airbyte._processors.file.base import ( FileWriterBase, - FileWriterBatchHandle, ) if TYPE_CHECKING: - import pyarrow as pa + from pathlib import Path + + pass class JsonlWriter(FileWriterBase): """A Jsonl cache implementation.""" - def get_new_cache_file_path( + default_cache_file_suffix = ".jsonl.gz" + + def _open_new_file( self, stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> Path: - """Return a new cache file path for the given stream.""" - batch_id = batch_id or str(ulid.ULID()) - target_dir = Path(self.cache.cache_dir) - target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}.jsonl.gz" - - @overrides - def _write_batch( + ) -> tuple[Path, IO[bytes]]: + """Open a new file for writing.""" + file_path = self._get_new_cache_file_path(stream_name) + return file_path, cast(IO[bytes], gzip.open(file_path, "w")) + + def _write_record_dict( self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> FileWriterBatchHandle: - """Process a record batch. - - Return the path to the cache file. 
- """ - _ = batch_id # unused - output_file_path = self.get_new_cache_file_path(stream_name) - - with gzip.open(output_file_path, "w") as jsonl_file: - for record in record_batch.to_pylist(): - jsonl_file.write(orjson.dumps(record) + b"\n") - - batch_handle = FileWriterBatchHandle() - batch_handle.files.append(output_file_path) - return batch_handle + record_dict: dict, + open_file_writer: gzip.GzipFile | IO[bytes], + ) -> None: + open_file_writer.write(orjson.dumps(record_dict) + b"\n") diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py index 1961aa53..ec99dc90 100644 --- a/airbyte/_processors/file/parquet.py +++ b/airbyte/_processors/file/parquet.py @@ -27,16 +27,7 @@ class ParquetWriter(FileWriterBase): """A Parquet cache implementation.""" - def get_new_cache_file_path( - self, - stream_name: str, - batch_id: str | None = None, # ULID of the batch - ) -> Path: - """Return a new cache file path for the given stream.""" - batch_id = batch_id or str(ulid.ULID()) - target_dir = Path(self.cache.cache_dir) - target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}.parquet" + default_cache_file_suffix = ".parquet" def _get_missing_columns( self, @@ -57,42 +48,43 @@ def _get_missing_columns( if col.lower() not in lower_case_set(record_batch.schema.names) ] - @overrides - def _write_batch( - self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> FileWriterBatchHandle: - """Process a record batch. + # TODO: Delete if not needed + # @overrides + # def _write_batch( + # self, + # stream_name: str, + # batch_id: str, + # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow + # ) -> FileWriterBatchHandle: + # """Process a record batch. - Return the path to the cache file. - """ - _ = batch_id # unused - output_file_path = self.get_new_cache_file_path(stream_name) + # Return the path to the cache file. 
+ # """ + # _ = batch_id # unused + # output_file_path = self._get_new_cache_file_path(stream_name) - missing_columns = self._get_missing_columns(stream_name, record_batch) - if missing_columns: - # We need to append columns with the missing column name(s) and a null type - null_array = cast(pa.Array, pa.array([None] * len(record_batch), type=pa.null())) - for col in missing_columns: - record_batch = record_batch.append_column(col, null_array) + # missing_columns = self._get_missing_columns(stream_name, record_batch) + # if missing_columns: + # # We need to append columns with the missing column name(s) and a null type + # null_array = cast(pa.Array, pa.array([None] * len(record_batch), type=pa.null())) + # for col in missing_columns: + # record_batch = record_batch.append_column(col, null_array) - try: - with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer: - writer.write_table(record_batch) - except Exception as e: - raise exc.AirbyteLibInternalError( - message=f"Failed to write record batch to Parquet file: {e}", - context={ - "stream_name": stream_name, - "batch_id": batch_id, - "output_file_path": output_file_path, - "schema": record_batch.schema, - "record_batch": record_batch, - }, - ) from e + # try: + # with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer: + # writer.write_table(record_batch) + # except Exception as e: + # raise exc.AirbyteLibInternalError( + # message=f"Failed to write record batch to Parquet file: {e}", + # context={ + # "stream_name": stream_name, + # "batch_id": batch_id, + # "output_file_path": output_file_path, + # "schema": record_batch.schema, + # "record_batch": record_batch, + # }, + # ) from e - batch_handle = FileWriterBatchHandle() - batch_handle.files.append(output_file_path) - return batch_handle + # batch_handle = FileWriterBatchHandle() + # batch_handle.files.append(output_file_path) + # return batch_handle diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 716ffa18..77743a0d 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -41,7 +41,6 @@ from collections.abc import Generator from pathlib import Path - import pyarrow as pa from sqlalchemy.engine import Connection, Engine from sqlalchemy.engine.cursor import CursorResult from sqlalchemy.engine.reflection import Inspector @@ -485,23 +484,22 @@ def _get_sql_column_definitions( # columns["_airbyte_loaded_at"] = sqlalchemy.TIMESTAMP() return columns - @overrides - def _write_batch( - self, - stream_name: str, - batch_id: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> FileWriterBatchHandle: - """Process a record batch. + # TODO: Delete if not needed. + # @overrides + # def _write_batch( + # self, + # stream_name: str, + # batch_id: str, + # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow + # ) -> FileWriterBatchHandle: + # """Process a record batch. - Return the path to the cache file. - """ - return self.file_writer.write_batch(stream_name, batch_id, record_batch) + # Return the path to the cache file. + # """ + # return self.file_writer.write_batch(stream_name, batch_id, record_batch) def _cleanup_batch( self, - stream_name: str, - batch_id: str, batch_handle: BatchHandle, ) -> None: """Clean up the cache. @@ -510,7 +508,7 @@ def _cleanup_batch( Subclasses should call super() if they override this method. 
""" - self.file_writer.cleanup_batch(stream_name, batch_id, batch_handle) + self.file_writer.cleanup_batch(batch_handle) @final @overrides @@ -518,7 +516,7 @@ def _finalize_batches( self, stream_name: str, write_strategy: WriteStrategy, - ) -> dict[str, BatchHandle]: + ) -> list[BatchHandle]: """Finalize all uncommitted batches. This is a generic 'final' implementation, which should not be overridden. @@ -543,15 +541,15 @@ def _finalize_batches( if not batches_to_finalize: # If there are no batches to finalize, return after ensuring the table exists. - return {} + return [] files: list[Path] = [] # Get a list of all files to finalize from all pending batches. - for batch_handle in batches_to_finalize.values(): + for batch_handle in batches_to_finalize: batch_handle = cast(FileWriterBatchHandle, batch_handle) files += batch_handle.files # Use the max batch ID as the batch ID for table names. - max_batch_id = max(batches_to_finalize.keys()) + max_batch_id = max([batch.batch_id for batch in batches_to_finalize]) temp_table_name = self._write_files_to_new_table( files=files, From d9ab8f00cd4dec02b7418dc441d77ac07fe0a0f2 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 5 Mar 2024 15:34:10 -0800 Subject: [PATCH 03/13] refactor BatchHandle to a single class, fix mypy lint errors --- airbyte/_batch_handles.py | 47 +++++++++++++++++++++++ airbyte/_processors/base.py | 9 +---- airbyte/_processors/file/__init__.py | 9 +++-- airbyte/_processors/file/base.py | 56 ++++++++-------------------- airbyte/_processors/file/parquet.py | 21 ++++------- airbyte/_processors/sql/base.py | 10 ++--- 6 files changed, 81 insertions(+), 71 deletions(-) create mode 100644 airbyte/_batch_handles.py diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py new file mode 100644 index 00000000..220e9d52 --- /dev/null +++ b/airbyte/_batch_handles.py @@ -0,0 +1,47 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+"""Batch handle class.""" + +from __future__ import annotations + +from contextlib import suppress +from pathlib import Path # noqa: TCH003 # Pydantic needs this import +from typing import IO, Any, Optional + +from pydantic import BaseModel, Field, PrivateAttr + + +class BatchHandle(BaseModel): + """A handle for a batch of records.""" + + stream_name: str + batch_id: str + + files: list[Path] = Field(default_factory=list) + _open_file_writer: Optional[Any] = PrivateAttr(default=None) + _record_count: int = PrivateAttr(default=0) + + @property + def record_count(self) -> int: + """Return the record count.""" + return self._record_count + + def increment_record_count(self) -> None: + """Increment the record count.""" + self._record_count += 1 + + @property + def open_file_writer(self) -> IO[bytes] | None: + """Return the open file writer, if any, or None.""" + return self._open_file_writer + + def close_files(self) -> None: + """Close the file writer.""" + if self._open_file_writer is None: + return + + with suppress(Exception): + self._open_file_writer.close() + + def __del__(self) -> None: + """Upon deletion, close the file writer.""" + self.close_files() diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index 3031ea36..9111a7f9 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -18,7 +18,6 @@ import pyarrow as pa import ulid -from pydantic import BaseModel from airbyte_protocol.models import ( AirbyteMessage, @@ -32,6 +31,7 @@ ) from airbyte import exceptions as exc +from airbyte._batch_handles import BatchHandle from airbyte.caches.base import CacheBase from airbyte.progress import progress from airbyte.strategies import WriteStrategy @@ -48,13 +48,6 @@ DEBUG_MODE = False # Set to True to enable additional debug logging. 
-class BatchHandle(BaseModel): - """A handle for a batch of records.""" - - stream_name: str - batch_id: str - - class AirbyteMessageParsingError(Exception): """Raised when an Airbyte message is invalid or cannot be parsed.""" diff --git a/airbyte/_processors/file/__init__.py b/airbyte/_processors/file/__init__.py index 26c25484..0f83652b 100644 --- a/airbyte/_processors/file/__init__.py +++ b/airbyte/_processors/file/__init__.py @@ -3,13 +3,14 @@ from __future__ import annotations -from .base import FileWriterBase, FileWriterBatchHandle -from .jsonl import JsonlWriter -from .parquet import ParquetWriter +from airbyte._batch_handles import BatchHandle +from airbyte._processors.file.base import FileWriterBase +from airbyte._processors.file.jsonl import JsonlWriter +from airbyte._processors.file.parquet import ParquetWriter __all__ = [ - "FileWriterBatchHandle", + "BatchHandle", "FileWriterBase", "JsonlWriter", "ParquetWriter", diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 009b7509..30a6ced3 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -4,17 +4,15 @@ from __future__ import annotations import abc -from contextlib import suppress -from dataclasses import field from pathlib import Path -from typing import IO, TYPE_CHECKING, Any, cast, final +from typing import IO, TYPE_CHECKING, final import ulid from overrides import overrides -from pydantic import Field, PrivateAttr from airbyte import exceptions as exc -from airbyte._processors.base import BatchHandle, RecordProcessor +from airbyte._batch_handles import BatchHandle +from airbyte._processors.base import RecordProcessor from airbyte._util.protocol_util import airbyte_record_message_to_dict from airbyte.progress import progress @@ -31,32 +29,12 @@ DEFAULT_BATCH_SIZE = 10000 -# The batch handle for file writers is a list of Path objects. -class FileWriterBatchHandle(BatchHandle): - """The file writer batch handle is a list of Path objects.""" - - files: list[Path] = Field(default_factory=list) - _open_file_writer: Any | None = PrivateAttr(default=None) - _record_count: int = PrivateAttr(default=0) - - # TODO: Handle pydantic error: Fields of type "" are not supported. - # open_file_writer: IO[bytes] | None = PrivateAttr(default=None) - - @property - def record_count(self) -> int: - return self._record_count - - @property - def open_file_writer(self) -> IO[bytes] | None: - return self._open_file_writer - - class FileWriterBase(RecordProcessor, abc.ABC): """A generic base implementation for a file-based cache.""" default_cache_file_suffix: str = ".batch" - _active_batches: dict[str, FileWriterBatchHandle] + _active_batches: dict[str, BatchHandle] def _get_new_cache_file_path( self, @@ -97,7 +75,7 @@ def _flush_active_batch( if stream_name not in self._active_batches: return - batch_handle: FileWriterBatchHandle = self._active_batches[stream_name] + batch_handle: BatchHandle = self._active_batches[stream_name] del self._active_batches[stream_name] if self.skip_finalize_step: @@ -112,7 +90,7 @@ def _flush_active_batch( def _new_batch( self, stream_name: str, - ) -> FileWriterBatchHandle: + ) -> BatchHandle: """Create and return a new batch handle. 
The base implementation creates and opens a new file for writing so it is ready to receive @@ -120,11 +98,10 @@ def _new_batch( """ batch_id = self._new_batch_id() new_file_path, new_file_handle = self._open_new_file(stream_name=stream_name) - batch_handle = FileWriterBatchHandle( + batch_handle = BatchHandle( stream_name=stream_name, batch_id=batch_id, files=[new_file_path], - open_file_writer=new_file_handle, ) self._active_batches[stream_name] = batch_handle return batch_handle @@ -132,7 +109,7 @@ def _new_batch( @overrides def _cleanup_batch( self, - batch_handle: FileWriterBatchHandle, + batch_handle: BatchHandle, ) -> None: """Clean up the cache. @@ -148,21 +125,18 @@ def _cleanup_batch( def _close_batch_files( self, - batch_handle: FileWriterBatchHandle, + batch_handle: BatchHandle, ) -> None: """Close the current batch.""" if not batch_handle.open_file_writer: return - with suppress(Exception): - batch_handle.open_file_writer.close() - - batch_handle.open_file_writer = None + batch_handle.close_files() @final def cleanup_batch( self, - batch_handle: FileWriterBatchHandle, + batch_handle: BatchHandle, ) -> None: """Clean up the cache. @@ -189,7 +163,7 @@ def _finalize_state_messages( def _process_record_message( self, record_msg: AirbyteRecordMessage, - ) -> tuple[str, FileWriterBatchHandle]: + ) -> tuple[str, BatchHandle]: """Write a record to the cache. This method is called for each record message, before the batch is written. @@ -199,12 +173,12 @@ def _process_record_message( """ stream_name = record_msg.stream - batch_handle: FileWriterBatchHandle + batch_handle: BatchHandle if not self._pending_batches[stream_name]: batch_handle = self._new_batch(stream_name=stream_name) else: - batch_handle = cast(FileWriterBatchHandle, self._pending_batches[stream_name][-1]) + batch_handle = self._pending_batches[stream_name][-1] if batch_handle.record_count + 1 > self.MAX_BATCH_SIZE: # Already at max batch size, so write the batch and start a new one @@ -222,7 +196,7 @@ def _process_record_message( record_dict=airbyte_record_message_to_dict(record_message=record_msg), open_file_writer=batch_handle.open_file_writer, ) - batch_handle.record_count += 1 + batch_handle.increment_record_count() return stream_name, batch_handle def _write_record_dict( diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py index ec99dc90..8a7abf66 100644 --- a/airbyte/_processors/file/parquet.py +++ b/airbyte/_processors/file/parquet.py @@ -8,22 +8,17 @@ """ from __future__ import annotations -from pathlib import Path -from typing import cast - -import pyarrow as pa -import ulid -from overrides import overrides -from pyarrow import parquet +from typing import TYPE_CHECKING from airbyte import exceptions as exc -from airbyte._processors.file.base import ( - FileWriterBase, - FileWriterBatchHandle, -) +from airbyte._processors.file.base import FileWriterBase from airbyte._util.text_util import lower_case_set +if TYPE_CHECKING: + import pyarrow as pa + + class ParquetWriter(FileWriterBase): """A Parquet cache implementation.""" @@ -55,7 +50,7 @@ def _get_missing_columns( # stream_name: str, # batch_id: str, # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - # ) -> FileWriterBatchHandle: + # ) -> BatchHandle: # """Process a record batch. # Return the path to the cache file. 
@@ -85,6 +80,6 @@ def _get_missing_columns( # }, # ) from e - # batch_handle = FileWriterBatchHandle() + # batch_handle = BatchHandle() # batch_handle.files.append(output_file_path) # return batch_handle diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 345f1109..6cf3bc2d 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -6,7 +6,7 @@ import enum from contextlib import contextmanager from functools import cached_property -from typing import TYPE_CHECKING, cast, final +from typing import TYPE_CHECKING, final import pandas as pd import sqlalchemy @@ -27,8 +27,7 @@ from sqlalchemy.sql.elements import TextClause from airbyte import exceptions as exc -from airbyte._processors.base import BatchHandle, RecordProcessor -from airbyte._processors.file.base import FileWriterBase, FileWriterBatchHandle +from airbyte._processors.base import RecordProcessor from airbyte._util.text_util import lower_case_set from airbyte.caches._catalog_manager import CatalogManager from airbyte.datasets._sql import CachedDataset @@ -50,6 +49,8 @@ ConfiguredAirbyteCatalog, ) + from airbyte._batch_handles import BatchHandle + from airbyte._processors.file.base import FileWriterBase from airbyte.caches.base import CacheBase @@ -480,7 +481,7 @@ def _get_sql_column_definitions( # stream_name: str, # batch_id: str, # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - # ) -> FileWriterBatchHandle: + # ) -> BatchHandle: # """Process a record batch. # Return the path to the cache file. @@ -535,7 +536,6 @@ def _finalize_batches( files: list[Path] = [] # Get a list of all files to finalize from all pending batches. for batch_handle in batches_to_finalize: - batch_handle = cast(FileWriterBatchHandle, batch_handle) files += batch_handle.files # Use the max batch ID as the batch ID for table names. max_batch_id = max([batch.batch_id for batch in batches_to_finalize]) From b9b0296f72eda0d347471c38ded77d29ac38db71 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 5 Mar 2024 15:50:05 -0800 Subject: [PATCH 04/13] refactoring --- airbyte/_processors/base.py | 10 ++++++---- airbyte/_processors/file/base.py | 10 +--------- airbyte/_processors/sql/base.py | 14 -------------- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index 9111a7f9..dcde9c02 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -153,14 +153,16 @@ def process_input_stream( def _process_record_message( self, record_msg: AirbyteRecordMessage, - ) -> tuple[str, BatchHandle]: + ) -> None: """Write a record to the cache. This method is called for each record message, before the batch is written. + + By default this is a no-op but file writers can override this method to write the record to + files. 
""" - raise NotImplementedError( - "Subclasses must implement the _process_record_message() method.", - ) + _ = record_msg # Unused + pass @final def process_airbyte_messages( diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 30a6ced3..50e12a14 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -56,13 +56,6 @@ def _open_new_file( file_handle: BufferedWriter = file_path.open("wb") return file_path, file_handle - def _flush_active_batches( - self, - ) -> None: - """Flush active batches for all streams.""" - for stream_name in self._active_batches: - self._flush_active_batch(stream_name) - def _flush_active_batch( self, stream_name: str, @@ -163,7 +156,7 @@ def _finalize_state_messages( def _process_record_message( self, record_msg: AirbyteRecordMessage, - ) -> tuple[str, BatchHandle]: + ) -> None: """Write a record to the cache. This method is called for each record message, before the batch is written. @@ -197,7 +190,6 @@ def _process_record_message( open_file_writer=batch_handle.open_file_writer, ) batch_handle.increment_record_count() - return stream_name, batch_handle def _write_record_dict( self, diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 6cf3bc2d..7bc26a07 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -474,20 +474,6 @@ def _get_sql_column_definitions( # columns["_airbyte_loaded_at"] = sqlalchemy.TIMESTAMP() return columns - # TODO: Delete if not needed. - # @overrides - # def _write_batch( - # self, - # stream_name: str, - # batch_id: str, - # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - # ) -> BatchHandle: - # """Process a record batch. - - # Return the path to the cache file. - # """ - # return self.file_writer.write_batch(stream_name, batch_id, record_batch) - def _cleanup_batch( self, batch_handle: BatchHandle, From caf429a765e5c8c692aef17a43e14eac6bf3b8b6 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 6 Mar 2024 19:12:27 -0800 Subject: [PATCH 05/13] major refactor, passing tests again --- airbyte/_batch_handles.py | 49 ++++++-- airbyte/_processors/base.py | 189 +++++----------------------- airbyte/_processors/file/base.py | 150 +++++++++++++--------- airbyte/_processors/file/jsonl.py | 7 +- airbyte/_processors/file/parquet.py | 4 +- airbyte/_processors/sql/base.py | 108 ++++++++++------ 6 files changed, 231 insertions(+), 276 deletions(-) diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py index 220e9d52..e405dfca 100644 --- a/airbyte/_batch_handles.py +++ b/airbyte/_batch_handles.py @@ -3,22 +3,45 @@ from __future__ import annotations -from contextlib import suppress -from pathlib import Path # noqa: TCH003 # Pydantic needs this import -from typing import IO, Any, Optional +from typing import IO, TYPE_CHECKING, Callable -from pydantic import BaseModel, Field, PrivateAttr +if TYPE_CHECKING: + from pathlib import Path -class BatchHandle(BaseModel): + +class BatchHandle: """A handle for a batch of records.""" - stream_name: str - batch_id: str + def __init__( + self, + stream_name: str, + batch_id: str, + files: list[Path], + file_opener: Callable[[Path], IO[bytes]], + ) -> None: + """Initialize the batch handle.""" + self._stream_name = stream_name + self._batch_id = batch_id + self._files = files + self._record_count = 0 + assert self._files, "A batch must have at least one file." 
+ self._open_file_writer: IO[bytes] = file_opener(self._files[0]) + + @property + def files(self) -> list[Path]: + """Return the files.""" + return self._files + + @property + def batch_id(self) -> str: + """Return the batch ID.""" + return self._batch_id - files: list[Path] = Field(default_factory=list) - _open_file_writer: Optional[Any] = PrivateAttr(default=None) - _record_count: int = PrivateAttr(default=0) + @property + def stream_name(self) -> str: + """Return the stream name.""" + return self._stream_name @property def record_count(self) -> int: @@ -36,11 +59,11 @@ def open_file_writer(self) -> IO[bytes] | None: def close_files(self) -> None: """Close the file writer.""" - if self._open_file_writer is None: + if self.open_file_writer is None: return - with suppress(Exception): - self._open_file_writer.close() + # with suppress(Exception): + self.open_file_writer.close() def __del__(self) -> None: """Upon deletion, close the file writer.""" diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index dcde9c02..49acb1e0 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -10,14 +10,12 @@ from __future__ import annotations import abc -import contextlib import io import sys from collections import defaultdict from typing import TYPE_CHECKING, Any, cast, final import pyarrow as pa -import ulid from airbyte_protocol.models import ( AirbyteMessage, @@ -31,20 +29,17 @@ ) from airbyte import exceptions as exc -from airbyte._batch_handles import BatchHandle from airbyte.caches.base import CacheBase -from airbyte.progress import progress from airbyte.strategies import WriteStrategy from airbyte.types import _get_pyarrow_type if TYPE_CHECKING: - from collections.abc import Generator, Iterable, Iterator + from collections.abc import Iterable, Iterator from airbyte.caches._catalog_manager import CatalogManager -DEFAULT_BATCH_SIZE = 10_000 DEBUG_MODE = False # Set to True to enable additional debug logging. @@ -55,10 +50,6 @@ class AirbyteMessageParsingError(Exception): class RecordProcessor(abc.ABC): """Abstract base class for classes which can process input records.""" - MAX_BATCH_SIZE: int = DEFAULT_BATCH_SIZE - - skip_finalize_step: bool = False - def __init__( self, cache: CacheBase, @@ -78,10 +69,6 @@ def __init__( self.source_catalog: ConfiguredAirbyteCatalog | None = None self._source_name: str | None = None - self._active_batches: dict[str, BatchHandle] = {} - self._pending_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) - self._finalized_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) - self._pending_state_messages: dict[str, list[AirbyteStateMessage]] = defaultdict(list, {}) self._finalized_state_messages: dict[ str, @@ -150,19 +137,18 @@ def process_input_stream( write_strategy=write_strategy, ) - def _process_record_message( + @abc.abstractmethod + def process_record_message( self, record_msg: AirbyteRecordMessage, ) -> None: """Write a record to the cache. - This method is called for each record message, before the batch is written. + This method is called for each record message. - By default this is a no-op but file writers can override this method to write the record to - files. + In most cases, the SQL processor will not perform any action, but will pass this along to to + the file processor. 
""" - _ = record_msg # Unused - pass @final def process_airbyte_messages( @@ -181,7 +167,7 @@ def process_airbyte_messages( for message in messages: if message.type is Type.RECORD: record_msg = cast(AirbyteRecordMessage, message.record) - self._process_record_message(record_msg) + self.process_record_message(record_msg) elif message.type is Type.STATE: state_msg = cast(AirbyteStateMessage, message.state) @@ -197,129 +183,34 @@ def process_airbyte_messages( # Type.LOG, Type.TRACE, Type.CONTROL, etc. pass - # We are at the end of the stream. Process whatever else is queued. - self._flush_active_batches() - - all_streams = list(set(self._pending_batches.keys()) | set(self._finalized_batches.keys())) - # Add empty streams to the streams list, so we create a destination table for it - for stream_name in self.expected_streams: - if stream_name not in all_streams: - if DEBUG_MODE: - print(f"Stream {stream_name} has no data") - all_streams.append(stream_name) - - # Finalize any pending batches - for stream_name in all_streams: - self._finalize_batches(stream_name, write_strategy=write_strategy) - progress.log_stream_finalized(stream_name) - - def _flush_active_batches( - self, - ) -> None: - """Flush active batches for all streams.""" - for stream_name in self._active_batches: - self._flush_active_batch(stream_name) - - def _flush_active_batch( - self, - stream_name: str, - ) -> None: - """Flush the active batch for the given stream. - - This entails moving the active batch to the pending batches, closing any open files, and - logging the batch as written. - """ - raise NotImplementedError( - "Subclasses must implement the _flush_active_batch() method.", + self.flush_all( + write_strategy=write_strategy, ) - def _cleanup_batch( # noqa: B027 # Intentionally empty, not abstract - self, - batch_handle: BatchHandle, - ) -> None: - """Clean up the cache. - - This method is called after the given batch has been finalized. - - For instance, file writers can override this method to delete the files created. Caches, - similarly, can override this method to delete any other temporary artifacts. - """ - pass - - def _new_batch_id(self) -> str: - """Return a new batch handle.""" - return str(ulid.ULID()) - - def _new_batch( - self, - stream_name: str, - ) -> BatchHandle: - """Create and return a new batch handle. - - By default this is a concatenation of the stream name and batch ID. - However, any Python object can be returned, such as a Path object. - """ - batch_id = self._new_batch_id() - return BatchHandle(stream_name=stream_name, batch_id=batch_id) - - def _finalize_batches( - self, - stream_name: str, - write_strategy: WriteStrategy, - ) -> list[BatchHandle]: - """Finalize all uncommitted batches. - - Returns a mapping of batch IDs to batch handles, for processed batches. - - This is a generic implementation, which can be overridden. - """ - _ = write_strategy # Unused - with self._finalizing_batches(stream_name) as batches_to_finalize: - if batches_to_finalize and not self.skip_finalize_step: - raise NotImplementedError( - "Caches need to be finalized but no _finalize_batch() method " - f"exists for class {self.__class__.__name__}", - ) - - return batches_to_finalize + # Clean up files, if requested. + if self.cache.cleanup: + self.cleanup_all() @abc.abstractmethod + def flush_all(self, write_strategy: WriteStrategy) -> None: + """Finalize any pending writes.""" + def _finalize_state_messages( self, stream_name: str, state_messages: list[AirbyteStateMessage], ) -> None: - """Handle state messages. 
- Might be a no-op if the processor doesn't handle incremental state.""" - pass - - @final - @contextlib.contextmanager - def _finalizing_batches( - self, - stream_name: str, - ) -> Generator[list[BatchHandle], str, None]: - """Context manager to use for finalizing batches, if applicable. - - Returns a mapping of batch IDs to batch handles, for those processed batches. - """ - batches_to_finalize: list[BatchHandle] = self._pending_batches[stream_name].copy() - state_messages_to_finalize: list[AirbyteStateMessage] = self._pending_state_messages[ - stream_name - ].copy() - self._pending_batches[stream_name].clear() - self._pending_state_messages[stream_name].clear() - - progress.log_batches_finalizing(stream_name, len(batches_to_finalize)) - yield batches_to_finalize - self._finalize_state_messages(stream_name, state_messages_to_finalize) - progress.log_batches_finalized(stream_name, len(batches_to_finalize)) - - self._finalized_batches[stream_name] += batches_to_finalize - self._finalized_state_messages[stream_name] += state_messages_to_finalize - - for batch_handle in batches_to_finalize: - self._cleanup_batch(batch_handle) + """Handle state messages by passing them to the catalog manager.""" + if not self._catalog_manager: + raise exc.AirbyteLibInternalError( + message="Catalog manager should exist but does not.", + ) + if state_messages and self._source_name: + self._catalog_manager.save_state( + source_name=self._source_name, + stream_name=stream_name, + state=state_messages[-1], + ) def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """Create the database. @@ -329,27 +220,6 @@ def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """ pass - def _teardown(self) -> None: - """Teardown the processor resources. - - By default, the base implementation simply calls _cleanup_batch() for all pending batches. - """ - batch_lists: list[list[BatchHandle]] = list(self._pending_batches.values()) + list( - self._finalized_batches.values() - ) - - # TODO: flatten lists and remove nested 'for' - for batch_list in batch_lists: - for batch_handle in batch_list: - self._cleanup_batch( - batch_handle=batch_handle, - ) - - @final - def __del__(self) -> None: - """Teardown temporary resources when instance is unloaded from memory.""" - self._teardown() - @final def _get_stream_config( self, @@ -384,3 +254,10 @@ def _get_stream_pyarrow_schema( ].items() ] ) + + def cleanup_all(self) -> None: # noqa: B027 # Intentionally empty, not abstract + """Clean up all resources. + + The default implementation is a no-op. 
+ """ + pass diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 50e12a14..3339b61b 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -4,37 +4,48 @@ from __future__ import annotations import abc +from collections import defaultdict from pathlib import Path from typing import IO, TYPE_CHECKING, final import ulid -from overrides import overrides from airbyte import exceptions as exc from airbyte._batch_handles import BatchHandle -from airbyte._processors.base import RecordProcessor from airbyte._util.protocol_util import airbyte_record_message_to_dict from airbyte.progress import progress if TYPE_CHECKING: - from io import BufferedWriter from airbyte_protocol.models import ( AirbyteRecordMessage, - AirbyteStateMessage, ) + from airbyte.caches.base import CacheBase + from airbyte.strategies import WriteStrategy + DEFAULT_BATCH_SIZE = 10000 -class FileWriterBase(RecordProcessor, abc.ABC): +class FileWriterBase(abc.ABC): """A generic base implementation for a file-based cache.""" default_cache_file_suffix: str = ".batch" - _active_batches: dict[str, BatchHandle] + MAX_BATCH_SIZE: int = DEFAULT_BATCH_SIZE + + def __init__( + self, + cache: CacheBase, + ) -> None: + """Initialize the file writer.""" + self.cache = cache + + self._active_batches: dict[str, BatchHandle] = {} + self._pending_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) + self._finalized_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) def _get_new_cache_file_path( self, @@ -45,16 +56,14 @@ def _get_new_cache_file_path( batch_id = batch_id or str(ulid.ULID()) target_dir = Path(self.cache.cache_dir) target_dir.mkdir(parents=True, exist_ok=True) - return target_dir / f"{stream_name}_{batch_id}.{self.default_cache_file_suffix}" + return target_dir / f"{stream_name}_{batch_id}{self.default_cache_file_suffix}" def _open_new_file( self, - stream_name: str, - ) -> tuple[Path, IO[bytes]]: + file_path: Path, + ) -> IO[bytes]: """Open a new file for writing.""" - file_path: Path = self._get_new_cache_file_path(stream_name) - file_handle: BufferedWriter = file_path.open("wb") - return file_path, file_handle + return file_path.open("wb") def _flush_active_batch( self, @@ -69,12 +78,10 @@ def _flush_active_batch( return batch_handle: BatchHandle = self._active_batches[stream_name] + batch_handle.close_files() del self._active_batches[stream_name] - if self.skip_finalize_step: - self._finalized_batches[stream_name].append(batch_handle) - else: - self._pending_batches[stream_name].append(batch_handle) + self._pending_batches[stream_name].append(batch_handle) progress.log_batch_written( stream_name=stream_name, batch_size=batch_handle.record_count, @@ -88,35 +95,25 @@ def _new_batch( The base implementation creates and opens a new file for writing so it is ready to receive records. + + This also flushes the active batch if one already exists for the given stream. """ + if stream_name in self._active_batches: + self._flush_active_batch(stream_name) + batch_id = self._new_batch_id() - new_file_path, new_file_handle = self._open_new_file(stream_name=stream_name) + new_file_path = self._get_new_cache_file_path(stream_name) + batch_handle = BatchHandle( stream_name=stream_name, batch_id=batch_id, files=[new_file_path], + file_opener=self._open_new_file, ) self._active_batches[stream_name] = batch_handle return batch_handle - @overrides - def _cleanup_batch( - self, - batch_handle: BatchHandle, - ) -> None: - """Clean up the cache. 
- - For file writers, this means deleting the files created and declared in the batch. - - This method is a no-op if the `cleanup` config option is set to False. - """ - self._close_batch_files(batch_handle) - - if self.cache.cleanup: - for file_path in batch_handle.files: - file_path.unlink() - - def _close_batch_files( + def _close_batch( self, batch_handle: BatchHandle, ) -> None: @@ -127,10 +124,7 @@ def _close_batch_files( batch_handle.close_files() @final - def cleanup_batch( - self, - batch_handle: BatchHandle, - ) -> None: + def cleanup_all(self) -> None: """Clean up the cache. For file writers, this means deleting the files created and declared in the batch. @@ -139,21 +133,15 @@ def cleanup_batch( Subclasses should override `_cleanup_batch` instead. """ - self._cleanup_batch(batch_handle) + for batch_list in self._pending_batches.values(): + for batch_handle in batch_list: + self._cleanup_batch(batch_handle) - @overrides - def _finalize_state_messages( - self, - stream_name: str, - state_messages: list[AirbyteStateMessage], - ) -> None: - """ - State messages are not used in file writers, so this method is a no-op. - """ - pass + for batch_list in self._finalized_batches.values(): + for batch_handle in batch_list: + self._cleanup_batch(batch_handle) - @overrides - def _process_record_message( + def process_record_message( self, record_msg: AirbyteRecordMessage, ) -> None: @@ -167,22 +155,17 @@ def _process_record_message( stream_name = record_msg.stream batch_handle: BatchHandle - if not self._pending_batches[stream_name]: + if stream_name not in self._active_batches: batch_handle = self._new_batch(stream_name=stream_name) else: - batch_handle = self._pending_batches[stream_name][-1] + batch_handle = self._active_batches[stream_name] if batch_handle.record_count + 1 > self.MAX_BATCH_SIZE: - # Already at max batch size, so write the batch and start a new one - self._close_batch_files(batch_handle) - progress.log_batch_written( - stream_name=batch_handle.stream_name, - batch_size=batch_handle.record_count, - ) + # Already at max batch size, so start a new batch. batch_handle = self._new_batch(stream_name=stream_name) - if not batch_handle.open_file_writer: + if batch_handle.open_file_writer is None: raise exc.AirbyteLibInternalError(message="Expected open file writer.") self._write_record_dict( @@ -191,6 +174,51 @@ def _process_record_message( ) batch_handle.increment_record_count() + def _flush_active_batches( + self, + ) -> None: + """Flush active batches for all streams.""" + streams = list(self._active_batches.keys()) + for stream_name in streams: + self._flush_active_batch(stream_name) + + def _cleanup_batch( + self, + batch_handle: BatchHandle, + ) -> None: + """Clean up the cache. + + For file writers, this means deleting the files created and declared in the batch. + + This method is a no-op if the `cleanup` config option is set to False. + """ + self._close_batch(batch_handle) + + if self.cache.cleanup: + for file_path in batch_handle.files: + if file_path.exists(): + file_path.unlink() + + def _new_batch_id(self) -> str: + """Return a new batch handle.""" + return str(ulid.ULID()) + + def flush_all(self, write_strategy: WriteStrategy) -> None: + """Finalize any pending writes.""" + # We are at the end of the stream. Process whatever else is queued. 
+ self._flush_active_batches() + + # Destructor + + @final + def __del__(self) -> None: + """Teardown temporary resources when instance is unloaded from memory.""" + if self.cache.cleanup: + self.cleanup_all() + + # Abstract methods + + @abc.abstractmethod def _write_record_dict( self, record_dict: dict, diff --git a/airbyte/_processors/file/jsonl.py b/airbyte/_processors/file/jsonl.py index 8a238abb..0578fded 100644 --- a/airbyte/_processors/file/jsonl.py +++ b/airbyte/_processors/file/jsonl.py @@ -26,11 +26,10 @@ class JsonlWriter(FileWriterBase): def _open_new_file( self, - stream_name: str, - ) -> tuple[Path, IO[bytes]]: + file_path: Path, + ) -> IO[bytes]: """Open a new file for writing.""" - file_path = self._get_new_cache_file_path(stream_name) - return file_path, cast(IO[bytes], gzip.open(file_path, "w")) + return cast(IO[bytes], gzip.open(file_path, "w")) def _write_record_dict( self, diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py index 8a7abf66..34498fe2 100644 --- a/airbyte/_processors/file/parquet.py +++ b/airbyte/_processors/file/parquet.py @@ -33,9 +33,9 @@ def _get_missing_columns( The comparison is based on a case-insensitive comparison of the column names. """ - if not self._catalog_manager: + if not self.cache.processor._catalog_manager: raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") - stream = self._catalog_manager.get_stream_config(stream_name) + stream = self.cache.processor._catalog_manager.get_stream_config(stream_name) stream_property_names = stream.stream.json_schema["properties"].keys() return [ col diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 7bc26a07..4a8ea9b7 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -3,6 +3,7 @@ from __future__ import annotations +import contextlib import enum from contextlib import contextmanager from functools import cached_property @@ -31,6 +32,7 @@ from airbyte._util.text_util import lower_case_set from airbyte.caches._catalog_manager import CatalogManager from airbyte.datasets._sql import CachedDataset +from airbyte.progress import progress from airbyte.strategies import WriteStrategy from airbyte.types import SQLTypeConverter @@ -45,6 +47,7 @@ from sqlalchemy.sql.base import Executable from airbyte_protocol.models import ( + AirbyteRecordMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, ) @@ -90,9 +93,7 @@ def __init__( engine=self.get_sql_engine(), table_name_resolver=lambda stream_name: self.get_sql_table_name(stream_name), ) - self.file_writer = file_writer or self.file_writer_class( - cache, catalog_manager=self._catalog_manager - ) + self.file_writer = file_writer or self.file_writer_class(cache) self.type_converter = self.type_converter_class() self._cached_table_definitions: dict[str, sqlalchemy.Table] = {} @@ -160,11 +161,11 @@ def register_source( This method is called by the source when it is initialized. 
""" self._source_name = source_name - self.file_writer.register_source( - source_name, - incoming_source_catalog, - stream_names=stream_names, - ) + # self.register_source( + # source_name, + # incoming_source_catalog, + # stream_names=stream_names, + # ) self._ensure_schema_exists() super().register_source( source_name, @@ -233,6 +234,19 @@ def get_pandas_dataframe( engine = self.get_sql_engine() return pd.read_sql_table(table_name, engine) + def process_record_message( + self, + record_msg: AirbyteRecordMessage, + ) -> None: + """Write a record to the cache. + + This method is called for each record message, before the batch is written. + + In most cases, the SQL processor will not perform any action, but will pass this along to to + the file processor. + """ + self.file_writer.process_record_message(record_msg) + # Protected members (non-public interface): def _init_connection_settings(self, connection: Connection) -> None: @@ -474,28 +488,20 @@ def _get_sql_column_definitions( # columns["_airbyte_loaded_at"] = sqlalchemy.TIMESTAMP() return columns - def _cleanup_batch( - self, - batch_handle: BatchHandle, - ) -> None: - """Clean up the cache. - - For SQL caches, we only need to call the cleanup operation on the file writer. - - Subclasses should call super() if they override this method. - """ - self.file_writer.cleanup_batch(batch_handle) + def flush_all(self, write_strategy: WriteStrategy) -> None: + """Finalize any pending writes.""" + for stream_name in self.expected_streams: + self.flush_stream(stream_name, write_strategy=write_strategy) @final - @overrides - def _finalize_batches( + def flush_stream( self, stream_name: str, write_strategy: WriteStrategy, ) -> list[BatchHandle]: """Finalize all uncommitted batches. - This is a generic 'final' implementation, which should not be overridden. + This is a generic 'final' SQL implementation, which should not be overridden. Returns a mapping of batch IDs to batch handles, for those processed batches. @@ -503,7 +509,10 @@ def _finalize_batches( Some sources will send us duplicate records within the same stream, although this is a fairly rare edge case we can ignore in V1. """ - with self._finalizing_batches(stream_name) as batches_to_finalize: + # Flush any pending writes + self.file_writer.flush_all(write_strategy=write_strategy) + + with self.finalizing_batches(stream_name) as batches_to_finalize: # Make sure the target schema and target table exist. self._ensure_schema_exists() final_table_name = self._ensure_final_table_exists( @@ -541,26 +550,45 @@ def _finalize_batches( finally: self._drop_temp_table(temp_table_name, if_exists=True) - # Return the batch handles as measure of work completed. - return batches_to_finalize + # Return the batch handles as measure of work completed. 
+ return batches_to_finalize - @overrides - def _finalize_state_messages( + @final + def cleanup_all(self) -> None: + """Clean resources.""" + self.file_writer.cleanup_all() + + # Finalizing context manager + + @final + @contextlib.contextmanager + def finalizing_batches( self, stream_name: str, - state_messages: list[AirbyteStateMessage], - ) -> None: - """Handle state messages by passing them to the catalog manager.""" - if not self._catalog_manager: - raise exc.AirbyteLibInternalError( - message="Catalog manager should exist but does not.", - ) - if state_messages and self._source_name: - self._catalog_manager.save_state( - source_name=self._source_name, - stream_name=stream_name, - state=state_messages[-1], - ) + ) -> Generator[list[BatchHandle], str, None]: + """Context manager to use for finalizing batches, if applicable. + + Returns a mapping of batch IDs to batch handles, for those processed batches. + """ + batches_to_finalize: list[BatchHandle] = self.file_writer._pending_batches[ + stream_name + ].copy() + state_messages_to_finalize: list[AirbyteStateMessage] = self._pending_state_messages[ + stream_name + ].copy() + self.file_writer._pending_batches[stream_name].clear() + self._pending_state_messages[stream_name].clear() + + progress.log_batches_finalizing(stream_name, len(batches_to_finalize)) + yield batches_to_finalize + self._finalize_state_messages(stream_name, state_messages_to_finalize) + progress.log_batches_finalized(stream_name, len(batches_to_finalize)) + + self.file_writer._finalized_batches[stream_name] += batches_to_finalize + self._finalized_state_messages[stream_name] += state_messages_to_finalize + + for batch_handle in batches_to_finalize: + self.file_writer._cleanup_batch(batch_handle) def _execute_sql(self, sql: str | TextClause | Executable) -> CursorResult: """Execute the given SQL statement.""" From 1cb73d4be2b4655e2e510b618403b86f77c40dcb Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 09:08:42 -0800 Subject: [PATCH 06/13] minor renames, cleanup --- airbyte/_batch_handles.py | 10 ++++++++++ airbyte/_processors/base.py | 25 +++++++++++++++++++++---- airbyte/_processors/file/base.py | 13 ++----------- airbyte/_processors/sql/base.py | 14 +++++--------- 4 files changed, 38 insertions(+), 24 deletions(-) diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py index e405dfca..212c8b2d 100644 --- a/airbyte/_batch_handles.py +++ b/airbyte/_batch_handles.py @@ -65,6 +65,16 @@ def close_files(self) -> None: # with suppress(Exception): self.open_file_writer.close() + def delete_files(self) -> None: + """Delete the files. + + If any files are open, they will be closed first. + If any files are missing, they will be ignored. + """ + self.close_files() + for file in self.files: + file.unlink(missing_ok=True) + def __del__(self) -> None: """Upon deletion, close the file writer.""" self.close_files() diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index 49acb1e0..074ead64 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -37,6 +37,7 @@ if TYPE_CHECKING: from collections.abc import Iterable, Iterator + from airbyte._batch_handles import BatchHandle from airbyte.caches._catalog_manager import CatalogManager @@ -48,7 +49,13 @@ class AirbyteMessageParsingError(Exception): class RecordProcessor(abc.ABC): - """Abstract base class for classes which can process input records.""" + """Abstract base class for classes which can process Airbyte messages from a source. 
+ + This class is responsible for all aspects of handling Airbyte protocol. + + The class leverages the cache's catalog manager class to store and retrieve metadata. + + """ def __init__( self, @@ -183,7 +190,7 @@ def process_airbyte_messages( # Type.LOG, Type.TRACE, Type.CONTROL, etc. pass - self.flush_all( + self.write_all_stream_data( write_strategy=write_strategy, ) @@ -191,9 +198,19 @@ def process_airbyte_messages( if self.cache.cleanup: self.cleanup_all() - @abc.abstractmethod - def flush_all(self, write_strategy: WriteStrategy) -> None: + def write_all_stream_data(self, write_strategy: WriteStrategy) -> None: """Finalize any pending writes.""" + for stream_name in self.expected_streams: + self.write_stream_data(stream_name, write_strategy=write_strategy) + + @abc.abstractmethod + def write_stream_data( + self, + stream_name: str, + write_strategy: WriteStrategy, + ) -> list[BatchHandle]: + """Write pending stream data to the cache.""" + ... def _finalize_state_messages( self, diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index 3339b61b..c81ea71c 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -17,13 +17,11 @@ if TYPE_CHECKING: - from airbyte_protocol.models import ( AirbyteRecordMessage, ) from airbyte.caches.base import CacheBase - from airbyte.strategies import WriteStrategy DEFAULT_BATCH_SIZE = 10000 @@ -174,7 +172,7 @@ def process_record_message( ) batch_handle.increment_record_count() - def _flush_active_batches( + def flush_active_batches( self, ) -> None: """Flush active batches for all streams.""" @@ -195,19 +193,12 @@ def _cleanup_batch( self._close_batch(batch_handle) if self.cache.cleanup: - for file_path in batch_handle.files: - if file_path.exists(): - file_path.unlink() + batch_handle.delete_files() def _new_batch_id(self) -> str: """Return a new batch handle.""" return str(ulid.ULID()) - def flush_all(self, write_strategy: WriteStrategy) -> None: - """Finalize any pending writes.""" - # We are at the end of the stream. Process whatever else is queued. - self._flush_active_batches() - # Destructor @final diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 4a8ea9b7..4abba1c5 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -488,13 +488,8 @@ def _get_sql_column_definitions( # columns["_airbyte_loaded_at"] = sqlalchemy.TIMESTAMP() return columns - def flush_all(self, write_strategy: WriteStrategy) -> None: - """Finalize any pending writes.""" - for stream_name in self.expected_streams: - self.flush_stream(stream_name, write_strategy=write_strategy) - @final - def flush_stream( + def write_stream_data( self, stream_name: str, write_strategy: WriteStrategy, @@ -510,7 +505,7 @@ def flush_stream( although this is a fairly rare edge case we can ignore in V1. """ # Flush any pending writes - self.file_writer.flush_all(write_strategy=write_strategy) + self.file_writer.flush_active_batches() with self.finalizing_batches(stream_name) as batches_to_finalize: # Make sure the target schema and target table exist. 
@@ -587,8 +582,9 @@ def finalizing_batches( self.file_writer._finalized_batches[stream_name] += batches_to_finalize self._finalized_state_messages[stream_name] += state_messages_to_finalize - for batch_handle in batches_to_finalize: - self.file_writer._cleanup_batch(batch_handle) + if self.cache.cleanup: + for batch_handle in batches_to_finalize: + batch_handle.delete_files() def _execute_sql(self, sql: str | TextClause | Executable) -> CursorResult: """Execute the given SQL statement.""" From 386452fd09d72c100f220dfa8178f9b5ea6dc005 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 11:51:30 -0800 Subject: [PATCH 07/13] improve finalization process --- airbyte/_batch_handles.py | 3 +++ airbyte/_processors/base.py | 2 +- airbyte/_processors/file/base.py | 28 +++++++++++++++++------ airbyte/_processors/sql/base.py | 9 ++++---- airbyte/caches/base.py | 38 +++++++++++++++++++++++++++++--- airbyte/datasets/_sql.py | 2 +- airbyte/source.py | 4 +--- airbyte/validate.py | 2 +- 8 files changed, 67 insertions(+), 21 deletions(-) diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py index 212c8b2d..1319ef81 100644 --- a/airbyte/_batch_handles.py +++ b/airbyte/_batch_handles.py @@ -28,6 +28,9 @@ def __init__( assert self._files, "A batch must have at least one file." self._open_file_writer: IO[bytes] = file_opener(self._files[0]) + # Marker for whether the batch has been finalized. + self.finalized: bool = False + @property def files(self) -> list[Path]: """Return the files.""" diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index 074ead64..f3efeafa 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -242,7 +242,7 @@ def _get_stream_config( self, stream_name: str, ) -> ConfiguredAirbyteStream: - """Return the column definitions for the given stream.""" + """Return the definition of the given stream.""" if not self._catalog_manager: raise exc.AirbyteLibInternalError( message="Catalog manager should exist but does not.", diff --git a/airbyte/_processors/file/base.py b/airbyte/_processors/file/base.py index c81ea71c..96cc6ab8 100644 --- a/airbyte/_processors/file/base.py +++ b/airbyte/_processors/file/base.py @@ -42,8 +42,7 @@ def __init__( self.cache = cache self._active_batches: dict[str, BatchHandle] = {} - self._pending_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) - self._finalized_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) + self._completed_batches: dict[str, list[BatchHandle]] = defaultdict(list, {}) def _get_new_cache_file_path( self, @@ -79,7 +78,7 @@ def _flush_active_batch( batch_handle.close_files() del self._active_batches[stream_name] - self._pending_batches[stream_name].append(batch_handle) + self._completed_batches[stream_name].append(batch_handle) progress.log_batch_written( stream_name=stream_name, batch_size=batch_handle.record_count, @@ -131,11 +130,10 @@ def cleanup_all(self) -> None: Subclasses should override `_cleanup_batch` instead. 
""" - for batch_list in self._pending_batches.values(): - for batch_handle in batch_list: - self._cleanup_batch(batch_handle) + for batch_handle in self._active_batches.values(): + self._cleanup_batch(batch_handle) - for batch_list in self._finalized_batches.values(): + for batch_list in self._completed_batches.values(): for batch_handle in batch_list: self._cleanup_batch(batch_handle) @@ -217,3 +215,19 @@ def _write_record_dict( ) -> None: """Write one record to a file.""" raise NotImplementedError("No default implementation.") + + # Public methods (for use by Cache and SQL Processor classes) + + def get_active_batch(self, stream_name: str) -> BatchHandle | None: + """Return the active batch for a specific stream name.""" + return self._active_batches.get(stream_name, None) + + def get_pending_batches(self, stream_name: str) -> list[BatchHandle]: + """Return the pending batches for a specific stream name.""" + return [ + batch for batch in self._completed_batches.get(stream_name, []) if not batch.finalized + ] + + def get_finalized_batches(self, stream_name: str) -> list[BatchHandle]: + """Return the finalized batches for a specific stream name.""" + return [batch for batch in self._completed_batches.get(stream_name, []) if batch.finalized] diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index 4abba1c5..c949fdb2 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -565,13 +565,10 @@ def finalizing_batches( Returns a mapping of batch IDs to batch handles, for those processed batches. """ - batches_to_finalize: list[BatchHandle] = self.file_writer._pending_batches[ - stream_name - ].copy() + batches_to_finalize: list[BatchHandle] = self.file_writer.get_pending_batches(stream_name) state_messages_to_finalize: list[AirbyteStateMessage] = self._pending_state_messages[ stream_name ].copy() - self.file_writer._pending_batches[stream_name].clear() self._pending_state_messages[stream_name].clear() progress.log_batches_finalizing(stream_name, len(batches_to_finalize)) @@ -579,7 +576,9 @@ def finalizing_batches( self._finalize_state_messages(stream_name, state_messages_to_finalize) progress.log_batches_finalized(stream_name, len(batches_to_finalize)) - self.file_writer._finalized_batches[stream_name] += batches_to_finalize + for batch_handle in batches_to_finalize: + batch_handle.finalized = True + self._finalized_state_messages[stream_name] += state_messages_to_finalize if self.cache.cleanup: diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index f39f5c28..7f5f34f6 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -5,10 +5,12 @@ import abc from pathlib import Path -from typing import TYPE_CHECKING, Any, Optional, final +from typing import TYPE_CHECKING, Any, Optional, cast, final from pydantic import BaseModel, PrivateAttr +from airbyte import exceptions as exc +from airbyte.caches._catalog_manager import CatalogManager from airbyte.datasets._sql import CachedDataset @@ -76,13 +78,43 @@ def streams( """Return a temporary table name.""" result = {} stream_names = self.processor.expected_streams - if self.processor._catalog_manager is not None: # noqa: SLF001 - stream_names |= set(self.processor._catalog_manager.stream_names) # noqa: SLF001 + if self._has_catalog_manager: + stream_names |= set(self._catalog_manager.stream_names) for stream_name in stream_names: result[stream_name] = CachedDataset(self, stream_name) return result + def _get_state( + self, + source_name: str, + streams: list[str] | None, + ) 
-> list[dict[str, Any]] | None: + return self._catalog_manager.get_state( + source_name=source_name, + streams=streams, + ) + + @property + def _has_catalog_manager( + self, + ) -> bool: + """Return whether the cache has a catalog manager.""" + # Member is private until we have a public API for it. + return self.processor._catalog_manager is not None # noqa: SLF001 + + @property + def _catalog_manager( + self, + ) -> CatalogManager: + if not self._has_catalog_manager: + raise exc.AirbyteLibInternalError( + message="Catalog manager should exist but does not.", + ) + + # Member is private until we have a public API for it. + return cast(CatalogManager, self.processor._catalog_manager) # noqa: SLF001 + def __getitem__(self, stream: str) -> DatasetBase: return self.streams[stream] diff --git a/airbyte/datasets/_sql.py b/airbyte/datasets/_sql.py index 7911fbfb..b23cccad 100644 --- a/airbyte/datasets/_sql.py +++ b/airbyte/datasets/_sql.py @@ -39,7 +39,7 @@ def __init__( self._stream_name: str = stream_name self._query_statement: Selectable = query_statement super().__init__( - stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it. + stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it. stream_name=stream_name ), ) diff --git a/airbyte/source.py b/airbyte/source.py index c20cc3b8..96d8bdc2 100644 --- a/airbyte/source.py +++ b/airbyte/source.py @@ -615,11 +615,9 @@ def read( incoming_source_catalog=self.configured_catalog, stream_names=set(self._selected_stream_names), ) - if not cache.processor._catalog_manager: # noqa: SLF001 - raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") state = ( - cache.processor._catalog_manager.get_state( # noqa: SLF001 + cache._get_state( # noqa: SLF001 # Private method until we have a public API for it. source_name=self.name, streams=self._selected_stream_names, ) diff --git a/airbyte/validate.py b/airbyte/validate.py index 9b3520b5..dea7d2f4 100644 --- a/airbyte/validate.py +++ b/airbyte/validate.py @@ -91,7 +91,7 @@ def full_tests(connector_name: str, sample_config: str) -> None: def install_only_test(connector_name: str) -> None: print("Creating source and validating spec is returned successfully...") source = ab.get_source(connector_name) - source._get_spec(force_refresh=True) # noqa: SLF001 + source._get_spec(force_refresh=True) # noqa: SLF001 # Member is private until we have a public API for it. def run() -> None: From 2bd081c0136787f35bd99449d5bd54233605fa47 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 13:16:06 -0800 Subject: [PATCH 08/13] fix lint issues --- airbyte/_processors/file/parquet.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py index 34498fe2..b18386f1 100644 --- a/airbyte/_processors/file/parquet.py +++ b/airbyte/_processors/file/parquet.py @@ -10,7 +10,6 @@ from typing import TYPE_CHECKING -from airbyte import exceptions as exc from airbyte._processors.file.base import FileWriterBase from airbyte._util.text_util import lower_case_set @@ -18,6 +17,8 @@ if TYPE_CHECKING: import pyarrow as pa + from airbyte_protocol.models import ConfiguredAirbyteStream + class ParquetWriter(FileWriterBase): """A Parquet cache implementation.""" @@ -33,9 +34,10 @@ def _get_missing_columns( The comparison is based on a case-insensitive comparison of the column names. 
""" - if not self.cache.processor._catalog_manager: - raise exc.AirbyteLibInternalError(message="Catalog manager should exist but does not.") - stream = self.cache.processor._catalog_manager.get_stream_config(stream_name) + # Access to private member required until the cache exposes a public API for it. + stream: ConfiguredAirbyteStream = self.cache._catalog_manager.get_stream_config( # noqa: SLF001 + stream_name=stream_name + ) stream_property_names = stream.stream.json_schema["properties"].keys() return [ col From 4966879730fe590e383050dd9adad8b463074777 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 7 Mar 2024 13:21:19 -0800 Subject: [PATCH 09/13] inline pr suggestion --- airbyte/_batch_handles.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py index 1319ef81..d0ac00b2 100644 --- a/airbyte/_batch_handles.py +++ b/airbyte/_batch_handles.py @@ -65,8 +65,8 @@ def close_files(self) -> None: if self.open_file_writer is None: return - # with suppress(Exception): - self.open_file_writer.close() + with suppress(Exception): + self.open_file_writer.close() def delete_files(self) -> None: """Delete the files. From d9d79bf43913d8a85a8f519ff4ad55e624ae2c9d Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 7 Mar 2024 13:29:19 -0800 Subject: [PATCH 10/13] inline pr suggestion --- airbyte/_processors/sql/base.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/airbyte/_processors/sql/base.py b/airbyte/_processors/sql/base.py index c949fdb2..9beb0b97 100644 --- a/airbyte/_processors/sql/base.py +++ b/airbyte/_processors/sql/base.py @@ -161,11 +161,6 @@ def register_source( This method is called by the source when it is initialized. """ self._source_name = source_name - # self.register_source( - # source_name, - # incoming_source_catalog, - # stream_names=stream_names, - # ) self._ensure_schema_exists() super().register_source( source_name, From 2faf1a3d19e6acfa647f12207cc5926d3c46ccfd Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 13:33:23 -0800 Subject: [PATCH 11/13] fix missing import --- airbyte/_batch_handles.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/_batch_handles.py b/airbyte/_batch_handles.py index d0ac00b2..289d19e3 100644 --- a/airbyte/_batch_handles.py +++ b/airbyte/_batch_handles.py @@ -3,6 +3,7 @@ from __future__ import annotations +from contextlib import suppress from typing import IO, TYPE_CHECKING, Callable From 5998c6200857e9154b203ba7c8a52780345ee048 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 13:53:26 -0800 Subject: [PATCH 12/13] remove pyarrow and parquet refs --- airbyte/_processors/base.py | 17 ------ airbyte/_processors/file/__init__.py | 2 - airbyte/_processors/file/parquet.py | 87 ---------------------------- airbyte/_processors/sql/duckdb.py | 6 +- airbyte/types.py | 56 ------------------ poetry.lock | 58 ++----------------- pyproject.toml | 2 - 7 files changed, 6 insertions(+), 222 deletions(-) delete mode 100644 airbyte/_processors/file/parquet.py diff --git a/airbyte/_processors/base.py b/airbyte/_processors/base.py index f3efeafa..754c7eb0 100644 --- a/airbyte/_processors/base.py +++ b/airbyte/_processors/base.py @@ -15,8 +15,6 @@ from collections import defaultdict from typing import TYPE_CHECKING, Any, cast, final -import pyarrow as pa - from airbyte_protocol.models import ( AirbyteMessage, AirbyteRecordMessage, @@ -31,7 +29,6 @@ from airbyte import exceptions as exc from airbyte.caches.base import 
CacheBase from airbyte.strategies import WriteStrategy -from airbyte.types import _get_pyarrow_type if TYPE_CHECKING: @@ -258,20 +255,6 @@ def _get_stream_json_schema( """Return the column definitions for the given stream.""" return self._get_stream_config(stream_name).stream.json_schema - def _get_stream_pyarrow_schema( - self, - stream_name: str, - ) -> pa.Schema: - """Return the column definitions for the given stream.""" - return pa.schema( - fields=[ - pa.field(prop_name, _get_pyarrow_type(prop_def)) - for prop_name, prop_def in self._get_stream_json_schema(stream_name)[ - "properties" - ].items() - ] - ) - def cleanup_all(self) -> None: # noqa: B027 # Intentionally empty, not abstract """Clean up all resources. diff --git a/airbyte/_processors/file/__init__.py b/airbyte/_processors/file/__init__.py index 0f83652b..2ef9b9a4 100644 --- a/airbyte/_processors/file/__init__.py +++ b/airbyte/_processors/file/__init__.py @@ -6,12 +6,10 @@ from airbyte._batch_handles import BatchHandle from airbyte._processors.file.base import FileWriterBase from airbyte._processors.file.jsonl import JsonlWriter -from airbyte._processors.file.parquet import ParquetWriter __all__ = [ "BatchHandle", "FileWriterBase", "JsonlWriter", - "ParquetWriter", ] diff --git a/airbyte/_processors/file/parquet.py b/airbyte/_processors/file/parquet.py deleted file mode 100644 index b18386f1..00000000 --- a/airbyte/_processors/file/parquet.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright (c) 2023 Airbyte, Inc., all rights reserved -"""A Parquet file writer implementation. - -NOTE: Parquet is a strongly typed columnar storage format, which has known issues when applied to -variable schemas, schemas with indeterminate types, and schemas that have empty data nodes. -This implementation is deprecated for now in favor of jsonl.gz, and may be removed or revamped in -the future. -""" -from __future__ import annotations - -from typing import TYPE_CHECKING - -from airbyte._processors.file.base import FileWriterBase -from airbyte._util.text_util import lower_case_set - - -if TYPE_CHECKING: - import pyarrow as pa - - from airbyte_protocol.models import ConfiguredAirbyteStream - - -class ParquetWriter(FileWriterBase): - """A Parquet cache implementation.""" - - default_cache_file_suffix = ".parquet" - - def _get_missing_columns( - self, - stream_name: str, - record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - ) -> list[str]: - """Return a list of columns that are missing in the batch. - - The comparison is based on a case-insensitive comparison of the column names. - """ - # Access to private member required until the cache exposes a public API for it. - stream: ConfiguredAirbyteStream = self.cache._catalog_manager.get_stream_config( # noqa: SLF001 - stream_name=stream_name - ) - stream_property_names = stream.stream.json_schema["properties"].keys() - return [ - col - for col in stream_property_names - if col.lower() not in lower_case_set(record_batch.schema.names) - ] - - # TODO: Delete if not needed - # @overrides - # def _write_batch( - # self, - # stream_name: str, - # batch_id: str, - # record_batch: pa.Table, # TODO: Refactor to remove dependency on pyarrow - # ) -> BatchHandle: - # """Process a record batch. - - # Return the path to the cache file. 
- # """ - # _ = batch_id # unused - # output_file_path = self._get_new_cache_file_path(stream_name) - - # missing_columns = self._get_missing_columns(stream_name, record_batch) - # if missing_columns: - # # We need to append columns with the missing column name(s) and a null type - # null_array = cast(pa.Array, pa.array([None] * len(record_batch), type=pa.null())) - # for col in missing_columns: - # record_batch = record_batch.append_column(col, null_array) - - # try: - # with parquet.ParquetWriter(output_file_path, schema=record_batch.schema) as writer: - # writer.write_table(record_batch) - # except Exception as e: - # raise exc.AirbyteLibInternalError( - # message=f"Failed to write record batch to Parquet file: {e}", - # context={ - # "stream_name": stream_name, - # "batch_id": batch_id, - # "output_file_path": output_file_path, - # "schema": record_batch.schema, - # "record_batch": record_batch, - # }, - # ) from e - - # batch_handle = BatchHandle() - # batch_handle.files.append(output_file_path) - # return batch_handle diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index ba27f03e..a31f39a3 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -77,12 +77,8 @@ def _write_files_to_new_table( ) -> str: """Write a file(s) to a new table. - We use DuckDB's `read_parquet` function to efficiently read the files and insert + We use DuckDB native SQL functions to efficiently read the files and insert them into the table in a single operation. - - Note: This implementation is fragile in regards to column ordering. However, since - we are inserting into a temp table we have just created, there should be no - drift between the table schema and the file schema. """ temp_table_name = self._create_table_for_loading( stream_name=stream_name, diff --git a/airbyte/types.py b/airbyte/types.py index 64322bda..a95dbf59 100644 --- a/airbyte/types.py +++ b/airbyte/types.py @@ -5,7 +5,6 @@ from typing import cast -import pyarrow as pa import sqlalchemy from rich import print @@ -81,61 +80,6 @@ def _get_airbyte_type( # noqa: PLR0911 # Too many return statements raise SQLTypeConversionError(err_msg) -def _get_pyarrow_type( # noqa: PLR0911 # Too many return statements - json_schema_property_def: dict[str, str | dict | list], -) -> pa.DataType: - json_schema_type = json_schema_property_def.get("type", None) - json_schema_format = json_schema_property_def.get("format", None) - - # if json_schema_type is an array of two strings with one of them being null, pick the other one - # this strategy is often used by connectors to indicate a field might not be set all the time - if isinstance(json_schema_type, list): - non_null_types = [t for t in json_schema_type if t != "null"] - if len(non_null_types) == 1: - json_schema_type = non_null_types[0] - - if json_schema_type == "string": - if json_schema_format == "date": - return pa.date64() - - if json_schema_format == "date-time": - return pa.timestamp("ns") - - if json_schema_format == "time": - return pa.timestamp("ns") - - if json_schema_type == "string": - return pa.string() - - if json_schema_type == "number": - return pa.float64() - - if json_schema_type == "integer": - return pa.int64() - - if json_schema_type == "boolean": - return pa.bool_() - - if json_schema_type == "object": - return pa.struct( - fields={ - k: _get_pyarrow_type(v) - for k, v in cast(dict, json_schema_property_def.get("properties", {})).items() - } - ) - - if json_schema_type == "array": - items_def = 
json_schema_property_def.get("items", None) - if isinstance(items_def, dict): - subtype: pa.DataType = _get_pyarrow_type(items_def) - return pa.list_(subtype) - - return pa.list_(pa.string()) - - err_msg = f"Could not determine PyArrow type from JSON schema type: {json_schema_property_def}" - raise SQLTypeConversionError(err_msg) - - class SQLTypeConverter: """A base class to perform type conversions.""" diff --git a/poetry.lock b/poetry.lock index 49c66861..a3f5d6c3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -670,18 +670,18 @@ grpc = ["grpcio (>=1.38.0,<2.0dev)", "grpcio-status (>=1.38.0,<2.0.dev0)"] [[package]] name = "google-cloud-secret-manager" -version = "2.18.2" +version = "2.18.3" description = "Google Cloud Secret Manager API client library" optional = false python-versions = ">=3.7" files = [ - {file = "google-cloud-secret-manager-2.18.2.tar.gz", hash = "sha256:a00d62115a70083e86b1d44ca7ebcae041b36a44cc62ea57de4005731f8d3c88"}, - {file = "google_cloud_secret_manager-2.18.2-py2.py3-none-any.whl", hash = "sha256:bb2f2b23a588f9eeae51df4a028d6198a603231a056f838e6a476b5aecadbbca"}, + {file = "google-cloud-secret-manager-2.18.3.tar.gz", hash = "sha256:1db2f409324536e34f985081d389e3974ca3a3668df7845cad0be03ab8c0fa7d"}, + {file = "google_cloud_secret_manager-2.18.3-py2.py3-none-any.whl", hash = "sha256:4d4af82bddd9099ebdbe79e0c6b68f6c6cabea8323a3c1275bcead8f56310fb7"}, ] [package.dependencies] google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0dev", extras = ["grpc"]} -google-auth = ">=2.14.1,<3.0.0dev" +google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0dev" grpc-google-iam-v1 = ">=0.12.4,<1.0.0dev" proto-plus = ">=1.22.3,<2.0.0dev" protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0dev" @@ -1639,54 +1639,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp39-cp39-win_amd64.whl", hash = "sha256:f7ae5d65ccfbebdfa761585228eb4d0df3a8b15cfb53bd953e713e09fbb12957"}, ] -[[package]] -name = "pyarrow" -version = "14.0.2" -description = "Python library for Apache Arrow" -optional = false -python-versions = ">=3.8" -files = [ - {file = "pyarrow-14.0.2-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:ba9fe808596c5dbd08b3aeffe901e5f81095baaa28e7d5118e01354c64f22807"}, - {file = "pyarrow-14.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:22a768987a16bb46220cef490c56c671993fbee8fd0475febac0b3e16b00a10e"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dbba05e98f247f17e64303eb876f4a80fcd32f73c7e9ad975a83834d81f3fda"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a898d134d00b1eca04998e9d286e19653f9d0fcb99587310cd10270907452a6b"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:87e879323f256cb04267bb365add7208f302df942eb943c93a9dfeb8f44840b1"}, - {file = "pyarrow-14.0.2-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:76fc257559404ea5f1306ea9a3ff0541bf996ff3f7b9209fc517b5e83811fa8e"}, - {file = "pyarrow-14.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:b0c4a18e00f3a32398a7f31da47fefcd7a927545b396e1f15d0c85c2f2c778cd"}, - {file = "pyarrow-14.0.2-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:87482af32e5a0c0cce2d12eb3c039dd1d853bd905b04f3f953f147c7a196915b"}, - {file = "pyarrow-14.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = 
"sha256:059bd8f12a70519e46cd64e1ba40e97eae55e0cbe1695edd95384653d7626b23"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3f16111f9ab27e60b391c5f6d197510e3ad6654e73857b4e394861fc79c37200"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06ff1264fe4448e8d02073f5ce45a9f934c0f3db0a04460d0b01ff28befc3696"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:6dd4f4b472ccf4042f1eab77e6c8bce574543f54d2135c7e396f413046397d5a"}, - {file = "pyarrow-14.0.2-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:32356bfb58b36059773f49e4e214996888eeea3a08893e7dbde44753799b2a02"}, - {file = "pyarrow-14.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:52809ee69d4dbf2241c0e4366d949ba035cbcf48409bf404f071f624ed313a2b"}, - {file = "pyarrow-14.0.2-cp312-cp312-macosx_10_14_x86_64.whl", hash = "sha256:c87824a5ac52be210d32906c715f4ed7053d0180c1060ae3ff9b7e560f53f944"}, - {file = "pyarrow-14.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a25eb2421a58e861f6ca91f43339d215476f4fe159eca603c55950c14f378cc5"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c1da70d668af5620b8ba0a23f229030a4cd6c5f24a616a146f30d2386fec422"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2cc61593c8e66194c7cdfae594503e91b926a228fba40b5cf25cc593563bcd07"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:78ea56f62fb7c0ae8ecb9afdd7893e3a7dbeb0b04106f5c08dbb23f9c0157591"}, - {file = "pyarrow-14.0.2-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:37c233ddbce0c67a76c0985612fef27c0c92aef9413cf5aa56952f359fcb7379"}, - {file = "pyarrow-14.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:e4b123ad0f6add92de898214d404e488167b87b5dd86e9a434126bc2b7a5578d"}, - {file = "pyarrow-14.0.2-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:e354fba8490de258be7687f341bc04aba181fc8aa1f71e4584f9890d9cb2dec2"}, - {file = "pyarrow-14.0.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:20e003a23a13da963f43e2b432483fdd8c38dc8882cd145f09f21792e1cf22a1"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc0de7575e841f1595ac07e5bc631084fd06ca8b03c0f2ecece733d23cd5102a"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66e986dc859712acb0bd45601229021f3ffcdfc49044b64c6d071aaf4fa49e98"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:f7d029f20ef56673a9730766023459ece397a05001f4e4d13805111d7c2108c0"}, - {file = "pyarrow-14.0.2-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:209bac546942b0d8edc8debda248364f7f668e4aad4741bae58e67d40e5fcf75"}, - {file = "pyarrow-14.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:1e6987c5274fb87d66bb36816afb6f65707546b3c45c44c28e3c4133c010a881"}, - {file = "pyarrow-14.0.2-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:a01d0052d2a294a5f56cc1862933014e696aa08cc7b620e8c0cce5a5d362e976"}, - {file = "pyarrow-14.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a51fee3a7db4d37f8cda3ea96f32530620d43b0489d169b285d774da48ca9785"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:64df2bf1ef2ef14cee531e2dfe03dd924017650ffaa6f9513d7a1bb291e59c15"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:3c0fa3bfdb0305ffe09810f9d3e2e50a2787e3a07063001dcd7adae0cee3601a"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:c65bf4fd06584f058420238bc47a316e80dda01ec0dfb3044594128a6c2db794"}, - {file = "pyarrow-14.0.2-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:63ac901baec9369d6aae1cbe6cca11178fb018a8d45068aaf5bb54f94804a866"}, - {file = "pyarrow-14.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:75ee0efe7a87a687ae303d63037d08a48ef9ea0127064df18267252cfe2e9541"}, - {file = "pyarrow-14.0.2.tar.gz", hash = "sha256:36cef6ba12b499d864d1def3e990f97949e0b79400d08b7cf74504ffbd3eb025"}, -] - -[package.dependencies] -numpy = ">=1.16.6" - [[package]] name = "pyarrow-stubs" version = "10.0.1.7" @@ -2795,4 +2747,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "a6b886f25692bf3dc4f8503c6d81ef0b7d690fe93432e0bf58812c534b1fe037" +content-hash = "6e1c84109ec85756c6aa736ca9c844f1f5db5174e19d3182fb58473e24a44e70" diff --git a/pyproject.toml b/pyproject.toml index 0166f599..213ded22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,8 +28,6 @@ pendulum = "<=3.0.0" psycopg2-binary = "^2.9.9" # psycopg = {extras = ["binary", "pool"], version = "^3.1.16"} # Psycopg3 is not supported in SQLAlchemy 1.x: -pyarrow = "^14.0.2" -pydantic = "<=2.0" python-dotenv = "^1.0.1" python-ulid = "^2.2.0" requests = "^2.31.0" From 12985f60118f8dc73dffc2b00e62c344b14ac191 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 7 Mar 2024 13:56:59 -0800 Subject: [PATCH 13/13] add back pydantic --- poetry.lock | 12 +----------- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/poetry.lock b/poetry.lock index a3f5d6c3..6d346db2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1639,16 +1639,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp39-cp39-win_amd64.whl", hash = "sha256:f7ae5d65ccfbebdfa761585228eb4d0df3a8b15cfb53bd953e713e09fbb12957"}, ] -[[package]] -name = "pyarrow-stubs" -version = "10.0.1.7" -description = "Type annotations for pyarrow" -optional = false -python-versions = ">=3.7,<4.0" -files = [ - {file = "pyarrow_stubs-10.0.1.7-py3-none-any.whl", hash = "sha256:cccc7a46eddeea4e3cb85330eb8972c116a615da6188b8ae1f7a44cb724b21ac"}, -] - [[package]] name = "pyasn1" version = "0.5.1" @@ -2747,4 +2737,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "6e1c84109ec85756c6aa736ca9c844f1f5db5174e19d3182fb58473e24a44e70" +content-hash = "3490632ee893aa38e9b9bd6fc3435f0122cfa4beaf1694902d2558e529e9df3d" diff --git a/pyproject.toml b/pyproject.toml index 213ded22..c25556b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ pendulum = "<=3.0.0" psycopg2-binary = "^2.9.9" # psycopg = {extras = ["binary", "pool"], version = "^3.1.16"} # Psycopg3 is not supported in SQLAlchemy 1.x: +pydantic = "<=2.0" python-dotenv = "^1.0.1" python-ulid = "^2.2.0" requests = "^2.31.0" @@ -47,7 +48,6 @@ faker = "^21.0.0" mypy = "^1.7.1" pandas-stubs = "^2.1.4.231218" pdoc = "^14.3.0" -pyarrow-stubs = "^10.0.1.7" pytest = "^7.4.3" pytest-docker = "^2.0.1" pytest-mypy = "^0.10.3"