diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 3ac8ebba..44afa108 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -14,7 +14,7 @@ from psycopg2 import errors from six.moves.urllib.parse import urlsplit import requests -from rq import get_current_job +from rq import get_current_job, timeouts as rq_timeouts import sqlalchemy as sa from ckan import model @@ -245,10 +245,21 @@ def tabulator_load(): logger.info("'use_type_guessing' mode is: %s", use_type_guessing) try: if use_type_guessing: - tabulator_load() + try: + tabulator_load() + except rq_timeouts.JobTimeoutException as e: + tmp_file.close() + timeout = config.get('ckanext.xloader.job_timeout', '3600') + logger.warning('Job time out after %ss', timeout) + raise JobError('Job timed out after {}s'.format(timeout)) else: try: direct_load() + except rq_timeouts.JobTimeoutException as e: + tmp_file.close() + timeout = config.get('ckanext.xloader.job_timeout', '3600') + logger.warning('Job time out after %ss', timeout) + raise JobError('Job timed out after {}s'.format(timeout)) except JobError as e: logger.warning('Load using COPY failed: %s', e) logger.info('Trying again with tabulator') @@ -351,6 +362,7 @@ def _download_resource_data(resource, data, api_key, logger): response.close() data['datastore_contains_all_records_of_source_file'] = False except requests.exceptions.HTTPError as error: + tmp_file.close() # status code error logger.debug('HTTP error: %s', error) raise HTTPError( @@ -362,6 +374,7 @@ def _download_resource_data(resource, data, api_key, logger): raise JobError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: + tmp_file.close() try: err_message = str(e.reason) except AttributeError: @@ -370,6 +383,11 @@ def _download_resource_data(resource, data, api_key, logger): raise HTTPError( message=err_message, status_code=None, request_url=url, response=None) + except rq_timeouts.JobTimeoutException as e: + tmp_file.close() + timeout = config.get('ckanext.xloader.job_timeout', '3600') + logger.warning('Job time out after %ss', timeout) + raise JobError('Job timed out after {}s'.format(timeout)) logger.info('Downloaded ok - %s', printable_file_size(length)) file_hash = m.hexdigest() diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index e819dad9..795c36a8 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,5 +1,6 @@ import pytest import io +import os from datetime import datetime @@ -16,6 +17,7 @@ _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" +_TEST_LARGE_FILE_CONTENT = "\n1,2\n2,4\n3,6\n4,8\n5,10" def get_response(download_url, headers): @@ -25,7 +27,6 @@ def get_response(download_url, headers): resp.headers = headers return resp - def get_large_response(download_url, headers): """Mock jobs.get_response() method to fake a large file.""" resp = Response() @@ -33,6 +34,17 @@ def get_large_response(download_url, headers): resp.headers = {'content-length': 2000000000} return resp +def get_large_data_response(download_url, headers): + """Mock jobs.get_response() method.""" + resp = Response() + f_content = _TEST_FILE_CONTENT + (_TEST_LARGE_FILE_CONTENT * 500000) + resp.raw = io.BytesIO(f_content.encode()) + resp.headers = headers + return resp + +def _get_temp_files(): + return [os.path.join('/tmp', f) for f in os.listdir('/tmp') if os.path.isfile(os.path.join('/tmp', f))] + @pytest.fixture def apikey(): @@ -74,6 +86,8 @@ def data(create_with_upload, apikey): @pytest.mark.usefixtures("clean_db", "with_plugins") +@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 15) +@pytest.mark.ckan_config("ckan.jobs.timeout", 15) class TestXLoaderJobs(helpers.FunctionalRQTestBase): def test_xloader_data_into_datastore(self, cli, data): @@ -123,6 +137,17 @@ def test_data_max_excerpt_lines_config(self, cli, data): resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] is False + def test_data_with_rq_job_timeout(self, cli, data): + for f in _get_temp_files(): + os.remove(f) + assert len(_get_temp_files()) == 0 + self.enqueue(jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=15)) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_data_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Job timed out after" in stdout + assert len(_get_temp_files()) == 0 + + @pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object):