Remove invalid job ids from state (#57)
* First pass at detecting that a bulk job has been deleted in Salesforce

* Fix copy/paste issues and pylint

* Add comment to explain the state changes as a result of resuming a bulk query

* The response in practice is JSON for this call

* Update resumed job bookmark comment

* Update correct portion of the resumed job comment
dmosorast authored Nov 5, 2018
1 parent f63a805 commit 20ca560
Showing 3 changed files with 33 additions and 2 deletions.
tap_salesforce/__init__.py (12 changes: 10 additions & 2 deletions)
@@ -312,14 +312,22 @@ def do_sync(sf, catalog, state):
                 # Resuming a sync should clear out the remaining state once finished
                 counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter)
                 LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
+                # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred:
+                # 1. The job succeeded, in which case make JobHighestBookmarkSeen the new bookmark
+                # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or
+                #    existing bookmark if no bookmark exists for the Job.
+                # 3. The job completely failed, in which case maintain the existing bookmark, or None if no bookmark
                 state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None)
                 state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None)
-                bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None)
+                bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \
+                                .pop('JobHighestBookmarkSeen', None)
+                existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \
+                                         .pop(replication_key, None)
                 state = singer.write_bookmark(
                     state,
                     catalog_entry['tap_stream_id'],
                     replication_key,
-                    bookmark)
+                    bookmark or existing_bookmark)  # If job is removed, reset to existing bookmark or None
                 singer.write_state(state)
         else:
             # Tables with a replication_key or an empty bookmark will emit an
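For reference, a minimal, self-contained sketch of how the bookmark fallback above resolves the three cases in the comment; the stream name, ids, and replication key (SystemModstamp) are hypothetical:

    # Hypothetical state for case 3: the resumed job failed outright, so no
    # JobHighestBookmarkSeen was ever recorded for it.
    state = {'bookmarks': {'Account': {'JobID': '750x0000000001',
                                       'BatchIDs': [],
                                       'SystemModstamp': '2018-11-01T00:00:00.000Z'}}}
    stream_state = state['bookmarks']['Account']

    stream_state.pop('JobID', None)
    stream_state.pop('BatchIDs', None)
    bookmark = stream_state.pop('JobHighestBookmarkSeen', None)   # None in case 3
    existing_bookmark = stream_state.pop('SystemModstamp', None)  # prior bookmark

    # Cases 1 and 2: bookmark is set and wins. Case 3: fall back to the prior
    # bookmark, or None if the stream was never bookmarked, which restarts it.
    print(bookmark or existing_bookmark)  # -> 2018-11-01T00:00:00.000Z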
tap_salesforce/salesforce/bulk.py (19 changes: 19 additions & 0 deletions)
@@ -6,6 +6,7 @@
 import tempfile
 import singer
 from singer import metrics
+from requests.exceptions import RequestException
 
 import xmltodict
 
@@ -216,6 +217,24 @@ def _poll_on_batch_status(self, job_id, batch_id):
 
         return batch_status
 
+    def job_exists(self, job_id):
+        try:
+            endpoint = "job/{}".format(job_id)
+            url = self.bulk_url.format(self.sf.instance_url, endpoint)
+            headers = self._get_bulk_headers()
+
+            with metrics.http_request_timer("get_job"):
+                self.sf._make_request('GET', url, headers=headers)
+
+            return True  # requests will raise for a 400 InvalidJob
+
+        except RequestException as ex:
+            if ex.response.headers["Content-Type"] == 'application/json':
+                exception_code = ex.response.json()['exceptionCode']
+                if exception_code == 'InvalidJob':
+                    return False
+            raise
+
     def _get_batches(self, job_id):
         endpoint = "job/{}/batch".format(job_id)
         url = self.bulk_url.format(self.sf.instance_url, endpoint)
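A standalone sketch of the probe job_exists performs, written against plain requests rather than the tap's _make_request helper; the instance URL, API version, session id, and job id below are placeholders. Salesforce's Bulk API answers a GET on a deleted or unknown job with an HTTP 400 whose JSON body carries exceptionCode InvalidJob, which is the signal the except branch keys on:

    import requests

    # All values below are placeholders, not real credentials or endpoints.
    url = "https://example.my.salesforce.com/services/async/41.0/job/<job_id>"
    headers = {"X-SFDC-Session": "<session_id>", "Content-Type": "application/json"}

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()          # raises for a deleted/unknown job
        job_exists = True
    except requests.exceptions.RequestException as ex:
        body = ex.response.json() if ex.response is not None else {}
        if body.get("exceptionCode") == "InvalidJob":
            job_exists = False               # job was removed in Salesforce
        else:
            raise                            # any other failure still surfaces

Checking the Content-Type header before parsing, as the committed code does, guards against non-JSON error bodies; the sketch instead tolerates them with a defensive .get() on the parsed body.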
tap_salesforce/sync.py (4 changes: 4 additions & 0 deletions)
@@ -54,6 +54,10 @@ def resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter):
     stream_version = get_stream_version(catalog_entry, state)
     schema = catalog_entry['schema']
 
+    if not bulk.job_exists(job_id):
+        LOGGER.info("Found stored Job ID that no longer exists, resetting bookmark and removing JobID from state.")
+        return counter
+
     # Iterate over the remaining batches, removing them once they are synced
     for batch_id in batch_ids[:]:
         for rec in bulk.get_batch_results(job_id, batch_id, catalog_entry):
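Taken together: when the stored job has been deleted in Salesforce, job_exists returns False, resume_syncing_bulk_query returns its counter untouched, and the cleanup in __init__.py pops the job keys and rewrites the bookmark. A hypothetical before/after of the persisted tap state (stream and values invented for illustration):

    before = {
        "bookmarks": {
            "Account": {
                "JobID": "750x0000000001",       # no longer exists in Salesforce
                "BatchIDs": ["751x000000000A"],
                "SystemModstamp": "2018-11-01T00:00:00.000Z",
            }
        }
    }

    # After do_sync's cleanup (case 3: nothing new was synced), only the
    # existing replication-key bookmark survives:
    after = {
        "bookmarks": {
            "Account": {
                "SystemModstamp": "2018-11-01T00:00:00.000Z",
            }
        }
    }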
