Skip to content

Commit

Permalink
Merge pull request #239 from JVickery-TBS/squash/rq-timeouts
Browse files Browse the repository at this point in the history
RQ Job Timeout Handling
  • Loading branch information
duttonw authored Dec 9, 2024
2 parents e104e5b + 5cb3425 commit a85bc19
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Feat:
* Adds Strip White Space fields to the Data Dictionary (defaults to `True` for each field). This will strip surrounding white space from data values prior to inserting them into the database.
* Adds support for ckanext-validation. Config `ckanext.xloader.validation.requires_successful_report` controls whether a resource requires a successful validation report to be XLoadered. By default, a resource would also require a Validation Schema, which can be turned off with `ckanext.xloader.validation.enforce_schema`.

Fix:
* Properly handle Redis queue (RQ) job timeouts to close/delete any temporary files.


**Full Changelog**: https://github.com/ckan/ckanext-xloader/compare/1.1.2...1.1.3

Expand Down
14 changes: 14 additions & 0 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from six.moves.urllib.parse import urlsplit
import requests
from rq import get_current_job
from rq.timeouts import JobTimeoutException
import sqlalchemy as sa

from ckan import model
Expand Down Expand Up @@ -259,6 +260,13 @@ def tabulator_load():
logger.warning('Load using COPY failed: %s', e)
logger.info('Trying again with tabulator')
tabulator_load()
except JobTimeoutException as e:
try:
tmp_file.close()
except FileNotFoundError:
pass
logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT)
raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT))
except FileCouldNotBeLoadedError as e:
logger.warning('Loading excerpt for this format not supported.')
logger.error('Loading file raised an error: %s', e)
Expand Down Expand Up @@ -357,6 +365,7 @@ def _download_resource_data(resource, data, api_key, logger):
response.close()
data['datastore_contains_all_records_of_source_file'] = False
except requests.exceptions.HTTPError as error:
tmp_file.close()
# status code error
logger.debug('HTTP error: %s', error)
raise HTTPError(
Expand All @@ -368,6 +377,7 @@ def _download_resource_data(resource, data, api_key, logger):
raise JobError('Connection timed out after {}s'.format(
DOWNLOAD_TIMEOUT))
except requests.exceptions.RequestException as e:
tmp_file.close()
try:
err_message = str(e.reason)
except AttributeError:
Expand All @@ -376,6 +386,10 @@ def _download_resource_data(resource, data, api_key, logger):
raise HTTPError(
message=err_message, status_code=None,
request_url=url, response=None)
except JobTimeoutException as e:
tmp_file.close()
logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT)
raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT))

logger.info('Downloaded ok - %s', printable_file_size(length))
file_hash = m.hexdigest()
Expand Down
26 changes: 25 additions & 1 deletion ckanext/xloader/tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import io
import os

from datetime import datetime

Expand All @@ -16,6 +17,7 @@


_TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10"
_TEST_LARGE_FILE_CONTENT = "\n1,2\n2,4\n3,6\n4,8\n5,10"


def get_response(download_url, headers):
Expand All @@ -25,14 +27,24 @@ def get_response(download_url, headers):
resp.headers = headers
return resp


def get_large_response(download_url, headers):
    """Mock jobs.get_response() that pretends the download is huge.

    The body is still the small test CSV; only the reported
    content-length is inflated so size-limit handling can be exercised
    without transferring real data. The *headers* argument is ignored.
    """
    response = Response()
    response.raw = io.BytesIO(_TEST_FILE_CONTENT.encode())
    response.headers = {'content-length': 2000000000}
    return response

def get_large_data_response(download_url, headers):
    """Mock jobs.get_response() whose body really is large.

    Repeats the extra CSV rows many times so the download takes long
    enough for an RQ job timeout to fire during processing.
    """
    body = _TEST_FILE_CONTENT + _TEST_LARGE_FILE_CONTENT * 500000
    response = Response()
    response.raw = io.BytesIO(body.encode())
    response.headers = headers
    return response

def _get_temp_files(dir='/tmp'):
    """Return the full paths of the regular files directly inside *dir*."""
    candidates = (os.path.join(dir, entry) for entry in os.listdir(dir))
    return [path for path in candidates if os.path.isfile(path)]


@pytest.fixture
def apikey():
Expand Down Expand Up @@ -74,6 +86,8 @@ def data(create_with_upload, apikey):


@pytest.mark.usefixtures("clean_db", "with_plugins")
@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 2)
@pytest.mark.ckan_config("ckan.jobs.timeout", 2)
class TestXLoaderJobs(helpers.FunctionalRQTestBase):

def test_xloader_data_into_datastore(self, cli, data):
Expand Down Expand Up @@ -123,6 +137,16 @@ def test_data_max_excerpt_lines_config(self, cli, data):
resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"])
assert resource["datastore_contains_all_records_of_source_file"] is False

def test_data_with_rq_job_timeout(self, cli, data):
    """A job exceeding the RQ timeout must report it and leave no tmp file."""
    # Filename fragment of the uploaded fixture; a leaked temp file created
    # while loading it would carry this suffix in its name.
    file_suffix = 'multiplication_2.csv'
    # Enqueue with a 2s RQ timeout so the (mocked) large download below
    # overruns it and raises JobTimeoutException inside the worker.
    self.enqueue(jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=2))
    with mock.patch("ckanext.xloader.jobs.get_response", get_large_data_response):
        stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output
        # The timeout handler logs/raises "Job timed out after <n>s".
        assert "Job timed out after" in stdout
        # NOTE(review): scans /tmp (default of _get_temp_files) — assumes the
        # download temp file is created there; confirm against jobs.py.
        for f in _get_temp_files():
            # make sure that the tmp file has been closed/deleted in job timeout exception handling
            assert file_suffix not in f


@pytest.mark.usefixtures("clean_db")
class TestSetResourceMetadata(object):
Expand Down
6 changes: 3 additions & 3 deletions test.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ handlers = console

[logger_ckan]
qualname = ckan
handlers =
handlers = console
level = INFO

[logger_ckanext_xloader]
qualname = ckanext.xloader
handlers =
level = DEBUG
handlers = console
level = WARN

[logger_sqlalchemy]
handlers =
Expand Down

0 comments on commit a85bc19

Please sign in to comment.