Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Add ruff lint rules #19

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
20 changes: 11 additions & 9 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
)

import time
from collections.abc import MutableMapping
from copy import copy
from typing import Any, List, MutableMapping
from typing import Any

import orjson

from airbyte_cdk.models import (
AirbyteControlConnectorConfigMessage,
Expand All @@ -18,14 +21,14 @@
OrchestratorType,
Type,
)
from orjson import orjson


class ObservedDict(dict): # type: ignore # disallow_any_generics is set to True, and dict is equivalent to dict[Any]
def __init__(
self,
non_observed_mapping: MutableMapping[Any, Any],
observer: ConfigObserver,
*,
update_on_unchanged_value: bool = True,
) -> None:
non_observed_mapping = copy(non_observed_mapping)
Expand All @@ -37,25 +40,25 @@ def __init__(
non_observed_mapping[item] = ObservedDict(value, observer)

# Observe nested list of dicts
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, observer)
super().__init__(non_observed_mapping)

def __setitem__(self, item: Any, value: Any) -> None:
def __setitem__(self, item: Any, value: Any) -> None:  # noqa: ANN401 (any-type)
    """Override dict.__setitem__ by:
    1. Observing the new value if it is a dict
    2. Call observer update if the new value is different from the previous one
    """
    previous_value = self.get(item)
    if isinstance(value, MutableMapping):
        value = ObservedDict(value, self.observer)
    if isinstance(value, list):
        # Wrap dicts nested inside lists so their mutations are observed too.
        for i, sub_value in enumerate(value):
            if isinstance(sub_value, MutableMapping):
                value[i] = ObservedDict(sub_value, self.observer)
    super().__setitem__(item, value)
    if self.update_on_unchanged_value or value != previous_value:
        self.observer.update()

Expand All @@ -76,7 +79,7 @@ def observe_connector_config(
non_observed_connector_config: MutableMapping[str, Any],
) -> ObservedDict:
if isinstance(non_observed_connector_config, ObservedDict):
raise ValueError("This connector configuration is already observed")
raise ValueError("This connector configuration is already observed") # noqa: TRY004 (expected TypeError)
connector_config_observer = ConfigObserver()
observed_connector_config = ObservedDict(
non_observed_connector_config, connector_config_observer
Expand All @@ -86,8 +89,7 @@ def observe_connector_config(


def emit_configuration_as_airbyte_control_message(config: MutableMapping[str, Any]) -> None:
"""
WARNING: deprecated - emit_configuration_as_airbyte_control_message is being deprecated in favor of the MessageRepository mechanism.
"""WARNING: deprecated - emit_configuration_as_airbyte_control_message is being deprecated in favor of the MessageRepository mechanism.
See the airbyte_cdk.sources.message package
"""
airbyte_message = create_connector_config_control_message(config)
Expand Down
44 changes: 22 additions & 22 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

import json
import logging
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar

import yaml

from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
if TYPE_CHECKING:
import logging


def load_optional_package_file(package: str, filename: str) -> bytes | None:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
Expand All @@ -35,43 +40,39 @@ class BaseConnector(ABC, Generic[TConfig]):

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""
Persist config in temporary directory to run the Source job
"""
"""Persist config in temporary directory to run the Source job"""

@staticmethod
def read_config(config_path: str) -> Mapping[str, Any]:
    """Load the JSON file at ``config_path`` and return it as a mapping.

    Raises:
        ValueError: if the file's top-level JSON value is not an object.
    """
    config = BaseConnector._read_json_file(config_path)
    if isinstance(config, Mapping):
        return config
    # Fixed typo in the error message: "represent" -> "represents".
    raise ValueError(
        f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represents a config."
    )

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
def _read_json_file(file_path: str) -> Any: # noqa: ANN401 (any-type)
with open(file_path, encoding="utf-8") as file: # noqa: PTH123, FURB101 (prefer pathlib)
contents = file.read()

try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)
) from None

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
    """Serialize ``config`` to JSON and write it to ``config_path`` as UTF-8."""
    with open(config_path, "w", encoding="utf-8") as fh:  # noqa: PTH123, FURB103 (replace with pathlib)
        fh.write(json.dumps(config))

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
"""Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
"""

_ = logger # unused
package = self.__class__.__module__.split(".")[0]

yaml_spec = load_optional_package_file(package, "spec.yaml")
Expand All @@ -90,16 +91,15 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
except json.JSONDecodeError as error:
raise ValueError(
f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
)
) from None
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

return ConnectorSpecificationSerializer.load(spec_obj)

@abstractmethod
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
    """Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
    to the Stripe API.
    """

Expand All @@ -114,7 +114,7 @@ class DefaultConnectorMixin:
def configure(
    self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
    """Persist ``config`` as ``config.json`` inside ``temp_dir`` and return it unchanged."""
    config_path = os.path.join(temp_dir, "config.json")  # noqa: PTH118 (should use pathlib)
    self.write_config(config, config_path)
    return config

Expand Down
41 changes: 29 additions & 12 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

import dataclasses
from collections.abc import Mapping
from datetime import datetime
from typing import Any, List, Mapping
from typing import TYPE_CHECKING, Any

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
Type,
)
from airbyte_cdk.models import Type
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
Expand All @@ -23,6 +25,11 @@
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


if TYPE_CHECKING:
from airbyte_cdk.connector_builder.models import StreamRead


DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: list[AirbyteStateMessage],
    limits: TestReadLimits,
) -> AirbyteMessage:
    """Run a limited test read of the first configured stream.

    Returns an ``AirbyteMessage`` of type RECORD whose payload is the
    dataclass-serialized ``StreamRead`` result. On any failure the exception
    is converted to an Airbyte trace message, with secrets filtered out of
    the error text before it is emitted.
    """
    try:
        handler = MessageGrouper(
            max_pages_per_slice=limits.max_pages_per_slice,
            max_slices=limits.max_slices,
            max_record_limit=limits.max_records,
        )
        # The connector builder only supports a single stream.
        stream_name: str = configured_catalog.streams[0].stream.name
        stream_read: StreamRead = handler.get_message_groups(
            source=source,
            config=config,
            configured_catalog=configured_catalog,
            state=state,
            record_limit=limits.max_records,
        )
        return AirbyteMessage(
            type=MessageType.RECORD,
            record=AirbyteRecordMessage(
                data=dataclasses.asdict(stream_read),
                stream=stream_name,
                emitted_at=_emitted_at(),
            ),
        )
    except Exception as exc:
        error: AirbyteTracedException = AirbyteTracedException.from_exception(
            exc,
            message=filter_secrets(
                f"Error reading stream with config={config} and catalog={configured_catalog}: {exc!s}"
            ),
        )
        return error.as_airbyte_message()
Expand All @@ -106,8 +123,8 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
),
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
error: AirbyteTracedException = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {exc!s}"
)
return error.as_airbyte_message()

Expand Down
25 changes: 13 additions & 12 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from __future__ import annotations

import sys
from typing import Any, List, Mapping, Optional, Tuple
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any

import orjson

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
Expand All @@ -25,12 +28,11 @@
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from orjson import orjson


def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
args: list[str],
) -> tuple[str, Mapping[str, Any], ConfiguredAirbyteCatalog | None, Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
def handle_connector_builder_request(
    source: ManifestDeclarativeSource,
    command: str,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog | None,
    state: list[AirbyteStateMessage],
    limits: TestReadLimits,
) -> AirbyteMessage:
    """Dispatch a connector-builder command to the matching handler.

    Supported commands: ``resolve_manifest`` and ``test_read``.

    Raises:
        ValueError: for an unrecognized command, or when ``test_read`` is
            invoked without a configured catalog.
    """
    if command == "resolve_manifest":
        return resolve_manifest(source)
    if command == "test_read":
        if catalog is None:
            # Previously an `assert`, which is stripped under `python -O`;
            # raise explicitly so the check survives optimized runs.
            raise ValueError("`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None.")
        return read_stream(source, config, catalog, state, limits)
    raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
def handle_request(args: list[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
Expand All @@ -100,7 +101,7 @@ def handle_request(args: List[str]) -> str:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
exc, message=f"Error handling request: {exc!s}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
Loading
Loading