From 5a0d7deb347832b985136d4169e26075331079f6 Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Wed, 4 Mar 2020 20:28:47 +0000 Subject: [PATCH 1/3] Check existence of export file with request before resuming sync --- tap_marketo/client.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tap_marketo/client.py b/tap_marketo/client.py index 336eb8e..ad7dd84 100644 --- a/tap_marketo/client.py +++ b/tap_marketo/client.py @@ -207,7 +207,8 @@ def request(self, method, url, endpoint_name=None, **kwargs): return data else: - if resp.status_code != 200: + # NB: 206 Partial Content returned when checking for file existence + if resp.status_code not in [200, 206]: raise ApiException("Marketo API returned error: {0.status_code}: {0.content}".format(resp)) return resp @@ -238,18 +239,35 @@ def cancel_export(self, stream_type, export_id): endpoint_name = "{}_cancel".format(stream_type) self.request("POST", endpoint, endpoint_name=endpoint_name) - def get_existing_export_ids(self, stream_type): + def get_existing_exports(self, stream_type): endpoint = "bulk/v1/{}/export.json".format(stream_type) result = self.request( "GET", endpoint, params={"status": ["Created", "Queued", "Processing", "Completed"]}) if "result" in result: - return {r["exportId"] for r in result["result"]} + return {r["exportId"]: r for r in result["result"]} else: return set() + def export_file_exists(self, stream_type, export_id, existing_exports): + if existing_exports.get(export_id, {}).get("status") != "Completed": + # If the export is not finished yet, treat the file as existing so polling continues + return True + + # Request only the first byte (Range: bytes=0-0) to see if the file can be found + endpoint = self.get_bulk_endpoint(stream_type, "file", export_id) + endpoint_name = "{}_stream".format(stream_type) + try: + result = self.request("GET", endpoint, endpoint_name=endpoint_name, stream=True, headers={"Range": "bytes=0-0"}) + return True + except requests.exceptions.HTTPError as ex: + if 
ex.response.status_code == 404: + return False + raise + def export_available(self, stream_type, export_id): - return export_id in self.get_existing_export_ids(stream_type) + existing_exports = self.get_existing_exports(stream_type) + return export_id in existing_exports and self.export_file_exists(stream_type, export_id, existing_exports) def get_export_status(self, stream_type, export_id): endpoint = self.get_bulk_endpoint(stream_type, "status", export_id) From 94915b8f654bc6f065f7299eee8fe00e64a16ba6 Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Wed, 4 Mar 2020 20:46:18 +0000 Subject: [PATCH 2/3] Fix pylint --- tap_marketo/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tap_marketo/client.py b/tap_marketo/client.py index ad7dd84..42f30f0 100644 --- a/tap_marketo/client.py +++ b/tap_marketo/client.py @@ -258,7 +258,8 @@ def export_file_exists(self, stream_type, export_id, existing_exports): endpoint = self.get_bulk_endpoint(stream_type, "file", export_id) endpoint_name = "{}_stream".format(stream_type) try: - result = self.request("GET", endpoint, endpoint_name=endpoint_name, stream=True, headers={"Range": "bytes=0-0"}) + # Range described here: https://developers.marketo.com/rest-api/bulk-extract/#crayon-5e600bb5f1a53663868461 + self.request("GET", endpoint, endpoint_name=endpoint_name, stream=True, headers={"Range": "bytes=0-0"}) return True except requests.exceptions.HTTPError as ex: if ex.response.status_code == 404: From 778b91f5d324399b25879ca0e87a1dd8318edb5b Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Wed, 4 Mar 2020 21:27:22 +0000 Subject: [PATCH 3/3] Code review changes --- tap_marketo/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tap_marketo/client.py b/tap_marketo/client.py index 42f30f0..dd339de 100644 --- a/tap_marketo/client.py +++ b/tap_marketo/client.py @@ -247,7 +247,7 @@ def get_existing_exports(self, stream_type): if "result" in result: return {r["exportId"]: r for r in 
result["result"]} else: - return set() + return dict() def export_file_exists(self, stream_type, export_id, existing_exports): if existing_exports.get(export_id, {}).get("status") != "Completed": @@ -267,8 +267,10 @@ def export_file_exists(self, stream_type, export_id, existing_exports): raise def export_available(self, stream_type, export_id): + # NB: Marketo may return that an export is Completed, but the file doesn't exist, so we need to check both. existing_exports = self.get_existing_exports(stream_type) - return export_id in existing_exports and self.export_file_exists(stream_type, export_id, existing_exports) + export_id_exists = export_id in existing_exports + return export_id_exists and self.export_file_exists(stream_type, export_id, existing_exports) def get_export_status(self, stream_type, export_id): endpoint = self.get_bulk_endpoint(stream_type, "status", export_id)