Add the ability to resume bulk jobs #22

Merged: 8 commits, Dec 19, 2017. Diff below shows changes from 6 commits.
222 changes: 66 additions & 156 deletions tap_salesforce/__init__.py
@@ -1,16 +1,13 @@
#!/usr/bin/env python3
import json
import sys
import time
import singer
import singer.metrics as metrics
import singer.utils as singer_utils
from singer import (metadata,
transform,
UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING,
Transformer, _transform_datetime)
from singer import metadata

import tap_salesforce.salesforce
from tap_salesforce.sync import (sync_stream, resume_syncing_bulk_query, get_stream_version)
from tap_salesforce.salesforce import Salesforce
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException, TapSalesforceQuotaExceededException)
@@ -31,9 +28,6 @@
'start_date': None
}

BLACKLISTED_FIELDS = set(['attributes'])


def get_replication_key(sobject_name, fields):
fields_list = [f['name'] for f in fields]

@@ -47,11 +41,9 @@ def get_replication_key(sobject_name, fields):
return 'LoginTime'
return None


def stream_is_selected(mdata):
return mdata.get((), {}).get('selected', False)


def build_state(raw_state, catalog):
state = {}

@@ -63,6 +55,15 @@ def build_state(raw_state, catalog):
tap_stream_id,
'version')

# Preserve state that deals with resuming an incomplete bulk job
if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'):
job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID')
batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs')
current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestSystemModstamp')
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches)
state = singer.write_bookmark(state, tap_stream_id, 'JobHighestSystemModstamp', current_bookmark)

if replication_method == 'INCREMENTAL':
replication_key = catalog_entry.get('replication_key')
replication_key_value = singer.get_bookmark(raw_state,
@@ -255,164 +256,74 @@ def do_discover(sf):
result = {'streams': entries}
json.dump(result, sys.stdout, indent=4)

def do_sync(sf, catalog, state):
starting_stream = state.get("current_stream")

def remove_blacklisted_fields(data):
return {k: v for k, v in data.items() if k not in BLACKLISTED_FIELDS}

# pylint: disable=unused-argument
def transform_bulk_data_hook(data, typ, schema):
result = data
if isinstance(data, dict):
result = remove_blacklisted_fields(data)

# Salesforce Bulk API returns CSVs with empty strings for text fields.
# When the text field is nillable and the data value is an empty string,
# change the data so that it is None.
if data == "" and "null" in schema['type']:
result = None

return result


def get_stream_version(catalog_entry, state):
tap_stream_id = catalog_entry['tap_stream_id']
replication_key = catalog_entry.get('replication_key')

stream_version = (singer.get_bookmark(state, tap_stream_id, 'version') or
int(time.time() * 1000))

if replication_key:
return stream_version

return int(time.time() * 1000)


def do_sync(sf, catalog, state, start_time):
for catalog_entry in catalog['streams']:
mdata = metadata.to_map(catalog_entry['metadata'])
is_selected = stream_is_selected(mdata)

if not is_selected:
continue
if starting_stream:
LOGGER.info("Resuming sync from %s", starting_stream)
else:
LOGGER.info("Starting sync")

for catalog_entry in catalog["streams"]:
stream_version = get_stream_version(catalog_entry, state)
stream = catalog_entry['stream']
schema = catalog_entry['schema']
stream_alias = catalog_entry.get('stream_alias')

replication_key = catalog_entry.get('replication_key')

bookmark_is_empty = state.get(
'bookmarks', {}).get(
catalog_entry['tap_stream_id']) is None
stream_version = get_stream_version(catalog_entry, state)
stream_name = catalog_entry["tap_stream_id"]
activate_version_message = singer.ActivateVersionMessage(
stream=(stream_alias or stream), version=stream_version)
replication_key = catalog_entry.get('replication_key')

LOGGER.info('Syncing Salesforce data for stream %s', stream)
mdata = metadata.to_map(catalog_entry['metadata'])
if not stream_is_selected(mdata):
LOGGER.info("%s: Skipping - not selected", stream_name)
continue

if starting_stream:
if starting_stream == stream_name:
LOGGER.info("%s: Resuming", stream_name)
starting_stream = None
else:
LOGGER.info("%s: Skipping - already synced", stream_name)
continue
else:
LOGGER.info("%s: Starting", stream_name)

state["current_stream"] = stream_name
singer.write_state(state)
singer.write_schema(
stream,
schema,
catalog_entry['schema'],
catalog_entry['key_properties'],
replication_key,
stream_alias)

# Tables with a replication_key or an empty bookmark will emit an
# activate_version at the beginning of their sync
if replication_key or bookmark_is_empty:
singer.write_message(activate_version_message)
state = singer.write_bookmark(state,
catalog_entry['tap_stream_id'],
'version',
stream_version)

chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry))
with Transformer(pre_hook=transform_bulk_data_hook) as transformer:
with metrics.job_timer('sync_table') as timer:
timer.tags['stream'] = stream

with metrics.record_counter(stream) as counter:
try:
time_extracted = singer_utils.now()

for rec in sf.query(catalog_entry, state):
counter.increment()
rec = transformer.transform(rec, schema)
rec = fix_record_anytype(rec, schema)
singer.write_message(
singer.RecordMessage(
stream=(
stream_alias or stream),
record=rec,
version=stream_version,
time_extracted=time_extracted))

replication_key_value = replication_key and singer_utils.strptime_with_tz(
rec[replication_key])

if sf.pk_chunking:
if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark:
chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key])
# Before writing a bookmark, make sure Salesforce has not given us a
# record with one outside our range
elif replication_key_value and replication_key_value <= start_time:
state = singer.write_bookmark(
state,
catalog_entry['tap_stream_id'],
replication_key,
rec[replication_key])
singer.write_state(state)

# Tables with no replication_key will send an
# activate_version message for the next sync
if not replication_key:
singer.write_message(activate_version_message)
state = singer.write_bookmark(
state, catalog_entry['tap_stream_id'], 'version', None)

# If pk_chunking is set, only write a bookmark at the end
if sf.pk_chunking:
# Write a bookmark with the highest value we've seen
state = singer.write_bookmark(
state,
catalog_entry['tap_stream_id'],
replication_key,
singer_utils.strptime(chunked_bookmark))

singer.write_state(state)

except TapSalesforceException as ex:
raise type(ex)("Error syncing {}: {}".format(
stream, ex))
except Exception as ex:
raise Exception(
"Unexpected error syncing {}: {}".format(
stream, ex)) from ex


def fix_record_anytype(rec, schema):
"""Modifies a record when the schema has no 'type' element due to a SF type of 'anyType.'
Attempts to set the record's value for that element to an int, float, or string."""
def try_cast(val, coercion):
try:
return coercion(val)
except BaseException:
return val

for k, v in rec.items():
if schema['properties'][k].get("type") is None:
val = v
val = try_cast(v, int)
val = try_cast(v, float)
if v in ["true", "false"]:
val = (v == "true")

if v == "":
val = None

rec[k] = val

return rec
job_id = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).get('JobID', None)
Contributor (inline review comment on the line above): Same code as get_bookmark()
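The reviewer is noting that the chained .get() calls reimplement the singer helper already used in build_state above. A minimal sketch of the suggested simplification (assuming get_bookmark treats a missing stream or key as None, as the chained lookup does):

```python
# Equivalent lookup via the singer helper instead of chained .get() calls;
# returns None when the stream or the 'JobID' key is absent from state.
job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID')
```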

if job_id:
with metrics.record_counter(stream) as counter:
# Resuming a sync should clear out the remaining state once finished
counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobHighestSystemModstamp', None)
else:
# Tables with a replication_key or an empty bookmark will emit an
# activate_version at the beginning of their sync
bookmark_is_empty = state.get('bookmarks', {}).get(
catalog_entry['tap_stream_id']) is None

if replication_key or bookmark_is_empty:
singer.write_message(activate_version_message)
state = singer.write_bookmark(state,
catalog_entry['tap_stream_id'],
'version',
stream_version)
counter = sync_stream(sf, catalog_entry, state)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)

state["current_stream"] = None
singer.write_state(state)
LOGGER.info("Finished sync")

def main_impl():
args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS)
@@ -435,10 +346,9 @@ def main_impl():
if args.discover:
do_discover(sf)
elif args.properties:
start_time = singer_utils.now()
catalog = args.properties
state = build_state(args.state, catalog)
do_sync(sf, catalog, state, start_time)
do_sync(sf, catalog, state)
finally:
if sf:
if sf.rest_requests_attempted > 0:
13 changes: 10 additions & 3 deletions tap_salesforce/salesforce/bulk.py
@@ -83,13 +83,20 @@ def _bulk_query(self, catalog_entry, state):

# Set pk_chunking to True to indicate that we should write a bookmark differently
self.sf.pk_chunking = True

# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
state['bookmarks'][catalog_entry['tap_stream_id']]["JobID"] = job_id
state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"] = batch_status['completed']

for completed_batch_id in batch_status['completed']:
for result in self._get_batch_results(job_id, completed_batch_id, catalog_entry):
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
yield result
# Remove the completed batch ID
state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id)

raise TapSalesforceException(batch_status['stateMessage'])
else:
for result in self._get_batch_results(job_id, batch_id, catalog_entry):
for result in self.get_batch_results(job_id, batch_id, catalog_entry):
yield result

def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
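To make the resume bookkeeping concrete: once _bulk_query has stored the job ID and its completed batch IDs, the stream's bookmark carries everything build_state preserves and do_sync needs to hand off to resume_syncing_bulk_query. An illustrative snapshot of such a state entry, written as a Python literal (the stream name, IDs, and timestamp here are hypothetical):

```python
# Hypothetical state for an interrupted pk-chunked bulk sync of Account.
# 'BatchIDs' shrinks as each completed batch is consumed; when the resume
# finishes, do_sync pops all three resume keys from the bookmark.
state = {
    'bookmarks': {
        'Account': {
            'version': 1513468800000,          # stream version (epoch millis)
            'JobID': '750x0000000001AAA',      # Salesforce Bulk API job ID
            'BatchIDs': [
                '751x0000000001AAA',           # batches still to be consumed
                '751x0000000002AAA',
            ],
            'JobHighestSystemModstamp': '2017-12-19T00:00:00.000000Z',
        }
    }
}
```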
@@ -203,7 +210,7 @@ def _get_batch(self, job_id, batch_id):

return batch['batchInfo']

def _get_batch_results(self, job_id, batch_id, catalog_entry):
def get_batch_results(self, job_id, batch_id, catalog_entry):
"""Given a job_id and batch_id, queries the batches results and reads
CSV lines yielding each line as a record."""
headers = self._get_bulk_headers()