Skip to content

Commit

Permalink
✨Source Salesforce: Migrating to non-deprecated authenticator (#38065)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored May 9, 2024
1 parent d26bd10 commit 676b52e
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from datetime import datetime
from pathlib import Path
from typing import Dict

import pendulum
import pytest
Expand Down Expand Up @@ -39,6 +40,10 @@ def sf(input_sandbox_config):
return sf


def _authentication_headers(salesforce: Salesforce) -> Dict[str, str]:
return {"Authorization": f"Bearer {salesforce.access_token}"}


@pytest.fixture(scope="module")
def stream_name():
return "ContentNote"
Expand Down Expand Up @@ -75,8 +80,8 @@ def get_stream_state():
return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}


def test_update_for_deleted_record(stream):
headers = stream.authenticator.get_auth_header()
def test_update_for_deleted_record(stream, sf):
headers = _authentication_headers(sf)
stream_state = get_stream_state()
time.sleep(1)
response = create_note(stream, headers)
Expand Down Expand Up @@ -138,8 +143,8 @@ def test_update_for_deleted_record(stream):
assert response.status_code == 404, "Expected an update to a deleted note to return 404"


def test_deleted_record(stream):
headers = stream.authenticator.get_auth_header()
def test_deleted_record(stream, sf):
headers = _authentication_headers(sf)
response = create_note(stream, headers)
assert response.status_code == 201, "Note was note created"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.5.9
dockerImageTag: 2.5.10
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.9"
version = "2.5.10"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, FinalStateCursor
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import FailureType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def _fetch_next_page_for_chunk(
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
headers=dict(request_headers),
params=self.request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, property_chunk=property_chunk
),
Expand Down Expand Up @@ -365,7 +365,6 @@ def _send_http_request(self, method: str, url: str, json: dict = None, headers:
return self._non_retryable_send_http_request(method, url, json, headers, stream)

def _non_retryable_send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
if response.status_code not in [200, 204]:
self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
Expand Down Expand Up @@ -706,7 +705,7 @@ def get_standard_instance(self) -> SalesforceStream:
stream_name=self.stream_name,
schema=self.schema,
sobject_options=self.sobject_options,
authenticator=self.authenticator,
authenticator=self._session.auth,
)
new_cls: Type[SalesforceStream] = RestSalesforceStream
if isinstance(self, BulkIncrementalSalesforceStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def setUp(self) -> None:
self._http_mocker = HttpMocker()
self._http_mocker.__enter__()

given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL)
given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL, _ACCESS_TOKEN)

def tearDown(self) -> None:
self._http_mocker.__exit__(None, None, None)
Expand Down Expand Up @@ -197,7 +197,7 @@ def test_given_job_is_failed_when_read_then_switch_to_standard(self):
JobInfoResponseBuilder().with_id(_JOB_ID).with_state("Failed").build(),
)
self._http_mocker.get(
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]),
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME], _ACCESS_TOKEN),
create_standard_http_response([_A_FIELD_NAME]),
)
self._mock_delete_job(_JOB_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]


def create_http_request(stream_name: str, field_names: List[str]) -> HttpRequest:
return HttpRequest(f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+")
def create_http_request(stream_name: str, field_names: List[str], access_token: Optional[str] = None) -> HttpRequest:
return HttpRequest(
f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+",
headers={"Authorization": f"Bearer {access_token}"} if access_token else None
)


def create_http_response(field_names: List[str], record_count: int = 1) -> HttpResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ def read(
return entrypoint_read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str) -> None:
def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str, access_token: str = "any_access_token") -> None:
http_mocker.post(
HttpRequest(
"https://login.salesforce.com/services/oauth2/token",
query_params=ANY_QUERY_PARAMS,
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
),
HttpResponse(json.dumps({"access_token": "any_access_token", "instance_url": instance_url})),
HttpResponse(json.dumps({"access_token": access_token, "instance_url": instance_url})),
)


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------- |
| 2.5.10 | 2024-05-09 | [38065](https://github.com/airbytehq/airbyte/pull/38065) | Replace deprecated authentication mechanism to up-to-date one |
| 2.5.9 | 2024-05-02 | [37749](https://github.com/airbytehq/airbyte/pull/37749) | Adding mock server tests for bulk streams |
| 2.5.8 | 2024-04-30 | [37340](https://github.com/airbytehq/airbyte/pull/37340) | Source Salesforce: reduce info logs |
| 2.5.7 | 2024-04-24 | [36657](https://github.com/airbytehq/airbyte/pull/36657) | Schema descriptions |
Expand Down

0 comments on commit 676b52e

Please sign in to comment.