Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests] Add tests to cancel tasks and handling of jobs in scheduler #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 268 additions & 10 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,133 @@
# Santiago Dueñas <sduenas@bitergia.com>
#

import copy
import logging
import os.path
import sys
import unittest

if '..' not in sys.path:
sys.path.insert(0, '..')

from arthur.common import Q_CREATION_JOBS
from arthur.common import Q_CREATION_JOBS, CH_PUBSUB
from arthur.errors import NotFoundError
from arthur.tasks import TaskRegistry
from arthur.scheduler import Scheduler
from arthur.scheduler import Scheduler, logger
from arthur.worker import ArthurWorker

from tests import TestBaseRQ


def setup_scheduler(conn, dir, async_mode=False, add_task=0):
    """Create a Scheduler whose registry is pre-loaded with git tasks.

    Tasks are registered under the ids "mytask0", "mytask1", ... and all
    of them share the same backend, cache and scheduling arguments.
    Nothing is registered unless `add_task` is greater than zero.

    :param conn: connection object handed over to the Scheduler
    :param dir: base directory that contains 'data/git_log.txt'
    :param async_mode: value forwarded to the Scheduler constructor
    :param add_task: number of tasks to register

    :returns: the new Scheduler instance
    """
    backend_args = dict(
        uri='http://example.com/',
        gitpath=os.path.join(dir, 'data/git_log.txt'),
    )
    caching = dict(cache_path=None, fetch_from_cache=False)
    scheduling = dict(delay=1, max_retries_job=0)

    task_registry = TaskRegistry()

    # Same guard as before: only a positive count leads to registrations
    how_many = add_task if add_task > 0 else 0

    for idx in range(how_many):
        task_registry.add("mytask" + str(idx), 'git', backend_args,
                          cache_args=caching,
                          sched_args=scheduling)

    return Scheduler(conn, task_registry, async_mode=async_mode)


class TestScheduler(TestBaseRQ):
"""Unit tests for Scheduler class"""

def test_init(self):
    """Test whether attributes are initialized"""

    task_registry = TaskRegistry()

    # Synchronous mode: every attribute set by the constructor is checked
    sync_schlr = Scheduler(self.conn, task_registry, async_mode=False)
    self.assertEqual(sync_schlr.conn, self.conn)
    self.assertIsNotNone(sync_schlr.registry)
    self.assertFalse(sync_schlr.async_mode)
    self.assertIsNotNone(sync_schlr._scheduler)
    self.assertIsNotNone(sync_schlr._listener)

    # Asynchronous mode: only the flag is expected to differ
    async_schlr = Scheduler(self.conn, task_registry, async_mode=True)
    self.assertTrue(async_schlr.async_mode)

def test_schedule(self):
    """Check that the schedule is started and can be stopped.

    After calling `schedule()` in asynchronous mode the listener and the
    scheduler threads must be alive; after stopping both of them, they
    must not be running anymore.
    """
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log.txt')
    }

    cache_args = {
        'cache_path': None,
        'fetch_from_cache': False
    }

    sched_args = {
        'delay': 1,
        'max_retries_job': 0
    }

    registry = TaskRegistry()

    registry.add("mytask0", 'git', args,
                 cache_args=cache_args,
                 sched_args=sched_args)

    schlr = Scheduler(self.conn, registry, async_mode=True)
    schlr.schedule()

    # Stop the background threads even when one of the checks below
    # fails; otherwise they would keep running after the test ended.
    try:
        self.assertTrue(schlr._listener.is_alive())
        self.assertTrue(schlr._scheduler.is_alive())
        self.assertTrue(schlr._scheduler.do_run)
    finally:
        schlr.stop_listener()
        schlr.stop_scheduler()

    self.assertFalse(schlr._scheduler.do_run)
    self.assertFalse(schlr._listener.is_alive())
    self.assertFalse(schlr._scheduler.is_alive())

def test_schedule_task(self):
"""Jobs should be added and executed"""

args = {
'uri': 'http://example.com/',
'gitpath': os.path.join(self.dir, 'data/git_log.txt')
}

cache_args = {
'cache_path': None,
'fetch_from_cache': False
}

sched_args = {
'delay': 0,
'delay': 1,
'max_retries_job': 0
}

registry = TaskRegistry()
task = registry.add('mytask', 'git', args,
cache_args=cache_args,
sched_args=sched_args)

registry.add("mytask0", 'git', args,
cache_args=cache_args,
sched_args=sched_args)

schlr = Scheduler(self.conn, registry, async_mode=False)
job_id = schlr.schedule_task(task.task_id)

job_id = schlr.schedule_task("mytask0")

schlr.schedule()

Expand All @@ -73,13 +159,185 @@ def test_schedule_task(self):
self.assertEqual(result.nitems, 9)

def test_not_found_task(self):
    """Raise an error when the task to schedule does not exist"""

    registry = TaskRegistry()
    schlr = Scheduler(self.conn, registry, async_mode=False)

    # The registry is empty, so any task id is unknown to the scheduler
    with self.assertRaises(NotFoundError):
        schlr.schedule_task('mytask-not-found')

def test_cancel_task(self):
    """Check whether a cancelled task is removed from the registry"""

    git_log = os.path.join(self.dir, 'data/git_log.txt')

    backend_args = dict(uri='http://example.com/', gitpath=git_log)
    caching = dict(cache_path=None, fetch_from_cache=False)
    scheduling = dict(delay=1, max_retries_job=0)

    task_registry = TaskRegistry()
    task_registry.add("mytask0", 'git', backend_args,
                      cache_args=caching,
                      sched_args=scheduling)

    schlr = Scheduler(self.conn, task_registry, async_mode=False)

    # One task before the cancellation ...
    self.assertEqual(len(schlr.registry.tasks), 1)

    # ... and none once it has been cancelled
    schlr.cancel_task("mytask0")
    self.assertEqual(len(schlr.registry.tasks), 0)

def test_process_jobs_async(self):
    """Check whether scheduled jobs are properly handled in async mode.

    Several tasks are scheduled and the test waits until the internal
    jobs container of the scheduler becomes empty. The wait is bounded
    so a stuck scheduler makes the test fail instead of hanging forever.
    """
    # Local import: only this test needs it
    import time

    tasks = 5
    timeout = 60  # seconds to wait for the jobs container to drain
    poll_interval = 0.05  # seconds between two checks

    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log.txt')
    }

    cache_args = {
        'cache_path': None,
        'fetch_from_cache': False
    }

    sched_args = {
        'delay': 1,
        'max_retries_job': 0
    }

    registry = TaskRegistry()

    for i in range(tasks):
        registry.add("mytask" + str(i), 'git', args,
                     cache_args=cache_args,
                     sched_args=sched_args)

    schlr = Scheduler(self.conn, registry, async_mode=True)

    # NOTE(review): the worker is created but never started in this test.
    # It is kept because its construction may have side effects on the
    # queue/connection - confirm whether it is still needed.
    w = ArthurWorker(schlr._scheduler._queues[Q_CREATION_JOBS])

    for i in range(tasks):
        schlr.schedule_task("mytask" + str(i))

    schlr.schedule()

    # Stop the background threads whatever the outcome of the wait is
    try:
        deadline = time.monotonic() + timeout

        # Sleep between checks instead of spinning. Nothing is asserted
        # inside the loop: the container is modified from another thread,
        # so it may be emptied right after the loop condition was checked.
        while schlr._scheduler._jobs and time.monotonic() < deadline:
            time.sleep(poll_interval)

        self.assertEqual(len(schlr._scheduler._jobs.keys()), 0)
    finally:
        schlr.stop_listener()
        schlr.stop_scheduler()

def test_handle_successful_job(self):
    """Check whether handling a successful job logs its re-scheduling.

    Scheduling an unknown task id must raise a NotFoundError. Then, the
    job created for an existing task is passed to the handler of
    successful jobs and the log output is inspected.
    """
    # Local import: the log is captured in memory, not in a file on disk
    import io

    log_stream = io.StringIO()
    stream_handler = logging.StreamHandler(log_stream)

    # Use setLevel(), the documented API, instead of assigning to
    # `logger.level`, which bypasses the bookkeeping setLevel() performs.
    previous_level = logger.level
    logger.setLevel(logging.INFO)
    logger.addHandler(stream_handler)

    # Restore the module logger even when the test fails half-way
    try:
        args = {
            'uri': 'http://example.com/',
            'gitpath': os.path.join(self.dir, 'data/git_log.txt')
        }

        cache_args = {
            'cache_path': None,
            'fetch_from_cache': False
        }

        sched_args = {
            'delay': 1,
            'max_retries_job': 0
        }

        registry = TaskRegistry()

        registry.add("mytask0", 'git', args,
                     cache_args=cache_args,
                     sched_args=sched_args)

        schlr = Scheduler(self.conn, registry, async_mode=False)
        self.assertRaises(NotFoundError, schlr.schedule_task, 'mytask')
        job_id = schlr.schedule_task("mytask0")

        schlr.schedule()

        job = schlr._scheduler._queues[Q_CREATION_JOBS].fetch_job(job_id)

        schlr._handle_successful_job(job)
    finally:
        logger.removeHandler(stream_handler)
        logger.setLevel(previous_level)

    content = log_stream.getvalue()
    expected = "(task: mytask0, old job: " + str(job_id) + ") re-scheduled"

    # assertIn reports both strings when the check fails
    self.assertIn(expected, content.lower())

def test_handle_successful_job_not_found(self):
    """Check whether a warning is logged when the task of a job is not found.

    A copy of a real job is modified to point to a task id that is not
    in the registry. The copy is passed to the handler of successful
    jobs and the log output is inspected.
    """
    # Local import: the log is captured in memory, not in a file on disk
    import io

    log_stream = io.StringIO()
    stream_handler = logging.StreamHandler(log_stream)

    # Use setLevel(), the documented API, instead of assigning to
    # `logger.level`, which bypasses the bookkeeping setLevel() performs.
    previous_level = logger.level
    logger.setLevel(logging.WARNING)
    logger.addHandler(stream_handler)

    # Restore the module logger even when the test fails half-way
    try:
        args = {
            'uri': 'http://example.com/',
            'gitpath': os.path.join(self.dir, 'data/git_log.txt')
        }

        cache_args = {
            'cache_path': None,
            'fetch_from_cache': False
        }

        sched_args = {
            'delay': 1,
            'max_retries_job': 0
        }

        registry = TaskRegistry()

        registry.add("mytask0", 'git', args,
                     cache_args=cache_args,
                     sched_args=sched_args)

        schlr = Scheduler(self.conn, registry, async_mode=False)
        job_id = schlr.schedule_task("mytask0")

        schlr.schedule()

        job = schlr._scheduler._queues[Q_CREATION_JOBS].fetch_job(job_id)

        # Work on a copy with its own kwargs dict, so the original job
        # keeps pointing to the registered task
        wrong_job = copy.deepcopy(job)
        wrong_job.kwargs = dict(job.kwargs)
        wrong_job.kwargs['task_id'] = "wrong_task"

        schlr._handle_successful_job(wrong_job)
    finally:
        logger.removeHandler(stream_handler)
        logger.setLevel(previous_level)

    content = log_stream.getvalue()
    expected = ("task wrong_task not found; related job #" + job_id +
                " will not be rescheduled")

    # assertIn reports both strings when the check fails
    self.assertIn(expected, content.lower())


if __name__ == "__main__":
    # Run the tests of this module when it is executed as a script.
    # A single call is enough: unittest.main() exits the interpreter
    # once the run has finished.
    unittest.main()