Skip to content

Commit

Permalink
Merge pull request #482 from kids-first/job-logs
Browse files Browse the repository at this point in the history
✨ Job logs
  • Loading branch information
dankolbman authored Oct 21, 2020
2 parents 6e000e7 + a5bd638 commit 2bc60fd
Show file tree
Hide file tree
Showing 27 changed files with 584 additions and 247 deletions.
145 changes: 145 additions & 0 deletions creator/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import sys
import logging
import pytz
from io import StringIO
from datetime import datetime
from functools import wraps
from rq.utils import make_colorizer

from django.conf import settings
from django.core.files.base import ContentFile
from django_s3_storage.storage import S3Storage

from creator.jobs.models import Job, JobLog
from creator.version_info import VERSION, COMMIT

# Module-level logger for messages about the task machinery itself
logger = logging.getLogger(__name__)

# Helpers that wrap a piece of text in a color before it is logged.
# Note that `grey` is bound to the "lightgray" color name.
green, yellow, blue, grey, red = (
    make_colorizer(color_name)
    for color_name in ("green", "yellow", "blue", "lightgray", "red")
)


class task:
    """
    A decorator to uniformly setup tasks that get enqueued for workers to
    execute.

    Every call of a function decorated with ``@task("name")`` will:

    1. resolve the ``Job`` called ``name`` (building a new, unscheduled one
       when none exists) and return early when that Job is not active,
    2. capture everything sent to the ``TaskLogger`` and ``creator``
       loggers for the duration of the call,
    3. record the outcome and the run time on the Job, and
    4. store the captured output in a new ``JobLog``.

    An exception raised by the decorated function is re-raised after the
    Job and its log have been saved. The wrapper always returns ``None``.
    """

    def __init__(self, job=None):
        """
        :param job: name of the Job that runs of the decorated function
            are recorded against
        """
        self.job = job
        self.logger = logging.getLogger("TaskLogger")
        # Reset at the beginning of every run; assigned here only so that
        # the attribute always exists
        self.start_time = datetime.utcnow()

        self.stream = StringIO()
        # The handler is only attached to loggers while a run is in
        # progress, see _start_capture() and _stop_capture()
        self.handler = logging.StreamHandler(self.stream)
        self.handler.setFormatter(
            logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
        )

    def __call__(self, f):
        """
        Wrap `f` so that each call is tracked as a run of the Job.
        """

        @wraps(f)
        def task_wrapper(*args, **kwargs):
            # Time the run itself, not the period since decoration
            self.start_time = datetime.utcnow()
            self._start_capture()
            try:
                self._run(f, args, kwargs)
            finally:
                # Always detach so that later, unrelated log output does
                # not end up in this task's buffer
                self._stop_capture()

        return task_wrapper

    def _start_capture(self):
        """
        Empty the buffer and begin collecting log output for one run.
        """
        # Drop whatever a previous run of this task left behind
        self.stream.seek(0)
        self.stream.truncate()
        self.logger.addHandler(self.handler)
        # Add the handler to the base module to capture all log output
        logging.getLogger("creator").addHandler(self.handler)

    def _stop_capture(self):
        """
        Stop collecting log output. The buffer contents are kept.
        """
        self.logger.removeHandler(self.handler)
        logging.getLogger("creator").removeHandler(self.handler)

    def _run(self, f, args, kwargs):
        """
        Call `f` with the given arguments, then save the Job and its log.

        :raises Exception: whatever `f` raised, once the Job status and
            the log have been saved
        """
        try:
            self._job = Job.objects.get(name=self.job)
        except Job.DoesNotExist:
            logger.info(
                f"The {self.job} job does not exist. "
                "Registering a new unsceduled-job for it."
            )
            self._job = Job(name=self.job, active=True, scheduled=False)

        if not self._job.active:
            logger.info(
                f"The {self._job.name} job is not active, will not run"
            )
            return

        self.log_preamble()

        self.logger.info(blue("Dropping into the Job process"))
        self.logger.info("")

        # Used to store any exception that gets raised during execution
        exception = None

        try:
            f(*args, **kwargs)
        except Exception as err:
            exception = err
            self.logger.error("")
            logger.error(red(f"There was a problem running the job: {err}"))
            self.logger.error("")
            self._job.failing = True
            self._job.last_error = str(err)
        else:
            self.logger.info("")
            self.logger.info(green("Job exited successfully"))
            self.logger.info("")
            self._job.failing = False
            self._job.last_error = ""

        self.logger.info("Updating job status")
        self._job.last_run = datetime.now(tz=pytz.UTC)
        self._job.save()

        self.close()

        # If there was some exception, throw it now after the Job status
        # has been updated
        if exception is not None:
            raise exception

    def close(self):
        """
        Store the captured output of the current run as a JobLog.
        """
        self.logger.info("Job complete. Saving log file")

        log = JobLog(job=self._job, error=self._job.failing)

        self.logger.info(f"Saving as Job Log {yellow(str(log.id))}")

        duration = (datetime.utcnow() - self.start_time).total_seconds()
        self.logger.info(grey(f"Finished in {duration:.2f}s"))

        self.logger.info("Uploading log contents, goodbye! 👋")

        if (
            settings.DEFAULT_FILE_STORAGE
            == "django_s3_storage.storage.S3Storage"
        ):
            log.log_file.storage = S3Storage(
                aws_s3_bucket_name=settings.LOG_BUCKET
            )

        # Read the clock once, as an aware datetime: the date prefix and
        # the timestamp then always agree, and timestamp() is not skewed
        # by the local timezone as it would be for a naive UTC datetime
        now = datetime.now(tz=pytz.UTC)
        name = (
            f"{now.strftime('%Y/%m/%d/')}"
            f"{int(now.timestamp())}_{self._job.name}.log"
        )
        log.log_file.save(name, ContentFile(self.stream.getvalue()))

        log.save()

    def log_preamble(self):
        """
        Post some info about the codebase to the start of the log.
        """
        self.logger.info(blue(f"╔{'═'*48}╗"))
        self.logger.info(blue(f"║ {'Study Creator API Worker':<46} ║"))
        self.logger.info(blue(f"╠{'═'*48}╣"))
        self.logger.info(blue(f"║ Version: {VERSION:<37} ║"))
        self.logger.info(blue(f"║ Job: {self.job:<41} ║"))
        self.logger.info(
            blue(f"║ Date: {datetime.utcnow().isoformat():<40} ║")
        )
        self.logger.info(blue(f"╚{'═'*48}╝"))
        self.logger.info("")
self.logger.info("")
4 changes: 4 additions & 0 deletions creator/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
"list_all_referraltoken",
"add_referraltoken",
"extract_version_config",
"view_job",
"list_all_job",
"view_joblog",
"list_all_joblog",
],
"Developers": [
"view_study",
Expand Down
Empty file added creator/jobs/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions creator/jobs/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class JobsConfig(AppConfig):
    """
    Application configuration for the jobs app.
    """

    # Django expects the full dotted path of the application package here.
    # The app label still defaults to the last component, "jobs", which is
    # the label the migrations refer to (e.g. "jobs.Job").
    name = "creator.jobs"
47 changes: 47 additions & 0 deletions creator/jobs/migrations/0001_move_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Generated by Django 2.2.13 on 2020-09-23 01:31

import creator.jobs.models
from django.db import migrations, models
import django.db.models.deletion
import uuid


class Migration(migrations.Migration):
    # Generated migration that creates the Job and JobLog models.
    # NOTE(review): the file name (0001_move_jobs) suggests these models
    # were moved here from another app - confirm before squashing/editing.

    # Marked as the first migration of this app
    initial = True

    # Does not depend on any other migration
    dependencies = [
    ]

    operations = [
        # Job: keyed by its name, tracks state and outcome of the last run
        migrations.CreateModel(
            name='Job',
            fields=[
                ('name', models.CharField(help_text='The name of the scheduled job', max_length=400, primary_key=True, serialize=False)),
                ('scheduler', models.CharField(default='default', help_text='The scheduler the Job will run on', max_length=400)),
                ('description', models.TextField(help_text="Description of the Job's role", null=True)),
                ('active', models.BooleanField(default=True, help_text='If the Job is active')),
                ('failing', models.BooleanField(default=False, help_text='If the Job is failing')),
                ('scheduled', models.BooleanField(default=False, help_text='If the Job is a recurring scheduled task')),
                ('created_on', models.DateTimeField(auto_now_add=True, help_text='Time the Job was created')),
                ('last_run', models.DateTimeField(help_text='Time of last run', null=True)),
                ('last_error', models.TextField(help_text='Error message from last failure', null=True)),
            ],
            options={
                # Custom permissions declared in addition to Django's defaults
                'permissions': [('list_all_job', 'Can list all jobs'), ('view_settings', 'Can view settings'), ('view_queue', 'Can view queues')],
            },
        ),
        # JobLog: one stored log file per run, deleted together with its Job
        migrations.CreateModel(
            name='JobLog',
            fields=[
                ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('error', models.BooleanField(default=False, help_text='If there was an error running the Job')),
                # upload_to points at a module-level function, so that
                # function must remain importable under this exact path
                ('log_file', models.FileField(help_text='The location where the log file is stored', max_length=1024, upload_to=creator.jobs.models._get_upload_directory)),
                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Time the log was created')),
                ('job', models.ForeignKey(help_text='The Job that this log originated from', on_delete=django.db.models.deletion.CASCADE, related_name='logs', to='jobs.Job')),
            ],
            options={
                'permissions': [('list_all_joblog', 'Can list all job logs')],
            },
        ),
    ]
Empty file.
123 changes: 123 additions & 0 deletions creator/jobs/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import pytz
import uuid
from datetime import datetime
import django_rq
from django.conf import settings
from django.db import models


class Job(models.Model):
    """
    Logs the current state of any scheduled, recurrent jobs.

    A Job is identified by its name and records whether it is active,
    whether its last run failed, and when it last ran.
    """

    class Meta:
        permissions = [
            ("list_all_job", "Can list all jobs"),
            ("view_settings", "Can view settings"),
            ("view_queue", "Can view queues"),
        ]

    name = models.CharField(
        primary_key=True,
        max_length=400,
        null=False,
        help_text="The name of the scheduled job",
    )
    scheduler = models.CharField(
        max_length=400,
        null=False,
        default="default",
        help_text="The scheduler the Job will run on",
    )
    description = models.TextField(
        null=True, help_text="Description of the Job's role"
    )
    active = models.BooleanField(
        default=True, help_text="If the Job is active"
    )
    failing = models.BooleanField(
        default=False, help_text="If the Job is failing"
    )
    scheduled = models.BooleanField(
        default=False, help_text="If the Job is a recurring scheduled task"
    )
    created_on = models.DateTimeField(
        auto_now_add=True, null=False, help_text="Time the Job was created"
    )
    last_run = models.DateTimeField(null=True, help_text="Time of last run")
    last_error = models.TextField(
        null=True, help_text="Error message from last failure"
    )

    @property
    def enqueued_at(self):
        """
        Returns the next scheduled run time for the job as a timezone-aware
        UTC datetime.

        Returns None if it is not a repeating job, if its scheduler is not
        configured, or if the scheduler holds no entry for this job.
        """
        if not self.scheduled:
            return None

        try:
            scheduler = django_rq.get_scheduler(self.scheduler)
        except KeyError:
            # The scheduler may no longer exist,
            # so assume this is not a scheduled job
            return None

        ts = scheduler.connection.zscore(
            "rq:scheduler:scheduled_jobs", self.name
        )
        if ts is None:
            # No score is stored when the job is not in the scheduled set
            return None

        # Convert the epoch value directly to UTC. Going through a naive
        # local datetime and then tagging it as UTC would shift the result
        # by the host's UTC offset.
        return datetime.fromtimestamp(ts, tz=pytz.UTC)


def _get_upload_directory(instance, filename):
    """
    Build the storage location for an uploaded log file.

    With the S3 storage backend the result is a key below LOG_DIR,
    otherwise it is a file system path below BASE_DIR and LOG_DIR.
    The `instance` argument is not used.
    """
    stored_on_s3 = (
        settings.DEFAULT_FILE_STORAGE == "django_s3_storage.storage.S3Storage"
    )
    if not stored_on_s3:
        return os.path.join(settings.BASE_DIR, settings.LOG_DIR, filename)

    return f"{settings.LOG_DIR}/{filename}"


class JobLog(models.Model):
    """
    The stored log output of a single run of a Job.
    """

    id = models.UUIDField(
        default=uuid.uuid4,
        primary_key=True,
        editable=False,
    )
    job = models.ForeignKey(
        Job,
        on_delete=models.CASCADE,
        related_name="logs",
        help_text="The Job that this log originated from",
    )
    error = models.BooleanField(
        help_text="If there was an error running the Job",
        default=False,
    )
    log_file = models.FileField(
        help_text="The location where the log file is stored",
        max_length=1024,
        upload_to=_get_upload_directory,
    )
    created_at = models.DateTimeField(
        help_text="Time the log was created",
        auto_now_add=True,
        null=False,
    )

    class Meta:
        permissions = [("list_all_joblog", "Can list all job logs")]

    @property
    def path(self):
        """
        Absolute path of the endpoint the log can be downloaded from
        """
        return f"/logs/{self.id}"
Loading

0 comments on commit 2bc60fd

Please sign in to comment.