Merge pull request #205 from ckan/github-201-deadlocks
Make locking behaviour more robust
ThrawnCA authored Feb 1, 2024
2 parents 3ae7ae3 + e6687a2 commit 58be9be
Showing 3 changed files with 117 additions and 41 deletions.
30 changes: 25 additions & 5 deletions ckanext/xloader/jobs.py
@@ -10,16 +10,16 @@
import traceback
import sys

from psycopg2 import errors
from six.moves.urllib.parse import urlsplit
import requests
from rq import get_current_job
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config

from . import loader
from . import db
from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata

@@ -28,6 +28,8 @@
except ImportError:
get_user_from_token = None

log = logging.getLogger(__name__)

SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()
@@ -37,6 +39,13 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
errors.ObjectInUse,
)
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')


# input = {
# 'api_key': user['apikey'],
@@ -80,15 +89,26 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
enqueue_job(
xloader_data_into_datastore,
[input],
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None

db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
finally:
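The retry logic above keys off psycopg2's typed error classes (psycopg2 2.8+ exposes them in `psycopg2.errors`): deadlocks and lock conflicts are treated as transient and re-enqueued once, while anything else is recorded as a hard failure. A minimal, standalone sketch of the same shape (the `job`, `job_dict`, and `enqueue_retry` names here are illustrative, not part of the extension):

```python
from psycopg2 import errors

# Transient locking failures worth retrying rather than failing the job.
RETRYABLE_ERRORS = (
    errors.DeadlockDetected,
    errors.LockNotAvailable,
    errors.ObjectInUse,
)


def run_with_one_retry(job, job_dict, enqueue_retry):
    """Run `job`; if it hits a transient lock error, re-enqueue it once."""
    try:
        job()
    except Exception as e:
        if isinstance(e, RETRYABLE_ERRORS) and not job_dict['metadata'].get('tries'):
            # First lock-related failure: schedule a retry instead of erroring.
            job_dict['status'] = 'pending'
            job_dict['metadata']['tries'] = 1
            enqueue_retry(job)
            return
        # A second lock failure, or any other exception, is fatal for this job.
        job_dict['status'] = 'error'
        job_dict['error'] = str(e)
        raise
```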
127 changes: 91 additions & 36 deletions ckanext/xloader/loader.py
@@ -31,6 +31,49 @@
tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES


def _fields_match(fields, existing_fields, logger):
''' Check whether all columns have the same names and types as previously,
independent of ordering.
'''
# drop the generated '_id' field
for index in range(len(existing_fields)):
if existing_fields[index]['id'] == '_id':
existing_fields.pop(index)
break

# fail fast if number of fields doesn't match
field_count = len(fields)
if field_count != len(existing_fields):
logger.info("Fields do not match; there are now %s fields but previously %s", field_count, len(existing_fields))
return False

# ensure each field is present in both collections with the same type
for index in range(field_count):
field_id = fields[index]['id']
for existing_index in range(field_count):
existing_field_id = existing_fields[existing_index]['id']
if field_id == existing_field_id:
if fields[index]['type'] == existing_fields[existing_index]['type']:
break
else:
logger.info("Fields do not match; new type for %s field is %s but existing type is %s",
field_id, fields[index]["type"], existing_fields[existing_index]['type'])
return False
else:
logger.info("Fields do not match; no existing entry found for %s", field_id)
return False
return True


def _clear_datastore_resource(resource_id):
''' Delete all records from the datastore table, without dropping the table itself.
'''
engine = get_write_engine()
with engine.begin() as conn:
conn.execute("SET LOCAL lock_timeout = '5s'")
conn.execute('TRUNCATE TABLE "{}"'.format(resource_id))


def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

@@ -85,34 +128,43 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
existing = datastore_resource_exists(resource_id)
existing_info = {}
if existing:
existing_fields = existing.get('fields', [])
existing_info = dict((f['id'], f['info'])
for f in existing.get('fields', [])
for f in existing_fields
if 'info' in f)

'''
Delete existing datastore table before proceeding. Otherwise
the COPY will append to the existing table. And if
the fields have significantly changed, it may also fail.
'''
logger.info('Deleting "{res_id}" from DataStore.'.format(
res_id=resource_id))
delete_datastore_resource(resource_id)

# Columns types are either set (overridden) in the Data Dictionary page
# or default to text type (which is robust)
fields = [
{'id': header_name,
'type': existing_info.get(header_name, {})
.get('type_override') or 'text',
}
for header_name in headers]
# Column types are either set (overridden) in the Data Dictionary page
# or default to text type (which is robust)
fields = [
{'id': header_name,
'type': existing_info.get(header_name, {})
.get('type_override') or 'text',
}
for header_name in headers]

# Maintain data dictionaries from matching column names
if existing_info:
# Maintain data dictionaries from matching column names
for f in fields:
if f['id'] in existing_info:
f['info'] = existing_info[f['id']]

'''
Delete or truncate existing datastore table before proceeding,
depending on whether any fields have changed.
Otherwise the COPY will append to the existing table.
And if the fields have significantly changed, it may also fail.
'''
if _fields_match(fields, existing_fields, logger):
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
logger.info('Deleting "%s" from DataStore.', resource_id)
delete_datastore_resource(resource_id)
else:
fields = [
{'id': header_name,
'type': 'text'}
for header_name in headers]

logger.info('Fields: %s', fields)

# Create table
@@ -254,9 +306,10 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
existing = datastore_resource_exists(resource_id)
existing_info = None
if existing:
existing_fields = existing.get('fields', [])
existing_info = dict(
(f['id'], f['info'])
for f in existing.get('fields', []) if 'info' in f)
for f in existing_fields if 'info' in f)

# Some headers might have been converted from strings to floats and such.
headers = encode_headers(headers)
@@ -290,16 +343,6 @@ def row_iterator():
yield data_row
result = row_iterator()

'''
Delete existing datstore resource before proceeding. Otherwise
'datastore_create' will append to the existing datastore. And if
the fields have significantly changed, it may also fail.
'''
if existing:
logger.info('Deleting "{res_id}" from datastore.'.format(
res_id=resource_id))
delete_datastore_resource(resource_id)

headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
for field in zip(headers, types)]

@@ -313,8 +356,21 @@ def row_iterator():
if type_override in list(_TYPE_MAPPING.values()):
h['type'] = type_override

logger.info('Determined headers and types: {headers}'.format(
headers=headers_dicts))
logger.info('Determined headers and types: %s', headers_dicts)

'''
Delete or truncate existing datastore table before proceeding,
depending on whether any fields have changed.
Otherwise 'datastore_create' will append to the existing datastore.
And if the fields have significantly changed, it may also fail.
'''
if existing:
if _fields_match(headers_dicts, existing_fields, logger):
logger.info('Clearing records for "%s" from DataStore.', resource_id)
_clear_datastore_resource(resource_id)
else:
logger.info('Deleting "%s" from datastore.', resource_id)
delete_datastore_resource(resource_id)

logger.info('Copying to database...')
count = 0
@@ -323,7 +379,7 @@
non_empty_types = ['timestamp', 'numeric']
for i, records in enumerate(chunky(result, 250)):
count += len(records)
logger.info('Saving chunk {number}'.format(number=i))
logger.info('Saving chunk %s', i)
for row in records:
for column_index, column_name in enumerate(row):
if headers_dicts[column_index]['type'] in non_empty_types and row[column_name] == '':
@@ -332,8 +388,7 @@
logger.info('...copying done')

if count:
logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
n=count, res_id=resource_id))
logger.info('Successfully pushed %s entries to "%s".', count, resource_id)
else:
# no datastore table is created
raise LoaderError('No entries found - nothing to load')
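Taken together, the loader changes mean a re-upload whose columns still have the same names and types (in any order) keeps the existing table and only truncates its rows, while any schema change falls back to the old drop-and-recreate path. A small illustration of the comparison `_fields_match` performs, assuming the extension (and therefore CKAN) is importable; the field lists below are made up:

```python
import logging

from ckanext.xloader.loader import _fields_match

logger = logging.getLogger(__name__)

new_fields = [
    {'id': 'name', 'type': 'text'},
    {'id': 'age', 'type': 'numeric'},
]

# Same columns in a different order, plus the generated '_id' column,
# which _fields_match drops before comparing.
existing_fields = [
    {'id': '_id', 'type': 'int'},
    {'id': 'age', 'type': 'numeric'},
    {'id': 'name', 'type': 'text'},
]

# Expected True: the loader would TRUNCATE the table and keep its definition.
print(_fields_match(new_fields, existing_fields, logger))

# Expected False after a type change: the loader would drop and recreate it.
new_fields[1]['type'] = 'text'
print(_fields_match(new_fields, existing_fields, logger))
```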
1 change: 1 addition & 0 deletions ckanext/xloader/utils.py
@@ -114,6 +114,7 @@ def set_resource_metadata(update_dict):
# better fix

q = model.Session.query(model.Resource). \
with_for_update(of=model.Resource). \
filter(model.Resource.id == update_dict['resource_id'])
resource = q.one()

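Finally, the one-line utils.py change takes a row-level lock on the resource being updated, so concurrent jobs touching the same resource queue behind one another instead of colliding mid-transaction. A standalone sketch of the same SQLAlchemy `with_for_update` pattern (the `Resource` model and connection string are placeholders, not CKAN's actual schema; assumes SQLAlchemy 1.4+):

```python
from sqlalchemy import create_engine, Column, String
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()


class Resource(Base):
    # Stand-in model; CKAN's real model.Resource has more columns.
    __tablename__ = 'resource'
    id = Column(String, primary_key=True)
    state = Column(String)


engine = create_engine('postgresql+psycopg2://user:pass@localhost/db')  # placeholder DSN

with Session(engine) as session, session.begin():
    # Emits SELECT ... FOR UPDATE OF resource, so a concurrent transaction
    # running the same query for the same id blocks here until this one commits.
    resource = (
        session.query(Resource)
        .with_for_update(of=Resource)
        .filter(Resource.id == 'some-resource-id')
        .one()
    )
    resource.state = 'active'
```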
