Skip to content

Commit

Permalink
ensure that 'datastore_active' matches the actual datastore state, ck…
Browse files Browse the repository at this point in the history
…an#161, QOL-9327

- On resource update, compare the flag to the actual state of the datastore, and update the flag if they don't match
  • Loading branch information
ThrawnCA committed Sep 6, 2022
1 parent ed5c271 commit d821a54
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 80 deletions.
64 changes: 7 additions & 57 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
from rq import get_current_job
import sqlalchemy as sa

import ckan.model as model
from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version
import ckan.lib.search as search

from . import loader
from . import db
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata

try:
from ckan.lib.api_token import get_user_from_token
Expand Down Expand Up @@ -380,9 +380,12 @@ def set_datastore_active(data, resource, logger):

data['datastore_active'] = True
logger.info('Setting resource.datastore_active = True')
contains_all_records = data.get(
'datastore_contains_all_records_of_source_file', True)
data['datastore_contains_all_records_of_source_file'] = contains_all_records
logger.info(
'Setting resource.datastore_contains_all_records_of_source_file = {}'
.format(data.get('datastore_contains_all_records_of_source_file')))
'Setting resource.datastore_contains_all_records_of_source_file = %s',
contains_all_records)
set_resource_metadata(update_dict=data)


Expand Down Expand Up @@ -414,59 +417,6 @@ def callback_xloader_hook(result_url, api_key, job_dict):
return result.status_code == requests.codes.ok


def set_resource_metadata(update_dict):
'''
Set appropriate datastore_active flag on CKAN resource.
Called after creation or deletion of DataStore table.
'''
from ckan import model
# We're modifying the resource extra directly here to avoid a
# race condition, see issue #3245 for details and plan for a
# better fix
update_dict.update({
'datastore_active': update_dict.get('datastore_active', True),
'datastore_contains_all_records_of_source_file':
update_dict.get('datastore_contains_all_records_of_source_file', True)
})

q = model.Session.query(model.Resource). \
filter(model.Resource.id == update_dict['resource_id'])
resource = q.one()

# update extras in database for record
extras = resource.extras
extras.update(update_dict)
q.update({'extras': extras}, synchronize_session=False)

# TODO: Remove resource_revision_table when dropping support for 2.8
if hasattr(model, 'resource_revision_table'):
model.Session.query(model.resource_revision_table).filter(
model.ResourceRevision.id == update_dict['resource_id'],
model.ResourceRevision.current is True
).update({'extras': extras}, synchronize_session=False)
model.Session.commit()

# get package with updated resource from solr
# find changed resource, patch it and reindex package
psi = search.PackageSearchIndex()
solr_query = search.PackageSearchQuery()
q = {
'q': 'id:"{0}"'.format(resource.package_id),
'fl': 'data_dict',
'wt': 'json',
'fq': 'site_id:"%s"' % config.get('ckan.site_id'),
'rows': 1
}
for record in solr_query.run(q)['results']:
solr_data_dict = json.loads(record['data_dict'])
for resource in solr_data_dict['resources']:
if resource['id'] == update_dict['resource_id']:
resource.update(update_dict)
psi.index_package(solr_data_dict)
break


def validate_input(input):
# Especially validate metadata which is provided by the user
if 'metadata' not in input:
Expand Down
62 changes: 39 additions & 23 deletions ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import logging

from ckan.plugins.toolkit import config
import ckan.plugins as plugins
import ckan.plugins.toolkit as toolkit
from ckan import plugins
from ckan.plugins import toolkit

from ckanext.xloader import action, auth
import ckanext.xloader.helpers as xloader_helpers
from ckanext.xloader.loader import fulltext_function_exists, get_write_engine
from . import action, auth, helpers as xloader_helpers, utils
from .loader import fulltext_function_exists, get_write_engine

log = logging.getLogger(__name__)

Expand All @@ -33,7 +31,7 @@ class XLoaderFormats(object):
@classmethod
def is_it_an_xloader_format(cls, format_):
if cls.formats is None:
cls._formats = config.get("ckanext.xloader.formats")
cls._formats = toolkit.config.get("ckanext.xloader.formats")
if cls._formats is not None:
cls._formats = cls._formats.lower().split()
else:
Expand Down Expand Up @@ -135,29 +133,47 @@ def notify(self, resource):
self._submit_to_xloader(resource_dict)

# IResourceController
if toolkit.check_ckan_version("2.10"):

def after_resource_create(self, context, resource_dict):
self._submit_to_xloader(resource_dict)
def after_resource_create(self, context, resource_dict):
self._submit_to_xloader(resource_dict)

def before_resource_show(self, resource_dict):
resource_dict[
"datastore_contains_all_records_of_source_file"
] = toolkit.asbool(
resource_dict.get("datastore_contains_all_records_of_source_file")
)
def before_resource_show(self, resource_dict):
resource_dict[
"datastore_contains_all_records_of_source_file"
] = toolkit.asbool(
resource_dict.get("datastore_contains_all_records_of_source_file")
)

else:
def after_resource_update(self, context, resource_dict):
""" Check whether the datastore is out of sync with the
'datastore_active' flag. This can occur due to race conditions
like https://github.com/ckan/ckan/issues/4663
"""
datastore_active = resource_dict.get('datastore_active', False)
try:
context = {'ignore_auth': True}
if toolkit.get_action('datastore_info')(
context=context, data_dict={'id': resource_dict['id']}):
datastore_exists = True
except toolkit.ObjectNotFound:
datastore_exists = False

if datastore_active != datastore_exists:
# datastore does exist; update flag
utils.set_resource_metadata(
{'resource_id': resource_dict['id'],
'datastore_active': datastore_exists})

if not toolkit.check_ckan_version("2.10"):

def after_create(self, context, resource_dict):
self._submit_to_xloader(resource_dict)
self.after_resource_create(context, resource_dict)

def before_show(self, resource_dict):
resource_dict[
"datastore_contains_all_records_of_source_file"
] = toolkit.asbool(
resource_dict.get("datastore_contains_all_records_of_source_file")
)
self.before_resource_show(resource_dict)

def after_update(self, context, resource_dict):
self.after_resource_update(context, resource_dict)

def _submit_to_xloader(self, resource_dict):
context = {"ignore_auth": True, "defer_commit": True}
Expand Down
53 changes: 53 additions & 0 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# encoding: utf-8

import json

from ckan import model
from ckan.lib import search
import ckan.plugins as p


Expand Down Expand Up @@ -57,3 +63,50 @@ def get_xloader_user_apitoken():

site_user = p.toolkit.get_action('get_site_user')({'ignore_auth': True}, {})
return site_user["apikey"]


def set_resource_metadata(update_dict):
'''
Set appropriate datastore_active flag on CKAN resource.
Called after creation or deletion of DataStore table.
'''
# We're modifying the resource extra directly here to avoid a
# race condition, see issue #3245 for details and plan for a
# better fix

q = model.Session.query(model.Resource). \
filter(model.Resource.id == update_dict['resource_id'])
resource = q.one()

# update extras in database for record
extras = resource.extras
extras.update(update_dict)
q.update({'extras': extras}, synchronize_session=False)

# TODO: Remove resource_revision_table when dropping support for 2.8
if hasattr(model, 'resource_revision_table'):
model.Session.query(model.resource_revision_table).filter(
model.ResourceRevision.id == update_dict['resource_id'],
model.ResourceRevision.current is True
).update({'extras': extras}, synchronize_session=False)
model.Session.commit()

# get package with updated resource from solr
# find changed resource, patch it and reindex package
psi = search.PackageSearchIndex()
solr_query = search.PackageSearchQuery()
q = {
'q': 'id:"{0}"'.format(resource.package_id),
'fl': 'data_dict',
'wt': 'json',
'fq': 'site_id:"%s"' % p.toolkit.config.get('ckan.site_id'),
'rows': 1
}
for record in solr_query.run(q)['results']:
solr_data_dict = json.loads(record['data_dict'])
for resource in solr_data_dict['resources']:
if resource['id'] == update_dict['resource_id']:
resource.update(update_dict)
psi.index_package(solr_data_dict)
break

0 comments on commit d821a54

Please sign in to comment.