
Merge branch 'master' into ddavydov/#1467-source-s3-validate-config
davydov-d authored Feb 9, 2023 · 2 parents 0beffbf + 3905125 · commit f1c19d3
Showing 22 changed files with 541 additions and 201 deletions.
@@ -799,7 +799,7 @@
 - name: HubSpot
   sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
   dockerRepository: airbyte/source-hubspot
-  dockerImageTag: 0.3.1
+  dockerImageTag: 0.3.2
   documentationUrl: https://docs.airbyte.com/integrations/sources/hubspot
   icon: hubspot.svg
   sourceType: api
@@ -1565,7 +1565,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 2.0.0
+  dockerImageTag: 2.0.1
   documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -6521,7 +6521,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-hubspot:0.3.1"
+- dockerImage: "airbyte/source-hubspot:0.3.2"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/hubspot"
     connectionSpecification:
@@ -13106,7 +13106,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:2.0.0"
+- dockerImage: "airbyte/source-salesforce:2.0.1"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
     connectionSpecification:
@@ -1,15 +1,5 @@
 {
   "streams": [
-    {
-      "stream": {
-        "name": "cohorts",
-        "json_schema": {},
-        "supported_sync_modes": ["full_refresh"],
-        "source_defined_primary_key": [["id"]]
-      },
-      "sync_mode": "full_refresh",
-      "destination_sync_mode": "overwrite"
-    },
     {
       "stream": {
         "name": "active_users",
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-hubspot/Dockerfile
@@ -34,5 +34,5 @@ COPY source_hubspot ./source_hubspot
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.3.1
+LABEL io.airbyte.version=0.3.2
 LABEL io.airbyte.name=airbyte/source-hubspot

Large diffs are not rendered by default.

@@ -4,17 +4,12 @@

 import logging
 from itertools import chain
-from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
+from typing import Any, List, Mapping, Optional, Tuple
 
 import requests
 from airbyte_cdk.logger import AirbyteLogger
-from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
 from airbyte_cdk.sources import AbstractSource
-from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.streams import Stream
-from airbyte_cdk.sources.utils.schema_helpers import split_config
-from airbyte_cdk.utils.event_timing import create_timer
-from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 from requests import HTTPError
 from source_hubspot.streams import (
     API,
@@ -136,51 +131,3 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         available_streams = streams
 
         return available_streams
-
-    def read(
-        self,
-        logger: logging.Logger,
-        config: Mapping[str, Any],
-        catalog: ConfiguredAirbyteCatalog,
-        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
-    ) -> Iterator[AirbyteMessage]:
-        """
-        This method is overridden to check whether the stream `quotes` exists in the source, if not skip reading that stream.
-        """
-        logger.info(f"Starting syncing {self.name}")
-        config, internal_config = split_config(config)
-        # TODO assert all streams exist in the connector
-        # get the streams once in case the connector needs to make any queries to generate them
-        stream_instances = {s.name: s for s in self.streams(config)}
-        state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
-        self._stream_to_instance_map = stream_instances
-        with create_timer(self.name) as timer:
-            for configured_stream in catalog.streams:
-                stream_instance = stream_instances.get(configured_stream.stream.name)
-                if not stream_instance and configured_stream.stream.name == "quotes":
-                    logger.warning("Stream `quotes` does not exist in the source. Skip reading `quotes` stream.")
-                    continue
-                if not stream_instance:
-                    raise KeyError(
-                        f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
-                    )
-
-                try:
-                    yield from self._read_stream(
-                        logger=logger,
-                        stream_instance=stream_instance,
-                        configured_stream=configured_stream,
-                        state_manager=state_manager,
-                        internal_config=internal_config,
-                    )
-                except Exception as e:
-                    logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
-                    display_message = stream_instance.get_error_display_message(e)
-                    if display_message:
-                        raise AirbyteTracedException.from_exception(e, message=display_message) from e
-                    raise e
-                finally:
-                    logger.info(f"Finished syncing {self.name}")
-                    logger.info(timer.report())
-
-        logger.info(f"Finished syncing {self.name}")
@@ -15,7 +15,6 @@
 import requests
 from airbyte_cdk.entrypoint import logger
 from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
@@ -206,14 +205,9 @@ class Stream(HttpStream, ABC):
     primary_key = None
     filter_old_records: bool = True
     denormalize_records: bool = False  # one record from API response can result in multiple records emitted
-    raise_on_http_errors: bool = True
     granted_scopes: Set = None
     properties_scopes: Set = None
 
-    @property
-    def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
-        return None
-
     @property
     @abstractmethod
     def scopes(self) -> Set[str]:
@@ -263,12 +257,6 @@ def __init__(self, api: API, start_date: Union[str, pendulum.datetime], credenti
         if creds_title in (OAUTH_CREDENTIALS, PRIVATE_APP_CREDENTIALS):
             self._authenticator = api.get_authenticator()
 
-    def should_retry(self, response: requests.Response) -> bool:
-        if response.status_code == HTTPStatus.FORBIDDEN:
-            setattr(self, "raise_on_http_errors", False)
-            logger.warning("You have not permission to API for this stream. " "Please check your scopes for Hubspot account.")
-        return super().should_retry(response)
-
     def backoff_time(self, response: requests.Response) -> Optional[float]:
         if response.status_code == codes.too_many_requests:
             return float(response.headers.get("Retry-After", 3))
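The deleted `should_retry` workaround (suppress `raise_on_http_errors` on a 403 and log a warning) and the `availability_strategy = None` override are superseded by the CDK default: `HttpStream` now supplies an HTTP availability strategy that probes the stream and turns permission errors into a readable "unavailable" reason. A minimal sketch of exercising that check directly, assuming the CDK's `HttpAvailabilityStrategy` with a `check_availability(stream, logger, source)` method returning an `(is_available, reason)` tuple:

```python
import logging

from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy

logger = logging.getLogger("airbyte")


def is_stream_available(stream, source=None) -> bool:
    # On an HTTP 403 (e.g. missing HubSpot scopes) the strategy is expected to
    # return (False, reason) rather than retrying with errors suppressed.
    available, reason = HttpAvailabilityStrategy().check_availability(stream, logger, source)
    if not available:
        logger.warning(f"Stream {stream.name} is unavailable: {reason}")
    return available
```

Here `stream` would be a HubSpot stream instance such as `Workflows(**common_params)` from the test file below.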
@@ -42,11 +42,6 @@ def some_credentials_fixture():
return {"credentials_title": "Private App Credentials", "access_token": "wrong token"}


@pytest.fixture(name="creds_with_wrong_permissions")
def creds_with_wrong_permissions():
return {"credentials_title": "Private App Credentials", "access_token": "THIS-IS-THE-ACCESS_TOKEN"}


@pytest.fixture(name="fake_properties_list")
def fake_properties_list():
return [f"property_number_{i}" for i in range(NUMBER_OF_PROPERTIES)]
@@ -13,7 +13,7 @@
 from source_hubspot.errors import HubspotRateLimited
 from source_hubspot.helpers import APIv3Property
 from source_hubspot.source import SourceHubspot
-from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream, Workflows
+from source_hubspot.streams import API, Companies, Deals, Engagements, Products, Stream
 
 from .utils import read_full_refresh, read_incremental
 
@@ -134,38 +134,30 @@ def test_check_connection_backoff_on_server_error(requests_mock, config):
     assert not error
 
 
-def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions, common_params, caplog):
-    """
-    Error with API Key Permissions to particular stream,
-    typically this issue raises along with calling `workflows` stream with API Key
-    that doesn't have required permissions to read the stream.
-    """
-
-    # Mapping tipical response for mocker
+def test_stream_forbidden(requests_mock, config, caplog):
     json = {
         "status": "error",
-        "message": f'This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
-        "correlationId": "2fe0a9af-3609-45c9-a4d7-83a1774121aa",
-    }
-
-    # We expect something like this
-    expected_warining_message = {
-        "type": "LOG",
-        "log": {
-            "level": "WARN",
-            "message": f'Stream `workflows` cannot be procced. This hapikey ({creds_with_wrong_permissions.get("api_key")}) does not have proper permissions! (requires any of [automation-access])',
-        },
+        "message": "This access_token does not have proper permissions!",
     }
+    requests_mock.get("https://api.hubapi.com/automation/v3/workflows", json=json, status_code=403)
 
-    # Create test_stream instance
-    test_stream = Workflows(**common_params)
-
-    # Mocking Request
-    requests_mock.register_uri("GET", test_stream.url, json=json, status_code=403)
-    records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh))
+    catalog = ConfiguredAirbyteCatalog.parse_obj({
+        "streams": [
+            {
+                "stream": {
+                    "name": "workflows",
+                    "json_schema": {},
+                    "supported_sync_modes": ["full_refresh"],
+                },
+                "sync_mode": "full_refresh",
+                "destination_sync_mode": "overwrite"
+            }
+        ]
+    })
 
-    # match logged expected logged warning message with output given from preudo-output
-    assert expected_warining_message["log"]["message"] in caplog.text
+    records = list(SourceHubspot().read(logger, config, catalog, {}))
+    assert json["message"] in caplog.text
+    records = [r for r in records if r.type == Type.RECORD]
     assert not records


@@ -328,17 +320,6 @@ def configured_catalog_fixture():
     return ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
 
 
-def test_it_should_not_read_quotes_stream_if_it_does_not_exist_in_client(oauth_config, configured_catalog):
-    """
-    If 'quotes' stream is not in the client, it should skip it.
-    """
-    source = SourceHubspot()
-
-    all_records = list(source.read(logger, config=oauth_config, catalog=configured_catalog, state=None))
-    records = [record for record in all_records if record.type == Type.RECORD]
-    assert not records
-
-
 def test_search_based_stream_should_not_attempt_to_get_more_than_10k_records(requests_mock, common_params, fake_properties_list):
     """
     If there are more than 10,000 records that would be returned by the Hubspot search endpoint,
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.version=2.0.1
LABEL io.airbyte.name=airbyte/source-salesforce
@@ -17,7 +17,7 @@
 from requests import codes, exceptions  # type: ignore[import]
 
 from .api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
-from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalSalesforceStream, SalesforceStream
+from .streams import BulkIncrementalSalesforceStream, BulkSalesforceStream, Describe, IncrementalRestSalesforceStream, RestSalesforceStream
 
 
 class AirbyteStopSync(AirbyteTracedException):
@@ -59,17 +59,10 @@ def _get_api_type(cls, stream_name, properties):
         properties_not_supported_by_bulk = {
             key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
         }
-        properties_length = len(",".join(p for p in properties))
 
         rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
-        # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
-        # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
-        bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
-
-        if rest_required and not bulk_required:
+        if rest_required:
             return "rest"
-        if not rest_required:
-            return "bulk"
+        return "bulk"
 
     @classmethod
     def generate_streams(
@@ -79,6 +72,7 @@ def generate_streams(
         sf_object: Salesforce,
     ) -> List[Stream]:
         """ "Generates a list of stream by their names. It can be used for different tests too"""
+        logger = logging.getLogger()
         authenticator = TokenAuthenticator(sf_object.access_token)
         stream_properties = sf_object.generate_schemas(stream_objects)
         streams = []
@@ -88,7 +82,7 @@ def generate_streams(

         api_type = cls._get_api_type(stream_name, selected_properties)
         if api_type == "rest":
-            full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
+            full_refresh, incremental = RestSalesforceStream, IncrementalRestSalesforceStream
         elif api_type == "bulk":
             full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
         else:
@@ -98,10 +92,17 @@ def generate_streams(
             pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
             streams_kwargs.update(dict(sf_api=sf_object, pk=pk, stream_name=stream_name, schema=json_schema, authenticator=authenticator))
             if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
-                streams.append(incremental(**streams_kwargs, replication_key=replication_key, start_date=config.get("start_date")))
+                stream = incremental(**streams_kwargs, replication_key=replication_key, start_date=config.get("start_date"))
             else:
-                streams.append(full_refresh(**streams_kwargs))
-
+                stream = full_refresh(**streams_kwargs)
+            if api_type == "rest" and not stream.primary_key and stream.too_many_properties:
+                logger.warning(
+                    f"Can not instantiate stream {stream_name}. "
+                    f"It is not supported by the BULK API and can not be implemented via REST because the number of its properties "
+                    f"exceeds the limit and it lacks a primary key."
+                )
+                continue
+            streams.append(stream)
         return streams
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
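Net effect of the Salesforce hunks above: the URI-length heuristic leaves `_get_api_type`, which reduces to a single either/or, and the oversized-properties case is instead caught at stream-construction time via `too_many_properties`. A condensed, standalone restatement of the new selection rule (constants and names taken from the diff; illustrative only, not the connector's actual method):

```python
from typing import Any, Mapping, Set


def get_api_type(stream_name: str, properties: Mapping[str, Any], unsupported_bulk_objects: Set[str]) -> str:
    # Streams that the BULK API cannot serve (explicitly unsupported objects, or
    # base64/object-typed properties) must use REST; everything else uses BULK.
    properties_not_supported_by_bulk = {
        key: value
        for key, value in properties.items()
        if value.get("format") == "base64" or "object" in value["type"]
    }
    rest_required = stream_name in unsupported_bulk_objects or bool(properties_not_supported_by_bulk)
    return "rest" if rest_required else "bulk"
```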

