Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check existence of export file with request before resuming sync #62

Merged
merged 3 commits into from
Mar 4, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions tap_marketo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ def request(self, method, url, endpoint_name=None, **kwargs):

return data
else:
if resp.status_code != 200:
# NB: 206 Partial Content returned when checking for file existence
if resp.status_code not in [200, 206]:
raise ApiException("Marketo API returned error: {0.status_code}: {0.content}".format(resp))

return resp
Expand Down Expand Up @@ -238,18 +239,35 @@ def cancel_export(self, stream_type, export_id):
endpoint_name = "{}_cancel".format(stream_type)
self.request("POST", endpoint, endpoint_name=endpoint_name)

def get_existing_export_ids(self, stream_type):
def get_existing_exports(self, stream_type):
endpoint = "bulk/v1/{}/export.json".format(stream_type)
result = self.request(
"GET", endpoint,
params={"status": ["Created", "Queued", "Processing", "Completed"]})
if "result" in result:
return {r["exportId"] for r in result["result"]}
return {r["exportId"]: r for r in result["result"]}
else:
return set()
dmosorast marked this conversation as resolved.
Show resolved Hide resolved

def export_file_exists(self, stream_type, export_id, existing_exports):
if existing_exports.get(export_id, {}).get("status") != "Completed":
# If the export is not finished, return existence and continue polling
return True

# Request 0 bytes to see if the file can be found
endpoint = self.get_bulk_endpoint(stream_type, "file", export_id)
endpoint_name = "{}_stream".format(stream_type)
try:
result = self.request("GET", endpoint, endpoint_name=endpoint_name, stream=True, headers={"Range": "bytes=0-0"})
return True
except requests.exceptions.HTTPError as ex:
if ex.response.status_code == 404:
return False
raise

def export_available(self, stream_type, export_id):
return export_id in self.get_existing_export_ids(stream_type)
existing_exports = self.get_existing_exports(stream_type)
return export_id in existing_exports and self.export_file_exists(stream_type, export_id, existing_exports)
dmosorast marked this conversation as resolved.
Show resolved Hide resolved

def get_export_status(self, stream_type, export_id):
endpoint = self.get_bulk_endpoint(stream_type, "status", export_id)
Expand Down