Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Source Salesforce: Migrating to non-deprecated authenticator #38065

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from datetime import datetime
from pathlib import Path
from typing import Dict

import pendulum
import pytest
Expand Down Expand Up @@ -39,6 +40,10 @@ def sf(input_sandbox_config):
return sf


def _authentication_headers(salesforce: Salesforce) -> Dict[str, str]:
return {"Authorization": f"Bearer {salesforce.access_token}"}


@pytest.fixture(scope="module")
def stream_name():
return "ContentNote"
Expand Down Expand Up @@ -75,8 +80,8 @@ def get_stream_state():
return {"LastModifiedDate": pendulum.now(tz="UTC").add(days=-1).isoformat(timespec="milliseconds")}


def test_update_for_deleted_record(stream):
headers = stream.authenticator.get_auth_header()
def test_update_for_deleted_record(stream, sf):
headers = _authentication_headers(sf)
stream_state = get_stream_state()
time.sleep(1)
response = create_note(stream, headers)
Expand Down Expand Up @@ -138,8 +143,8 @@ def test_update_for_deleted_record(stream):
assert response.status_code == 404, "Expected an update to a deleted note to return 404"


def test_deleted_record(stream):
headers = stream.authenticator.get_auth_header()
def test_deleted_record(stream, sf):
headers = _authentication_headers(sf)
response = create_note(stream, headers)
assert response.status_code == 201, "Note was note created"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.5.9
dockerImageTag: 2.5.10
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.9"
version = "2.5.10"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField, FinalStateCursor
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import FailureType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def _fetch_next_page_for_chunk(
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
headers=dict(request_headers),
params=self.request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, property_chunk=property_chunk
),
Expand Down Expand Up @@ -365,7 +365,6 @@ def _send_http_request(self, method: str, url: str, json: dict = None, headers:
return self._non_retryable_send_http_request(method, url, json, headers, stream)

def _non_retryable_send_http_request(self, method: str, url: str, json: dict = None, headers: dict = None, stream: bool = False):
headers = self.authenticator.get_auth_header() if not headers else headers | self.authenticator.get_auth_header()
response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
if response.status_code not in [200, 204]:
self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
Expand Down Expand Up @@ -706,7 +705,7 @@ def get_standard_instance(self) -> SalesforceStream:
stream_name=self.stream_name,
schema=self.schema,
sobject_options=self.sobject_options,
authenticator=self.authenticator,
authenticator=self._session.auth,
)
new_cls: Type[SalesforceStream] = RestSalesforceStream
if isinstance(self, BulkIncrementalSalesforceStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def setUp(self) -> None:
self._http_mocker = HttpMocker()
self._http_mocker.__enter__()

given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL)
given_authentication(self._http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN, _INSTANCE_URL, _ACCESS_TOKEN)

def tearDown(self) -> None:
self._http_mocker.__exit__(None, None, None)
Expand Down Expand Up @@ -197,7 +197,7 @@ def test_given_job_is_failed_when_read_then_switch_to_standard(self):
JobInfoResponseBuilder().with_id(_JOB_ID).with_state("Failed").build(),
)
self._http_mocker.get(
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME]),
create_standard_http_request(_STREAM_NAME, [_A_FIELD_NAME], _ACCESS_TOKEN),
create_standard_http_response([_A_FIELD_NAME]),
)
self._mock_delete_job(_JOB_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]


def create_http_request(stream_name: str, field_names: List[str]) -> HttpRequest:
return HttpRequest(f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+")
def create_http_request(stream_name: str, field_names: List[str], access_token: Optional[str] = None) -> HttpRequest:
return HttpRequest(
f"{_BASE_URL}/queryAll?q=SELECT+{','.join(field_names)}+FROM+{stream_name}+",
headers={"Authorization": f"Bearer {access_token}"} if access_token else None
)


def create_http_response(field_names: List[str], record_count: int = 1) -> HttpResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ def read(
return entrypoint_read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str) -> None:
def given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str, instance_url: str, access_token: str = "any_access_token") -> None:
http_mocker.post(
HttpRequest(
"https://login.salesforce.com/services/oauth2/token",
query_params=ANY_QUERY_PARAMS,
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
),
HttpResponse(json.dumps({"access_token": "any_access_token", "instance_url": instance_url})),
HttpResponse(json.dumps({"access_token": access_token, "instance_url": instance_url})),
)


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------- |
| 2.5.10 | 2024-05-09 | [38065](https://github.com/airbytehq/airbyte/pull/38065) | Replace deprecated authentication mechanism to up-to-date one |
| 2.5.9 | 2024-05-02 | [37749](https://github.com/airbytehq/airbyte/pull/37749) | Adding mock server tests for bulk streams |
| 2.5.8 | 2024-04-30 | [37340](https://github.com/airbytehq/airbyte/pull/37340) | Source Salesforce: reduce info logs |
| 2.5.7 | 2024-04-24 | [36657](https://github.com/airbytehq/airbyte/pull/36657) | Schema descriptions |
Expand Down
Loading