
🐛 Source Salesforce: fix response encoding #17314

Merged · 6 commits · Sep 29, 2022
@@ -931,7 +931,7 @@
- name: Salesforce
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
dockerRepository: airbyte/source-salesforce
dockerImageTag: 1.0.18
dockerImageTag: 1.0.19
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
icon: salesforce.svg
sourceType: api
@@ -9763,7 +9763,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-salesforce:1.0.18"
- dockerImage: "airbyte/source-salesforce:1.0.19"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
connectionSpecification:
@@ -13,5 +13,6 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.18
LABEL io.airbyte.version=1.0.19

LABEL io.airbyte.name=airbyte/source-salesforce
@@ -11,7 +11,6 @@ tests:
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"

discovery:
- config_path: "secrets/config.json"
basic_read:
@@ -48,23 +48,6 @@ def __init__(
self.schema: Mapping[str, Any] = schema # type: ignore[assignment]
self.sobject_options = sobject_options

def decode(self, chunk):
"""
Most Salesforce instances use UTF-8, but some use ISO-8859-1.
By default, we'll decode using UTF-8, and fallback to ISO-8859-1 if it doesn't work.
See implementation considerations for more details https://developer.salesforce.com/docs/atlas.en-us.api.meta/api/implementation_considerations.htm
"""
if self.encoding == DEFAULT_ENCODING:
try:
decoded = chunk.decode(self.encoding)
return decoded
except UnicodeDecodeError as e:
self.encoding = "ISO-8859-1"
self.logger.info(f"Could not decode chunk. Falling back to {self.encoding} encoding. Error: {e}")
return self.decode(chunk)
else:
return chunk.decode(self.encoding)

@property
def name(self) -> str:
return self.stream_name
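The removed `decode` helper decoded each downloaded chunk independently and switched the whole stream to ISO-8859-1 on the first `UnicodeDecodeError`. Because a UTF-8 multibyte character can be split across chunk boundaries, that fallback could fire on perfectly valid UTF-8 data and silently produce mojibake (the deleted unit test below even asserted that the lone byte in `b"0\xe5"` decodes as Latin-1). A minimal standalone sketch of the failure mode, not connector code:

```python
# Illustration only: why decoding chunk-by-chunk with a Latin-1 fallback can
# corrupt valid UTF-8 when a multibyte character is split across a chunk boundary.
payload = "0å".encode("utf-8")        # b"0\xc3\xa5"
chunks = [payload[:2], payload[2:]]   # b"0\xc3" and b"\xa5" - split mid-character

decoded = []
for chunk in chunks:
    try:
        decoded.append(chunk.decode("utf-8"))
    except UnicodeDecodeError:
        # the old fallback fired here and stayed on ISO-8859-1 from then on
        decoded.append(chunk.decode("ISO-8859-1"))

print("".join(decoded))  # '0Ã¥' instead of '0å'
```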
@@ -206,6 +189,11 @@ def create_stream_job(self, query: str, url: str) -> Optional[str]:
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, error message: '{error_message}'"
)
elif error.response.status_code == codes.FORBIDDEN and error_code == "REQUEST_LIMIT_EXCEEDED":
self.logger.error(
f"Cannot receive data for stream '{self.name}' ,"
f"sobject options: {self.sobject_options}, Error message: '{error_data.get('message')}'"
)
elif error.response.status_code == codes.BAD_REQUEST and error_message.endswith("does not support query"):
self.logger.error(
f"The stream '{self.name}' is not queryable, "
@@ -281,34 +269,34 @@ def filter_null_bytes(self, b: bytes):
self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res))
return res

def download_data(self, url: str, chunk_size: float = 1024) -> os.PathLike:
def download_data(self, url: str, chunk_size: int = 1024) -> tuple[str, str]:
"""
Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitaions.
Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations.
@ url: string - the url of the `executed_job`
@ chunk_size: float - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes

Returns the string with file path of downloaded binary data. Saved temporarily.
@ chunk_size: int - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes
Return the tuple containing string with file path of downloaded binary data (Saved temporarily) and file encoding.
"""
# set filepath for binary data from response
tmp_file = os.path.realpath(os.path.basename(url))
with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response:
with open(tmp_file, "wb") as data_file:
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response, open(tmp_file, "wb") as data_file:
response_encoding = response.encoding or response.apparent_encoding or self.encoding
for chunk in response.iter_content(chunk_size=chunk_size):
data_file.write(self.filter_null_bytes(chunk))
# check the file exists
if os.path.isfile(tmp_file):
return tmp_file
return tmp_file, response_encoding
else:
raise TmpFileIOError(f"The IO/Error occured while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.")

def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
def read_with_chunks(self, path: str, file_encoding: str, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
"""
Reads the downloaded binary data, using lines chunks, set by `chunk_size`.
@ path: string - the path to the downloaded temporarily binary data.
@ file_encoding: string - encoding for binary data file according to Standard Encodings from codecs module
@ chunk_size: int - the number of lines to read at a time, default: 100 lines / time.
"""
try:
with open(path, "r", encoding=self.encoding) as data:
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix")
for chunk in chunks:
chunk = chunk.replace({nan: None}).to_dict(orient="records")
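With the per-chunk fallback gone, the encoding is resolved once from the HTTP response itself: `response.encoding` comes from the `Content-Type` charset when Salesforce sends one, `apparent_encoding` is requests' detection over the body, and the stream default is the last resort. The bytes are written to disk untouched and decoded a single time when the temporary file is read back. A rough standalone sketch of that flow (the URL and file name are placeholders, not connector values):

```python
import requests

# Placeholder URL and file name - illustrates the detection order used above.
url = "https://example.com/services/data/v52.0/jobs/query/JOB_ID/results"
resp = requests.get(url, stream=True)

# Header charset first, then detected encoding, then a default.
encoding = resp.encoding or resp.apparent_encoding or "utf-8"

with open("result.csv", "wb") as fh:
    for chunk in resp.iter_content(chunk_size=1024):
        fh.write(chunk)

# Decode only once, when reading the downloaded file back.
with open("result.csv", "r", encoding=encoding) as fh:
    print(fh.readline())
```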
@@ -382,7 +370,7 @@ def read_records(

count = 0
record: Mapping[str, Any] = {}
for record in self.read_with_chunks(self.download_data(url=job_full_url)):
for record in self.read_with_chunks(*self.download_data(url=job_full_url)):
count += 1
yield record
self.delete_job(url=job_full_url)
@@ -12,7 +12,7 @@
import pytest
import requests_mock
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode, Type
from conftest import generate_stream
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import HTTPError
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import (
@@ -184,14 +189,29 @@ def test_download_data_filter_null_bytes(stream_config, stream_api):

with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", content=b"\x00")
res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url)))
assert res == []

m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url)))
assert res == [{"Id": "0014W000027f6UwQAI", "IsDeleted": False}]


@pytest.mark.parametrize(
"chunk_size, content_type, content, expected_result",
encoding_symbols_parameters(),
ids=[f"charset: {x[1]}, chunk_size: {x[0]}" for x in encoding_symbols_parameters()],
)
def test_encoding_symbols(stream_config, stream_api, chunk_size, content_type, content, expected_result):
job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)

with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", headers={"Content-Type": f"text/html; charset={content_type}"}, content=content)
res = list(stream.read_with_chunks(*stream.download_data(url=job_full_url, chunk_size=chunk_size)))
assert res == expected_result


@pytest.mark.parametrize(
"login_status_code, login_json_resp, discovery_status_code, discovery_resp_json, expected_error_msg",
(
@@ -415,7 +430,7 @@ def test_csv_reader_dialect_unix():

with requests_mock.Mocker() as m:
m.register_uri("GET", url + "/results", text=text)
result = [i for i in stream.read_with_chunks(stream.download_data(url))]
result = [i for i in stream.read_with_chunks(*stream.download_data(url))]
assert result == data


@@ -513,10 +528,3 @@ def test_convert_to_standard_instance(stream_config, stream_api):
bulk_stream = generate_stream("Account", stream_config, stream_api)
rest_stream = bulk_stream.get_standard_instance()
assert isinstance(rest_stream, IncrementalSalesforceStream)


def test_decoding(stream_config, stream_api):
stream_name = "AcceptedEventRelation"
stream = generate_stream(stream_name, stream_config, stream_api)
assert stream.decode(b"\xe9\x97\xb4\xe5\x8d\x95\xe7\x9a\x84\xe8\xaf\xb4 \xf0\x9f\xaa\x90") == "间单的说 🪐"
assert stream.decode(b"0\xe5") == "0å"
@@ -101,3 +101,15 @@ def stream_api_v2(stream_config):

def generate_stream(stream_name, stream_config, stream_api):
return SourceSalesforce.generate_streams(stream_config, {stream_name: None}, stream_api)[0]


def encoding_symbols_parameters():
return [(x, "ISO-8859-1", b'"\xc4"\n,"4"\n\x00,"\xca \xfc"', [{"Ä": "4"}, {"Ä": "Ê ü"}]) for x in range(1, 11)] + [
(
x,
"utf-8",
b'"\xd5\x80"\n "\xd5\xaf","\xd5\xaf"\n\x00,"\xe3\x82\x82 \xe3\x83\xa4 \xe3\x83\xa4 \xf0\x9d\x9c\xb5"',
[{"Հ": "կ"}, {"Հ": "も ヤ ヤ 𝜵"}],
)
for x in range(1, 11)
]
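For reference, stripping the null byte out of the first ISO-8859-1 fixture (as `filter_null_bytes` does) and decoding it shows the CSV body the bulk stream actually parses; a quick, illustration-only check:

```python
# Illustration only: the ISO-8859-1 fixture above, after the \x00 is filtered
# out and the bytes are decoded with the detected charset.
raw = b'"\xc4"\n,"4"\n\x00,"\xca \xfc"'
print(raw.replace(b"\x00", b"").decode("ISO-8859-1"))
# "Ä"
# ,"4"
# ,"Ê ü"
```

The leading empty field on each data row appears to be picked up as the index by `pd.read_csv`, which is why the expected records keep only the `Ä` column.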
@@ -16,8 +16,8 @@
(
(1000, 0.4, 1),
(10000, 1, 2),
(100000, 4, 7),
(200000, 7, 12),
(100000, 4, 9),
(200000, 7, 19),
),
ids=[
"1k recods",
@@ -36,7 +36,7 @@ def test_memory_download_data(stream_config, stream_api, n_records, first_size,
with requests_mock.Mocker() as m:
m.register_uri("GET", f"{job_full_url}/results", content=content)
tracemalloc.start()
for x in stream.read_with_chunks(stream.download_data(url=job_full_url)):
for x in stream.read_with_chunks(*stream.download_data(url=job_full_url)):
pass
fs, fp = tracemalloc.get_traced_memory()
first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2