From 5cb342557a06301f49793696bcb32681d8198794 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 9 Dec 2024 23:05:17 +0000 Subject: [PATCH] squash(pr): 223; - Squash of https://github.com/ckan/ckanext-xloader/pull/223 --- CHANGELOG | 3 +++ ckanext/xloader/jobs.py | 14 ++++++++++++++ ckanext/xloader/tests/test_jobs.py | 26 +++++++++++++++++++++++++- test.ini | 6 +++--- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index e804e0ad..7e6a9c42 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,9 @@ Feat: * Adds support for ckanext-validation. Config `ckanext.xloader.validation.requires_successful_report` controls whether a resource requires a successful validation report to be XLoadered. By default, a resource would also require a Validation Schema, which can be turned off with `ckanext.xloader.validation.enforce_schema`. +Fix: +* Properly handle REDIS queue timeouts to close/delete any temporary files. + **Full Changelog**: https://github.com/ckan/ckanext-xloader/compare/1.1.2...1.1.3 diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 85c51936..aa23fb86 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -15,6 +15,7 @@ from six.moves.urllib.parse import urlsplit import requests from rq import get_current_job +from rq.timeouts import JobTimeoutException import sqlalchemy as sa from ckan import model @@ -259,6 +260,13 @@ def tabulator_load(): logger.warning('Load using COPY failed: %s', e) logger.info('Trying again with tabulator') tabulator_load() + except JobTimeoutException as e: + try: + tmp_file.close() + except FileNotFoundError: + pass + logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) + raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) except FileCouldNotBeLoadedError as e: logger.warning('Loading excerpt for this format not supported.') logger.error('Loading file raised an error: %s', e) @@ -357,6 +365,7 @@ def _download_resource_data(resource, data, api_key, logger): response.close() data['datastore_contains_all_records_of_source_file'] = False except requests.exceptions.HTTPError as error: + tmp_file.close() # status code error logger.debug('HTTP error: %s', error) raise HTTPError( @@ -368,6 +377,7 @@ def _download_resource_data(resource, data, api_key, logger): raise JobError('Connection timed out after {}s'.format( DOWNLOAD_TIMEOUT)) except requests.exceptions.RequestException as e: + tmp_file.close() try: err_message = str(e.reason) except AttributeError: @@ -376,6 +386,10 @@ def _download_resource_data(resource, data, api_key, logger): raise HTTPError( message=err_message, status_code=None, request_url=url, response=None) + except JobTimeoutException as e: + tmp_file.close() + logger.warning('Job timed out after %ss', RETRIED_JOB_TIMEOUT) + raise JobError('Job timed out after {}s'.format(RETRIED_JOB_TIMEOUT)) logger.info('Downloaded ok - %s', printable_file_size(length)) file_hash = m.hexdigest() diff --git a/ckanext/xloader/tests/test_jobs.py b/ckanext/xloader/tests/test_jobs.py index e819dad9..4c15fb54 100644 --- a/ckanext/xloader/tests/test_jobs.py +++ b/ckanext/xloader/tests/test_jobs.py @@ -1,5 +1,6 @@ import pytest import io +import os from datetime import datetime @@ -16,6 +17,7 @@ _TEST_FILE_CONTENT = "x, y\n1,2\n2,4\n3,6\n4,8\n5,10" +_TEST_LARGE_FILE_CONTENT = "\n1,2\n2,4\n3,6\n4,8\n5,10" def get_response(download_url, headers): @@ -25,7 +27,6 @@ def get_response(download_url, headers): resp.headers = headers return resp - def get_large_response(download_url, headers): """Mock jobs.get_response() method to fake a large file.""" resp = Response() @@ -33,6 +34,17 @@ def get_large_response(download_url, headers): resp.headers = {'content-length': 2000000000} return resp +def get_large_data_response(download_url, headers): + """Mock jobs.get_response() method.""" + resp = Response() + f_content = _TEST_FILE_CONTENT + (_TEST_LARGE_FILE_CONTENT * 500000) + resp.raw = io.BytesIO(f_content.encode()) + resp.headers = headers + return resp + +def _get_temp_files(dir='/tmp'): + return [os.path.join(dir, f) for f in os.listdir(dir) if os.path.isfile(os.path.join(dir, f))] + @pytest.fixture def apikey(): @@ -74,6 +86,8 @@ def data(create_with_upload, apikey): @pytest.mark.usefixtures("clean_db", "with_plugins") +@pytest.mark.ckan_config("ckanext.xloader.job_timeout", 2) +@pytest.mark.ckan_config("ckan.jobs.timeout", 2) class TestXLoaderJobs(helpers.FunctionalRQTestBase): def test_xloader_data_into_datastore(self, cli, data): @@ -123,6 +137,16 @@ def test_data_max_excerpt_lines_config(self, cli, data): resource = helpers.call_action("resource_show", id=data["metadata"]["resource_id"]) assert resource["datastore_contains_all_records_of_source_file"] is False + def test_data_with_rq_job_timeout(self, cli, data): + file_suffix = 'multiplication_2.csv' + self.enqueue(jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=2)) + with mock.patch("ckanext.xloader.jobs.get_response", get_large_data_response): + stdout = cli.invoke(ckan, ["jobs", "worker", "--burst"]).output + assert "Job timed out after" in stdout + for f in _get_temp_files(): + # make sure that the tmp file has been closed/deleted in job timeout exception handling + assert file_suffix not in f + @pytest.mark.usefixtures("clean_db") class TestSetResourceMetadata(object): diff --git a/test.ini b/test.ini index 7bfab684..c02827bd 100644 --- a/test.ini +++ b/test.ini @@ -34,13 +34,13 @@ handlers = console [logger_ckan] qualname = ckan -handlers = +handlers = console level = INFO [logger_ckanext_xloader] qualname = ckanext.xloader -handlers = -level = DEBUG +handlers = console +level = WARN [logger_sqlalchemy] handlers =