diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py
index eabae6d5..bbd500c1 100644
--- a/tap_salesforce/__init__.py
+++ b/tap_salesforce/__init__.py
@@ -1,16 +1,13 @@
 #!/usr/bin/env python3
 import json
 import sys
-import time
 import singer
 import singer.metrics as metrics
 import singer.utils as singer_utils
-from singer import (metadata,
-                    transform,
-                    UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
-                    Transformer, _transform_datetime)
+from singer import metadata
 import tap_salesforce.salesforce
+from tap_salesforce.sync import (sync_stream, resume_syncing_bulk_query, get_stream_version)
 from tap_salesforce.salesforce import Salesforce
 from tap_salesforce.salesforce.exceptions import (
     TapSalesforceException, TapSalesforceQuotaExceededException)
@@ -31,9 +28,6 @@
     'start_date': None
 }
 
-BLACKLISTED_FIELDS = set(['attributes'])
-
-
 def get_replication_key(sobject_name, fields):
     fields_list = [f['name'] for f in fields]
 
@@ -47,11 +41,9 @@ def get_replication_key(sobject_name, fields):
         return 'LoginTime'
     return None
 
-
 def stream_is_selected(mdata):
     return mdata.get((), {}).get('selected', False)
 
-
 def build_state(raw_state, catalog):
     state = {}
 
@@ -63,6 +55,15 @@ def build_state(raw_state, catalog):
                                       tap_stream_id,
                                       'version')
 
+        # Preserve state that deals with resuming an incomplete bulk job
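+        # 'JobID' identifies the still-running bulk job, 'BatchIDs' lists the
+        # batches that have not been fully synced yet, and 'JobHighestBookmarkSeen'
+        # is the highest replication-key value seen before the run stopped.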
- if data == "" and "null" in schema['type']: - result = None - - return result - - -def get_stream_version(catalog_entry, state): - tap_stream_id = catalog_entry['tap_stream_id'] - replication_key = catalog_entry.get('replication_key') - - stream_version = (singer.get_bookmark(state, tap_stream_id, 'version') or - int(time.time() * 1000)) - - if replication_key: - return stream_version - - return int(time.time() * 1000) - - -def do_sync(sf, catalog, state, start_time): - for catalog_entry in catalog['streams']: - mdata = metadata.to_map(catalog_entry['metadata']) - is_selected = stream_is_selected(mdata) - - if not is_selected: - continue + if starting_stream: + LOGGER.info("Resuming sync from %s", starting_stream) + else: + LOGGER.info("Starting sync") + for catalog_entry in catalog["streams"]: + stream_version = get_stream_version(catalog_entry, state) stream = catalog_entry['stream'] - schema = catalog_entry['schema'] stream_alias = catalog_entry.get('stream_alias') - - replication_key = catalog_entry.get('replication_key') - - bookmark_is_empty = state.get( - 'bookmarks', {}).get( - catalog_entry['tap_stream_id']) is None - stream_version = get_stream_version(catalog_entry, state) + stream_name = catalog_entry["tap_stream_id"] activate_version_message = singer.ActivateVersionMessage( stream=(stream_alias or stream), version=stream_version) + replication_key = catalog_entry.get('replication_key') - LOGGER.info('Syncing Salesforce data for stream %s', stream) + mdata = metadata.to_map(catalog_entry['metadata']) + if not stream_is_selected(mdata): + LOGGER.info("%s: Skipping - not selected", stream_name) + continue + + if starting_stream: + if starting_stream == stream_name: + LOGGER.info("%s: Resuming", stream_name) + starting_stream = None + else: + LOGGER.info("%s: Skipping - already synced", stream_name) + continue + else: + LOGGER.info("%s: Starting", stream_name) + + state["current_stream"] = stream_name + singer.write_state(state) singer.write_schema( stream, - schema, + catalog_entry['schema'], catalog_entry['key_properties'], replication_key, stream_alias) - # Tables with a replication_key or an empty bookmark will emit an - # activate_version at the beginning of their sync - if replication_key or bookmark_is_empty: - singer.write_message(activate_version_message) - state = singer.write_bookmark(state, - catalog_entry['tap_stream_id'], - 'version', - stream_version) - - chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry)) - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - with metrics.job_timer('sync_table') as timer: - timer.tags['stream'] = stream - - with metrics.record_counter(stream) as counter: - try: - time_extracted = singer_utils.now() - - for rec in sf.query(catalog_entry, state): - counter.increment() - rec = transformer.transform(rec, schema) - rec = fix_record_anytype(rec, schema) - singer.write_message( - singer.RecordMessage( - stream=( - stream_alias or stream), - record=rec, - version=stream_version, - time_extracted=time_extracted)) - - replication_key_value = replication_key and singer_utils.strptime_with_tz( - rec[replication_key]) - - if sf.pk_chunking: - if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark: - chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) - # Before writing a bookmark, make sure Salesforce has not given us a - # record with one outside our range - elif replication_key_value and replication_key_value <= 
+        state["current_stream"] = stream_name
+        singer.write_state(state)
 
         singer.write_schema(
             stream,
-            schema,
+            catalog_entry['schema'],
             catalog_entry['key_properties'],
             replication_key,
             stream_alias)
 
-        # Tables with a replication_key or an empty bookmark will emit an
-        # activate_version at the beginning of their sync
-        if replication_key or bookmark_is_empty:
-            singer.write_message(activate_version_message)
-            state = singer.write_bookmark(state,
-                                          catalog_entry['tap_stream_id'],
-                                          'version',
-                                          stream_version)
-
-        chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry))
-        with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
-            with metrics.job_timer('sync_table') as timer:
-                timer.tags['stream'] = stream
-
-                with metrics.record_counter(stream) as counter:
-                    try:
-                        time_extracted = singer_utils.now()
-
-                        for rec in sf.query(catalog_entry, state):
-                            counter.increment()
-                            rec = transformer.transform(rec, schema)
-                            rec = fix_record_anytype(rec, schema)
-                            singer.write_message(
-                                singer.RecordMessage(
-                                    stream=(
-                                        stream_alias or stream),
-                                    record=rec,
-                                    version=stream_version,
-                                    time_extracted=time_extracted))
-
-                            replication_key_value = replication_key and singer_utils.strptime_with_tz(
-                                rec[replication_key])
-
-                            if sf.pk_chunking:
-                                if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark:
-                                    chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key])
-                            # Before writing a bookmark, make sure Salesforce has not given us a
-                            # record with one outside our range
-                            elif replication_key_value and replication_key_value <= start_time:
-                                state = singer.write_bookmark(
-                                    state,
-                                    catalog_entry['tap_stream_id'],
-                                    replication_key,
-                                    rec[replication_key])
-                                singer.write_state(state)
-
-                        # Tables with no replication_key will send an
-                        # activate_version message for the next sync
-                        if not replication_key:
-                            singer.write_message(activate_version_message)
-                            state = singer.write_bookmark(
-                                state, catalog_entry['tap_stream_id'], 'version', None)
-
-                        # If pk_chunking is set, only write a bookmark at the end
-                        if sf.pk_chunking:
-                            # Write a bookmark with the highest value we've seen
-                            state = singer.write_bookmark(
-                                state,
-                                catalog_entry['tap_stream_id'],
-                                replication_key,
-                                singer_utils.strptime(chunked_bookmark))
-
-                        singer.write_state(state)
-
-                    except TapSalesforceException as ex:
-                        raise type(ex)("Error syncing {}: {}".format(
-                            stream, ex))
-                    except Exception as ex:
-                        raise Exception(
-                            "Unexpected error syncing {}: {}".format(
-                                stream, ex)) from ex
-
-
-def fix_record_anytype(rec, schema):
-    """Modifies a record when the schema has no 'type' element due to a SF type of 'anyType.'
-    Attempts to set the record's value for that element to an int, float, or string."""
-    def try_cast(val, coercion):
-        try:
-            return coercion(val)
-        except BaseException:
-            return val
-
-    for k, v in rec.items():
-        if schema['properties'][k].get("type") is None:
-            val = v
-            val = try_cast(v, int)
-            val = try_cast(v, float)
-            if v in ["true", "false"]:
-                val = (v == "true")
-
-            if v == "":
-                val = None
-
-            rec[k] = val
-
-    return rec
+        job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID')
+        if job_id:
+            with metrics.record_counter(stream) as counter:
+                # Resuming a sync should clear out the remaining state once finished
+                counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter)
+                LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
+                state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None)
+                state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None)
+                state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestBookmarkSeen', None)
+        else:
+            # Tables with a replication_key or an empty bookmark will emit an
+            # activate_version at the beginning of their sync
+            bookmark_is_empty = state.get('bookmarks', {}).get(
+                catalog_entry['tap_stream_id']) is None
+
+            if replication_key or bookmark_is_empty:
+                singer.write_message(activate_version_message)
+                state = singer.write_bookmark(state,
+                                              catalog_entry['tap_stream_id'],
+                                              'version',
+                                              stream_version)
+            counter = sync_stream(sf, catalog_entry, state)
+            LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
+
+    state["current_stream"] = None
+    singer.write_state(state)
+    LOGGER.info("Finished sync")
 
 def main_impl():
     args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS)
@@ -435,10 +346,9 @@ def main_impl():
         if args.discover:
             do_discover(sf)
         elif args.properties:
-            start_time = singer_utils.now()
             catalog = args.properties
             state = build_state(args.state, catalog)
-            do_sync(sf, catalog, state, start_time)
+            do_sync(sf, catalog, state)
     finally:
         if sf:
             if sf.rest_requests_attempted > 0:
diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py
index 7213e209..8b0e4e9e 100644
--- a/tap_salesforce/salesforce/bulk.py
+++ b/tap_salesforce/salesforce/bulk.py
@@ -83,13 +83,21 @@ def _bulk_query(self, catalog_entry, state):
                 # Set pk_chunking to True to indicate that we should write a bookmark differently
                 self.sf.pk_chunking = True
+
+                # Add the bulk Job ID and its batches to the state so it can be resumed if necessary
+                tap_stream_id = catalog_entry['tap_stream_id']
+                state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
+                state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'])
+
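+                # Each batch is removed from 'BatchIDs' below once its records
+                # have been emitted, so a resumed run replays only unfinished batches.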
                 for completed_batch_id in batch_status['completed']:
-                    for result in self._get_batch_results(job_id, completed_batch_id, catalog_entry):
+                    for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
                         yield result
+                    # Remove the completed batch ID
+                    state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id)
 
             raise TapSalesforceException(batch_status['stateMessage'])
         else:
-            for result in self._get_batch_results(job_id, batch_id, catalog_entry):
+            for result in self.get_batch_results(job_id, batch_id, catalog_entry):
                 yield result
 
     def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
@@ -203,7 +211,7 @@ def _get_batch(self, job_id, batch_id):
 
         return batch['batchInfo']
 
-    def _get_batch_results(self, job_id, batch_id, catalog_entry):
+    def get_batch_results(self, job_id, batch_id, catalog_entry):
         """Given a job_id and batch_id, queries the batches results and reads
         CSV lines yielding each line as a record."""
         headers = self._get_bulk_headers()
diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py
new file mode 100644
index 00000000..b4b46ea5
--- /dev/null
+++ b/tap_salesforce/sync.py
@@ -0,0 +1,185 @@
+import time
+import singer
+import singer.metrics as metrics
+import singer.utils as singer_utils
+from singer import Transformer
+from tap_salesforce.salesforce.bulk import Bulk
+from tap_salesforce.salesforce.exceptions import TapSalesforceException
+
+LOGGER = singer.get_logger()
+
+BLACKLISTED_FIELDS = set(['attributes'])
+
+def remove_blacklisted_fields(data):
+    return {k: v for k, v in data.items() if k not in BLACKLISTED_FIELDS}
+
+# pylint: disable=unused-argument
+def transform_bulk_data_hook(data, typ, schema):
+    result = data
+    if isinstance(data, dict):
+        result = remove_blacklisted_fields(data)
+
+    # Salesforce Bulk API returns CSVs with empty strings for text fields.
+    # When the text field is nillable and the data value is an empty string,
+    # change the data so that it is None.
+    if data == "" and "null" in schema['type']:
+        result = None
+
+    return result
+
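+# Streams with a replication key reuse the version stored in their bookmark;
+# full-table streams (no replication key) get a fresh version on every run.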
+ if data == "" and "null" in schema['type']: + result = None + + return result + +def get_stream_version(catalog_entry, state): + tap_stream_id = catalog_entry['tap_stream_id'] + replication_key = catalog_entry.get('replication_key') + + stream_version = (singer.get_bookmark(state, tap_stream_id, 'version') or + int(time.time() * 1000)) + if replication_key: + return stream_version + return int(time.time() * 1000) + +def resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter): + bulk = Bulk(sf) + current_bookmark = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobHighestBookmarkSeen') or sf.get_start_date(state, catalog_entry) + current_bookmark = singer_utils.strptime_with_tz(current_bookmark) + batch_ids = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'BatchIDs') + + start_time = singer_utils.now() + stream = catalog_entry['stream'] + stream_alias = catalog_entry.get('stream_alias') + replication_key = catalog_entry.get('replication_key') + stream_version = get_stream_version(catalog_entry, state) + schema = catalog_entry['schema'] + + # Iterate over the remaining batches, removing them once they are synced + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + for batch_id in batch_ids[:]: + for rec in bulk.get_batch_results(job_id, batch_id, catalog_entry): + counter.increment() + rec = transformer.transform(rec, schema) + rec = fix_record_anytype(rec, schema) + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=rec, + version=stream_version, + time_extracted=start_time)) + + # Update bookmark if necessary + replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) + if replication_key_value and replication_key_value <= start_time and replication_key_value > current_bookmark: + current_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) + + state = singer.write_bookmark(state, + catalog_entry['tap_stream_id'], + 'JobHighestBookmarkSeen', + singer_utils.strftime(current_bookmark)) + batch_ids.remove(batch_id) + singer.write_state(state) + + return counter + +def sync_stream(sf, catalog_entry, state): + stream = catalog_entry['stream'] + + with metrics.record_counter(stream) as counter: + try: + sync_records(sf, catalog_entry, state, counter) + singer.write_state(state) + except TapSalesforceException as ex: + raise type(ex)("Error syncing {}: {}".format( + stream, ex)) + except Exception as ex: + raise Exception( + "Unexpected error syncing {}: {}".format( + stream, ex)) from ex + + return counter + +def sync_records(sf, catalog_entry, state, counter): + chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry)) + stream = catalog_entry['stream'] + schema = catalog_entry['schema'] + stream_alias = catalog_entry.get('stream_alias') + replication_key = catalog_entry.get('replication_key') + stream_version = get_stream_version(catalog_entry, state) + activate_version_message = singer.ActivateVersionMessage(stream=(stream_alias or stream), + version=stream_version) + + start_time = singer_utils.now() + + LOGGER.info('Syncing Salesforce data for stream %s', stream) + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + for rec in sf.query(catalog_entry, state): + counter.increment() + rec = transformer.transform(rec, schema) + rec = fix_record_anytype(rec, schema) + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=rec, + version=stream_version, + 
+    with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
+        for batch_id in batch_ids[:]:
+            for rec in bulk.get_batch_results(job_id, batch_id, catalog_entry):
+                counter.increment()
+                rec = transformer.transform(rec, schema)
+                rec = fix_record_anytype(rec, schema)
+                singer.write_message(
+                    singer.RecordMessage(
+                        stream=(
+                            stream_alias or stream),
+                        record=rec,
+                        version=stream_version,
+                        time_extracted=start_time))
+
+                # Update bookmark if necessary
+                replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key])
+                if replication_key_value and replication_key_value <= start_time and replication_key_value > current_bookmark:
+                    current_bookmark = singer_utils.strptime_with_tz(rec[replication_key])
+
+            state = singer.write_bookmark(state,
+                                          catalog_entry['tap_stream_id'],
+                                          'JobHighestBookmarkSeen',
+                                          singer_utils.strftime(current_bookmark))
+            batch_ids.remove(batch_id)
+            singer.write_state(state)
+
+    return counter
+
+def sync_stream(sf, catalog_entry, state):
+    stream = catalog_entry['stream']
+
+    with metrics.record_counter(stream) as counter:
+        try:
+            sync_records(sf, catalog_entry, state, counter)
+            singer.write_state(state)
+        except TapSalesforceException as ex:
+            raise type(ex)("Error syncing {}: {}".format(
+                stream, ex))
+        except Exception as ex:
+            raise Exception(
+                "Unexpected error syncing {}: {}".format(
+                    stream, ex)) from ex
+
+    return counter
+
+def sync_records(sf, catalog_entry, state, counter):
+    chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry))
+    stream = catalog_entry['stream']
+    schema = catalog_entry['schema']
+    stream_alias = catalog_entry.get('stream_alias')
+    replication_key = catalog_entry.get('replication_key')
+    stream_version = get_stream_version(catalog_entry, state)
+    activate_version_message = singer.ActivateVersionMessage(stream=(stream_alias or stream),
+                                                             version=stream_version)
+
+    start_time = singer_utils.now()
+
+    LOGGER.info('Syncing Salesforce data for stream %s', stream)
+    with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
+        for rec in sf.query(catalog_entry, state):
+            counter.increment()
+            rec = transformer.transform(rec, schema)
+            rec = fix_record_anytype(rec, schema)
+            singer.write_message(
+                singer.RecordMessage(
+                    stream=(
+                        stream_alias or stream),
+                    record=rec,
+                    version=stream_version,
+                    time_extracted=start_time))
+
+            replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key])
+
+            if sf.pk_chunking:
+                if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark:
+                    # Replace the highest seen bookmark and save the state in case we need to resume later
+                    chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key])
+                    state = singer.write_bookmark(
+                        state,
+                        catalog_entry['tap_stream_id'],
+                        'JobHighestBookmarkSeen',
+                        singer_utils.strftime(chunked_bookmark))
+                    singer.write_state(state)
+            # Before writing a bookmark, make sure Salesforce has not given us a
+            # record with one outside our range
+            elif replication_key_value and replication_key_value <= start_time:
+                state = singer.write_bookmark(
+                    state,
+                    catalog_entry['tap_stream_id'],
+                    replication_key,
+                    rec[replication_key])
+                singer.write_state(state)
+
+        # Tables with no replication_key will send an
+        # activate_version message for the next sync
+        if not replication_key:
+            singer.write_message(activate_version_message)
+            state = singer.write_bookmark(
+                state, catalog_entry['tap_stream_id'], 'version', None)
+
+        # If pk_chunking is set, only write a bookmark at the end
+        if sf.pk_chunking:
+            # Write a bookmark with the highest value we've seen
+            state = singer.write_bookmark(
+                state,
+                catalog_entry['tap_stream_id'],
+                replication_key,
+                singer_utils.strftime(chunked_bookmark))
+
+def fix_record_anytype(rec, schema):
+    """Modifies a record when the schema has no 'type' element due to an SF type of 'anyType'.
+    Attempts to set the record's value for that element to an int, float, or string."""
+    def try_cast(val, coercion):
+        try:
+            return coercion(val)
+        except BaseException:
+            return val
+
+    for k, v in rec.items():
+        if schema['properties'][k].get("type") is None:
+            val = v
+            val = try_cast(v, int)
+            val = try_cast(v, float)
+            if v in ["true", "false"]:
+                val = (v == "true")
+
+            if v == "":
+                val = None
+
+            rec[k] = val
+
+    return rec
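
For reference, a minimal sketch of the state these changes maintain while a
bulk job is mid-flight (the stream name and ID values below are hypothetical):

    {
        "current_stream": "Account",
        "bookmarks": {
            "Account": {
                "version": 1520000000000,
                "JobID": "750xx000000005m",       # hypothetical bulk job ID
                "BatchIDs": ["751xx000000009q"],  # batches still to sync
                "JobHighestBookmarkSeen": "2018-03-01T00:00:00.000000Z"
            }
        }
    }

Once the remaining batches are drained, do_sync pops the three Job* keys, and
current_stream is cleared after every selected stream has finished.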