Source Marketo: certify GA #17445

Merged: 6 commits, Sep 30, 2022
Changes from 2 commits
@@ -618,7 +618,7 @@
  documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
  icon: marketo.svg
  sourceType: api
  releaseStage: beta
  releaseStage: generally_available
- name: Metabase
  sourceDefinitionId: c7cb421b-942e-4468-99ee-e369bcabaec5
  dockerRepository: airbyte/source-metabase
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
@@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-marketo
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-marketo/setup.py
@@ -11,6 +11,7 @@

TEST_REQUIREMENTS = [
    "pytest~=6.1",
    "pytest-faker==2.0.0",
    "pytest-mock~=3.6.1",
    "requests-mock",
    "source-acceptance-test",
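The `pytest-faker` pin added here supplies the `faker` fixture that the new test-file generator fixture later in this diff relies on. As a rough, illustrative sketch only (this test is not part of the PR), such a fixture can be consumed directly to fabricate CSV fields:

```python
# Minimal sketch, assuming the `faker` fixture is injected by the pytest-faker plugin
# (Faker's own pytest plugin provides an equivalent fixture in newer setups).
def test_fake_csv_row(faker):
    # Fabricate a row shaped like the fake Marketo export rows used in the tests below.
    row = f"{faker.random_int()},{faker.email()},{faker.postcode()}"
    lead_id, email, postcode = row.split(",")
    assert lead_id.isdigit()
    assert "@" in email
```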
@@ -2,10 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import contextlib
import csv
import datetime
import io
import json
import os
from abc import ABC
from time import sleep
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
@@ -215,6 +216,16 @@ def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool:

            sleep(self.poll_interval)

    @contextlib.contextmanager
    def _csv_response_reader(self, response: requests.Response) -> csv.DictReader:
        tmp_file = os.path.realpath(os.path.basename(response.request.url))
        with open(tmp_file, "wb") as data_file:
            for chunk in response.iter_content(chunk_size=1024):
                data_file.write(chunk)
        with open(tmp_file, "r") as data:
            yield csv.DictReader(data)
        os.remove(tmp_file)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        response.text example:
@@ -228,22 +239,21 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        default_prop = {"type": ["null", "string"]}
        schema = self.get_json_schema()["properties"]

        fp = io.StringIO(response.text)
        reader = csv.DictReader(fp)
        for record in reader:
            new_record = {**record}
            attributes = json.loads(new_record.pop("attributes", "{}"))
            for key, value in attributes.items():
                key = clean_string(key)
                new_record[key] = value

            for key, value in new_record.items():
                if key not in schema:
                    self.logger.warning("Field '%s' not found in stream '%s' spec", key, self.name)
                prop = schema.get(key, default_prop)
                value = format_value(value, prop)
                new_record[key] = value
            yield new_record
        with self._csv_response_reader(response) as reader:
            for record in reader:
                new_record = {**record}
                attributes = json.loads(new_record.pop("attributes", "{}"))
                for key, value in attributes.items():
                    key = clean_string(key)
                    new_record[key] = value

                for key, value in new_record.items():
                    if key not in schema:
                        self.logger.warning("Field '%s' not found in stream '%s' spec", key, self.name)
                    prop = schema.get(key, default_prop)
                    value = format_value(value, prop)
                    new_record[key] = value
                yield new_record

    def read_records(
        self,
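For context on the change above: the previous implementation materialized the whole export with `io.StringIO(response.text)`, while the new `_csv_response_reader` context manager streams the response body to a file on disk and hands `parse_response` a lazy `csv.DictReader`. A simplified, self-contained sketch of that pattern, using a named temporary file instead of the connector's URL-derived file name (the helper name and temp-file handling here are illustrative, not the connector's exact code):

```python
import contextlib
import csv
import os
import tempfile

import requests


@contextlib.contextmanager
def csv_response_reader(response: requests.Response):
    # Spill the response body to disk in chunks so the full CSV never sits in memory.
    with tempfile.NamedTemporaryFile("wb", delete=False) as data_file:
        for chunk in response.iter_content(chunk_size=1024):
            data_file.write(chunk)
        tmp_path = data_file.name
    try:
        # Give the caller a streaming DictReader over the on-disk copy.
        with open(tmp_path, "r") as data:
            yield csv.DictReader(data)
    finally:
        # Clean up the temporary file once the caller has finished iterating.
        os.remove(tmp_path)


# Usage: records are yielded one at a time instead of via response.text.
# with csv_response_reader(response) as reader:
#     for record in reader:
#         process(record)
```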
@@ -2,6 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import os.path
import sys
import time
import pendulum
import pytest
from source_marketo.source import Activities, MarketoAuthenticator
@@ -60,3 +63,31 @@ def send_email_stream(config, activity):
    stream_name = f"activities_{activity['name']}"
    cls = type(stream_name, (Activities,), {"activity": activity})
    return cls(config)


@pytest.fixture
def file_generator(faker):
    def _generator(min_size: int):
        print(f"Generating a test file of {min_size // 1024 ** 2} MB, this could take some time")

        def fake_records_gen():
            new_line = "\n"
            for i in range(1000):
                yield f"{str(faker.random_int())},{faker.random_int()},{faker.date_of_birth()},{faker.random_int()}," \
                      f"{faker.random_int()},{faker.email()},{faker.postcode()}{new_line}"

        size, records = 0, 0
        path = os.path.realpath(str(time.time()))
        with open(path, 'w') as output:
            output.write("marketoGUID,leadId,activityDate,activityTypeId,campaignId,primaryAttributeValueId,primaryAttributeValue\n")
            while size < min_size:
                frg = fake_records_gen()
                print("Writing another 1000 records..")
                for person in frg:
                    output.write(person)
                    records += 1
                    size += sys.getsizeof(person)
        print(f"Finished: {records} records written to {path}")
        return path, records

    return _generator
@@ -2,9 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import os
import logging
import tracemalloc
from unittest.mock import ANY, Mock, patch

from functools import partial
import pytest
from airbyte_cdk.models.airbyte_protocol import SyncMode
from source_marketo.source import Activities, Campaigns, MarketoStream, Programs, SourceMarketo
@@ -98,7 +100,7 @@ def test_activities_schema(activity, expected_schema, config):
    "response_text, expected_records",
    (
        (
            """Campaign Run ID,Choice Number,Has Predictive,Step ID,Test Variant,attributes
            b"""Campaign Run ID,Choice Number,Has Predictive,Step ID,Test Variant,attributes
1,3,true,10,15,{"spam": "true"}
2,3,false,11,16,{"spam": "false"}""",
            [
@@ -123,7 +125,49 @@ def test_activities_schema(activity, expected_schema, config):
    ),
)
def test_export_parse_response(send_email_stream, response_text, expected_records):
    assert list(send_email_stream.parse_response(Mock(text=response_text))) == expected_records
    def iter_content(*args, **kwargs):
        yield response_text
    assert list(send_email_stream.parse_response(Mock(iter_content=iter_content, request=Mock(url="/send_email/1")))) == expected_records


def test_memory_usage(send_email_stream, file_generator):
    min_file_size = 5 * (1024 ** 2)  # 5 MB
    big_file_path, records_generated = file_generator(min_size=min_file_size)
    small_file_path, _ = file_generator(min_size=1)

    def iter_content(chunk_size=1024, file_path=""):
        with open(file_path, "rb") as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    tracemalloc.start()
    records = 0

    for _ in send_email_stream.parse_response(
        Mock(iter_content=partial(iter_content, file_path=big_file_path), request=Mock(url="/send_email/1"))
    ):
        records += 1
    _, big_file_peak = tracemalloc.get_traced_memory()
    assert records == records_generated

    tracemalloc.reset_peak()
    tracemalloc.clear_traces()

    for _ in send_email_stream.parse_response(
        Mock(iter_content=partial(iter_content, file_path=small_file_path), request=Mock(url="/send_email/1"))
    ):
        pass
    _, small_file_peak = tracemalloc.get_traced_memory()

    os.remove(big_file_path)
    os.remove(small_file_path)
    # First we run parse_response() on a large file and track how much memory was consumed.
    # Then we do the same with a tiny file. The goal is not to load the whole file into memory when parsing the response,
    # so we assert the memory consumed was almost the same for two runs. Allowed delta is 50 KB which is 1% of a big file size.
    assert abs(big_file_peak - small_file_peak) < 50 * 1024
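The memory test above relies on `tracemalloc` peak tracking: it measures the peak allocation while parsing a roughly 5 MB export, resets the peak (`tracemalloc.reset_peak()` requires Python 3.9+), repeats the run on a tiny file, and asserts the two peaks stay within 50 KB of each other. A stripped-down sketch of that measurement pattern with a placeholder workload (illustrative only, not the connector's test):

```python
import tracemalloc


def peak_allocation(workload) -> int:
    """Return the peak traced heap usage, in bytes, observed while running `workload`."""
    tracemalloc.start()
    try:
        workload()
        _, peak = tracemalloc.get_traced_memory()
        return peak
    finally:
        tracemalloc.stop()


# A generator-style consumer should keep its peak roughly flat regardless of input size.
big_peak = peak_allocation(lambda: sum(1 for _ in range(1_000_000)))
small_peak = peak_allocation(lambda: sum(1 for _ in range(1_000)))
assert abs(big_peak - small_peak) < 50 * 1024
```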


@pytest.mark.parametrize(
23 changes: 12 additions & 11 deletions docs/integrations/sources/marketo.md
@@ -100,14 +100,15 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increase.

## Changelog

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state |
| `0.1.7` | 2022-08-23 | [15817](https://github.com/airbytehq/airbyte/pull/15817) | Improved unit test coverage |
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
| `0.1.10` | 2022-09-30 | [17445](https://github.com/airbytehq/airbyte/pull/17445) | Optimize memory consumption |
| `0.1.9`  | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream state |
| `0.1.7` | 2022-08-23 | [15817](https://github.com/airbytehq/airbyte/pull/15817) | Improved unit test coverage |
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |