Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry crop generator if failed #127

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions django_project/core/celery.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Tomorrow Now GAP."""
from __future__ import absolute_import, unicode_literals

import os
import logging
import os

from celery import Celery, signals
from celery.result import AsyncResult
Expand All @@ -11,7 +11,6 @@
from celery.worker.control import inspect_command
from django.utils import timezone


logger = logging.getLogger(__name__)

# set the default Django settings module for the 'celery' program.
Expand Down Expand Up @@ -50,6 +49,10 @@
# Run everyday at 01:30 UTC or 04:30 EAT
'schedule': crontab(minute='30', hour='1'),
},
'retry-crop-plan-generators': {
'task': 'retry_crop_plan_generators',
'schedule': crontab(minute='0', hour='*'),
},
'salient-collector-session': {
'task': 'salient_collector_session',
# Run every Monday 02:00 UTC
Expand Down Expand Up @@ -297,7 +300,7 @@ def task_failure_handler(


@signals.task_revoked.connect
def task_revoked_handler(sender, request = None, **kwargs):
def task_revoked_handler(sender, request=None, **kwargs):
"""Handle a cancelled task.

:param sender: task sender
Expand Down
32 changes: 23 additions & 9 deletions django_project/core/models/background_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@

"""


import uuid
import logging
from traceback import format_tb
import uuid
from ast import literal_eval as make_tuple
from django.db import models
from django.utils.translation import gettext_lazy as _
from traceback import format_tb

from django.conf import settings
from django.db import models
from django.utils import timezone

from django.utils.translation import gettext_lazy as _

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -137,10 +136,25 @@
blank=True
)

running_states = [
TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.RUNNING
]

def __str__(self):
"""Get string representation."""
return str(self.uuid)

@staticmethod
def running_tasks():
"""Return all running tasks."""
return BackgroundTask.objects.filter(
status__in=BackgroundTask.running_states
)

def is_running(self):
"""Check if task is running."""
return self.status in BackgroundTask.running_states

Check warning on line 156 in django_project/core/models/background_task.py

View check run for this annotation

Codecov / codecov/patch

django_project/core/models/background_task.py#L156

Added line #L156 was not covered by tests

@property
def requester_name(self):
"""Get the requester name."""
Expand Down Expand Up @@ -284,7 +298,7 @@
update_fields=['last_update', 'progress_text']
)

def is_possible_interrupted(self, delta = 1800):
def is_possible_interrupted(self, delta=1800):
"""Check whether the task is stuck or being interrupted.

This requires the task to send an update to BackgroundTask.
Expand All @@ -294,8 +308,8 @@
:rtype: bool
"""
if (
self.status == TaskStatus.QUEUED or
self.status == TaskStatus.RUNNING
self.status == TaskStatus.QUEUED or
self.status == TaskStatus.RUNNING
):
# check if last_update is more than 30mins than current date time
if self.last_update:
Expand Down
30 changes: 22 additions & 8 deletions django_project/gap/admin/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
def generate_insight_report_action(modeladmin, request, queryset):
"""Generate insight report."""
for query in queryset:
generate_insight_report(query.id)
generate_insight_report.delay(query.id)

Check warning on line 107 in django_project/gap/admin/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/admin/crop_insight.py#L107

Added line #L107 was not covered by tests
modeladmin.message_user(
request,
'Process will be started in background!',
Expand All @@ -116,18 +116,17 @@
class CropInsightRequestAdmin(admin.ModelAdmin):
"""Admin for CropInsightRequest."""

list_display = ('requested_at', 'farm_list', 'file_url')
list_display = (
'requested_at', 'farm_count', 'file_url', 'last_task_status',
'background_tasks'
)
filter_horizontal = ('farms',)
actions = (generate_insight_report_action,)
readonly_fields = ('file',)

def farm_list(self, obj: CropInsightRequest):
def farm_count(self, obj: CropInsightRequest):
"""Return farm list."""
return [farm.unique_id for farm in obj.farms.all()]

def file(self, obj: CropInsightRequest):
"""Return file path."""
return [farm.unique_id for farm in obj.farms.all()]
return obj.farms.count()

Check warning on line 129 in django_project/gap/admin/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/admin/crop_insight.py#L129

Added line #L129 was not covered by tests

def file_url(self, obj):
"""Return file url."""
Expand All @@ -137,3 +136,18 @@
f'target="__blank__">{obj.file.url}</a>'
)
return '-'

def last_task_status(self, obj: CropInsightRequest):
"""Return task status."""
bg_task = obj.last_background_task
if bg_task:
return bg_task.status
return None

Check warning on line 145 in django_project/gap/admin/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/admin/crop_insight.py#L142-L145

Added lines #L142 - L145 were not covered by tests

def background_tasks(self, obj: CropInsightRequest):
"""Return ids of background tasks that are running."""
url = (

Check warning on line 149 in django_project/gap/admin/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/admin/crop_insight.py#L149

Added line #L149 was not covered by tests
f"/admin/core/backgroundtask/?context_id__exact={obj.id}&"
f"task_name__in={','.join(CropInsightRequest.task_names)}"
)
return format_html(f'<a target="_blank" href={url}>link</a>')

Check warning on line 153 in django_project/gap/admin/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/admin/crop_insight.py#L153

Added line #L153 was not covered by tests
100 changes: 97 additions & 3 deletions django_project/gap/models/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import json
import uuid
from datetime import date
from datetime import date, timedelta

from django.conf import settings
from django.contrib.auth import get_user_model
Expand All @@ -17,6 +17,7 @@
from django.utils import timezone

from core.group_email_receiver import crop_plan_receiver
from core.models.background_task import BackgroundTask, TaskStatus
from core.models.common import Definition
from gap.models import Farm
from gap.models.lookup import RainfallClassification
Expand Down Expand Up @@ -340,7 +341,6 @@
spw = FarmSuitablePlantingWindowSignal.objects.filter(
farm=self.farm,
generated_date=self.generated_date

).first()
if spw:
try:
Expand Down Expand Up @@ -408,6 +408,93 @@
null=True, blank=True
)

task_names = ['generate_insight_report', 'generate_crop_plan']

@property
def last_background_task(self) -> BackgroundTask:
"""Return background task."""
return BackgroundTask.objects.filter(
context_id=self.id,
task_name__in=self.task_names
).last()

@property
def background_tasks(self):
"""Return background task."""
return BackgroundTask.objects.filter(
context_id=self.id,
task_name__in=self.task_names
)

@property
def background_task_running(self):
"""Return background task that is running."""
return BackgroundTask.running_tasks().filter(
context_id=self.id,
task_name__in=self.task_names
)

@staticmethod
def today_reports():
"""Return query of today reports."""
now = timezone.now()
return CropInsightRequest.objects.filter(
requested_at__gte=now.date(),
requested_at__lte=now.date() + timedelta(days=1),
)

@property
def skip_run(self):
"""Skip run process."""
background_task_running = self.background_task_running
last_running_background_task = background_task_running.last()
last_background_task = self.last_background_task

# This rule is based on the second task that basically
# is already running
# So we need to check of other task is already running

# If there are already complete task
# Skip it
if self.background_tasks.filter(status=TaskStatus.COMPLETED):
return True

# If the last running background task is
# not same with last background task
# We skip it as the last running one is other task
if last_running_background_task and (
last_running_background_task.id != last_background_task.id
):
return True

# If there are already running task 2,
# the current task is skipped
if background_task_running.count() >= 2:
return True

now = timezone.now()
try:
if self.requested_at.date() != now.date():
return True
except AttributeError:
if self.requested_at != now.date():
return True

Check warning on line 481 in django_project/gap/models/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/models/crop_insight.py#L478-L481

Added lines #L478 - L481 were not covered by tests
return False

def update_note(self, message):
"""Update the notes."""
if self.last_background_task:
self.last_background_task.progress_text = message
self.last_background_task.save()

Check warning on line 488 in django_project/gap/models/crop_insight.py

View check run for this annotation

Codecov / codecov/patch

django_project/gap/models/crop_insight.py#L487-L488

Added lines #L487 - L488 were not covered by tests
self.notes = message
self.save()

def run(self):
"""Run the generate report."""
if self.skip_run:
return
self._generate_report()

@property
def title(self) -> str:
"""Return the title of the request."""
Expand All @@ -419,8 +506,9 @@
f"({east_africa_timezone})"
)

def generate_report(self):
def _generate_report(self):
"""Generate reports."""
from spw.generator.crop_insight import CropInsightFarmGenerator
output = []

# If farm is empty, put empty farm
Expand All @@ -430,6 +518,11 @@

# Get farms
for farm in farms:
# If it has farm id, generate spw
if farm.pk:
self.update_note('Generating SPW for farm: {}'.format(farm))
CropInsightFarmGenerator(farm).generate_spw()

data = CropPlanData(
farm, self.requested_at.date(),
forecast_fields=[
Expand All @@ -444,6 +537,7 @@
output.append([val for key, val in data.items()])

# Render csv
self.update_note('Generate CSV')
csv_content = ''

# Replace header
Expand Down
28 changes: 16 additions & 12 deletions django_project/gap/tasks/crop_insight.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,26 @@ def generate_spw(farms_id: list):
def generate_insight_report(_id: list):
"""Generate insight report."""
request = CropInsightRequest.objects.get(id=_id)
request.generate_report()
request.run()


@app.task(name="generate_crop_plan")
def generate_crop_plan():
"""Generate crop plan for registered farms."""
farms = Farm.objects.all().order_by('id')
# generate crop insight for all farms
for farm in farms:
CropInsightFarmGenerator(farm).generate_spw()
# create report request
request = CropInsightRequest.objects.create(
requested_by=User.objects.filter(
is_superuser=True
).first()
)
request.farms.set(farms)
user = User.objects.filter(is_superuser=True).first()
request = CropInsightRequest.objects.create(requested_by=user)
request.farms.set(Farm.objects.all().order_by('id'))
# generate report
request.generate_report()
request.run()


@app.task(name="retry_crop_plan_generators")
def retry_crop_plan_generators():
"""Retry crop plan generator.

This will run the crop plan generators but just run the is cancelled.
If it already has spw data, it will also be skipped.
"""
for request in CropInsightRequest.today_reports():
request.run()
Loading
Loading