Skip to content

Commit

Permalink
Add missing type hints (#24345)
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda authored and erohmensing committed Mar 22, 2023
1 parent 55552b4 commit a481fa4
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@
import logging
from copy import deepcopy
from json import JSONDecodeError
from typing import Any, Iterable, Iterator, Mapping, Optional, Union
from typing import Any, Iterable, Iterator, List, Mapping, Optional, Union
from urllib.parse import parse_qs, urlparse

from airbyte_cdk.connector_builder.models import HttpRequest, HttpResponse, LogMessage, StreamRead, StreamReadPages, StreamReadSlices
from airbyte_cdk.connector_builder.models import (
HttpRequest,
HttpResponse,
LogMessage,
StreamRead,
StreamReadPages,
StreamReadSlices,
StreamReadSlicesInner,
)
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
Expand Down Expand Up @@ -133,7 +141,7 @@ def _get_message_groups(
yield StreamReadSlices(pages=current_slice_pages)

@staticmethod
def _need_to_close_page(at_least_one_page_in_group, message) -> bool:
def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage) -> bool:
return (
at_least_one_page_in_group
and message.type == MessageType.LOG
Expand All @@ -155,7 +163,7 @@ def _close_page(current_page_request, current_page_response, current_slice_pages
)
current_page_records.clear()

def _read_stream(self, source, config, configured_catalog) -> Iterator[AirbyteMessage]:
def _read_stream(self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog) -> Iterator[AirbyteMessage]:
# the generator can raise an exception
# iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
try:
Expand Down Expand Up @@ -198,7 +206,7 @@ def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> O
self.logger.warning(f"Failed to parse log message into response object with error: {error}")
return None

def _has_reached_limit(self, slices):
def _has_reached_limit(self, slices: List[StreamReadSlicesInner]):
if len(slices) >= self._max_slices:
return True

Expand Down

0 comments on commit a481fa4

Please sign in to comment.