Improve status functionality and update question status according to service usage events #65

Open
thclark opened this issue Feb 6, 2024 · 1 comment


thclark commented Feb 6, 2024

Feature request

Use Case

We want to update the status of Question entries based on the service usage events that we handle, so the status is always correct.

Current state

A number of status updates are missing.

Proposed Solution

  • Update to 'in-progress' status on ask()
  • If ask() fails, update to the error status
  • On receipt of a result, update to the success status
  • On timeout (no heartbeats), change the status to failed
  • Add and raise an explicit signal for status changes (e.g. one that an app could hook into in order to push updates through a socket)
  • Document the use of statuses
  • Test the update of statuses
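The status-change signal in the list above could look roughly like the following. This is a minimal sketch, not django_twined's API: the status string values are hypothetical, and `StatusChanged` is a plain-Python stand-in for a Django signal (in a Django app you would more likely declare `status_changed = Signal()` from `django.dispatch` and call `status_changed.send(sender=..., ...)`). The signal fires only when the status actually changes, so repeated heartbeats don't spam listeners such as a websocket pusher.

```python
from dataclasses import dataclass
from typing import Callable, List

# Hypothetical status values; the real constants live in django_twined.models.questions
# and their actual string values may differ.
NO_STATUS = "no-status"
IN_PROGRESS_STATUS = "in-progress"
SUCCESS_STATUS = "success"
ERROR_STATUS = "error"
TIMEOUT_STATUS = "timeout"

# A listener receives (question, old_status, new_status).
StatusListener = Callable[["Question", str, str], None]


class StatusChanged:
    """Minimal stand-in for a django.dispatch.Signal carrying status changes."""

    def __init__(self) -> None:
        self._listeners: List[StatusListener] = []

    def connect(self, listener: StatusListener) -> None:
        self._listeners.append(listener)

    def send(self, question: "Question", old: str, new: str) -> None:
        for listener in self._listeners:
            listener(question, old, new)


status_changed = StatusChanged()


@dataclass
class Question:
    id: str
    status: str = NO_STATUS

    def set_status(self, new_status: str) -> None:
        """Update the status and emit the signal only if the value changes."""
        old_status = self.status
        if new_status == old_status:
            return
        self.status = new_status
        # In a Django model, self.save() would go here, before notifying listeners.
        status_changed.send(self, old_status, new_status)
```

For example, an app could call `status_changed.connect(push_to_socket)` (where `push_to_socket` is a hypothetical function of its own) and every call to `set_status()` that changes the value would be forwarded.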

An example of a workaround in the wild:

```python
import logging

from django_gcp.tasks import OnDemandTask

from projects.models import PowerLossQuestion
from projects.models.questions import ERROR_STATUS, IN_PROGRESS_STATUS


logger = logging.getLogger(__name__)


class PowerLossQuestionTask(OnDemandTask):
    """An on-demand task to prepare and ask a question to the power-loss service

    This process is done in an async task, rather than immediately in a request,
    because some preparation of the data is undertaken within `PowerLossQuestion.ask()`
    that can take several seconds of processing time.
    """

    def run(self, question_id=None):
        logger.info("Running PowerLossQuestionTask with question_id=%s", question_id)
        if question_id is None:
            raise ValueError("Task not supplied a question ID")

        # Ensure the question is fetched along with related items required for quick preparation
        question = PowerLossQuestion.objects.select_related(
            "configuration",
            "configuration__environment",
        ).get(id=question_id)
        try:
            question.calculation_status = IN_PROGRESS_STATUS
            question.save()
            question.ask()
        except Exception:
            # Record the failure on the question, then re-raise with the original traceback
            question.calculation_status = ERROR_STATUS
            question.save()
            raise
```
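The try/except pattern in the workaround above could be factored into a reusable context manager so every task that asks a question gets the same in-progress/error bookkeeping. A minimal sketch, assuming the question object exposes `calculation_status` and `save()` as in the snippet; `tracked_status` and the status string values are hypothetical names chosen for illustration.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol

# Hypothetical status values; the real constants in projects.models.questions may differ.
IN_PROGRESS_STATUS = "in-progress"
ERROR_STATUS = "error"


class HasStatus(Protocol):
    calculation_status: str

    def save(self) -> None: ...


@contextmanager
def tracked_status(question: HasStatus) -> Iterator[HasStatus]:
    """Mark the question in progress, and mark it errored if the body raises."""
    question.calculation_status = IN_PROGRESS_STATUS
    question.save()
    try:
        yield question
    except Exception:
        question.calculation_status = ERROR_STATUS
        question.save()
        raise  # bare raise keeps the original traceback
```

Inside `run()`, the body would then shrink to `with tracked_status(question): question.ask()`.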

```python
import logging

from django.dispatch import receiver
from django_twined.models import ServiceUsageEvent
from django_twined.signals import result_received

from projects.models.questions import SUCCESS_STATUS


logger = logging.getLogger(__name__)


@receiver(result_received, sender=ServiceUsageEvent)
def update_question_output_values(sender, service_usage_event, **kwargs):
    # TODO make a migration to flush all prior analyses into the ServiceUsageEvents queue as question_asked and
    # question_response_updated__result events, then stop copying data to output_values like this
    logger.info("Result received for question %s", service_usage_event.question_id)
    # Note: the related question is the base class Question, not PowerLossQuestion, so get the instance as the subclass
    question = service_usage_event.question.as_subclass()
    question.output_values = service_usage_event.data["output_values"]
    question.answered = service_usage_event.publish_time
    question.calculation_status = SUCCESS_STATUS
    question.save()
```
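The handler above only covers results; the same idea extends to the other service usage events. One possible policy, sketched as a pure function so it can be tested without Django: map each event kind to the status it implies, and refuse to overwrite a final status, except that an exception arriving after a result downgrades success to error (the follow-up comment notes that the output manifest is uploaded after the result is sent, so a question can fail after its result event). The event-kind strings, status values and the policy itself are assumptions, not documented django_twined behaviour.

```python
from typing import Optional

# Hypothetical status values and event kinds; django_twined's real names may differ.
IN_PROGRESS_STATUS = "in-progress"
SUCCESS_STATUS = "success"
ERROR_STATUS = "error"
TIMEOUT_STATUS = "timeout"

# Statuses that a late event should not normally overwrite (an assumed policy).
TERMINAL_STATUSES = {SUCCESS_STATUS, ERROR_STATUS, TIMEOUT_STATUS}

# The status each kind of service usage event implies.
EVENT_STATUS = {
    "delivery_acknowledgement": IN_PROGRESS_STATUS,
    "heartbeat": IN_PROGRESS_STATUS,
    "exception": ERROR_STATUS,
    "result": SUCCESS_STATUS,
}


def next_status(current: str, event_kind: str) -> Optional[str]:
    """Return the status to store after an event, or None to leave it unchanged."""
    proposed = EVENT_STATUS.get(event_kind)
    if proposed is None or proposed == current:
        return None  # e.g. monitor messages, or no actual change
    if current == SUCCESS_STATUS and proposed == ERROR_STATUS:
        return ERROR_STATUS  # failure after the result was sent
    if current in TERMINAL_STATUSES:
        return None  # ignore late heartbeats/results on finished questions
    return proposed
```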

thclark commented May 21, 2024

This is not as simple as the above suggests, because there are many failure modes in the wild, and because a question can fail without ever issuing a SUE (ServiceUsageEvent). So we can't simply rely on SUEs to update the status.

Here's an approach that runs a periodic task to clean up. This (or most of it) should be abstracted into Django Twined, with instructions to run it either as a periodic task or on demand, with a delay, after a question has been asked.

```python
import logging
from datetime import timedelta

from django.db.models import Q
from django.utils import timezone
from django_gcp.tasks import PeriodicTask
from django_twined.models.questions import ERROR_STATUS, IN_PROGRESS_STATUS, TIMEOUT_STATUS

from projects.models import PowerLossQuestion


logger = logging.getLogger(__name__)


class QuestionStatusTask(PeriodicTask):
    """A periodic task to clean up status of dead or excepted questions"""

    run_every = "*/3 * * * *"

    def run(self, **_):

        logger.info("Running QuestionStatusTask")

        # Get the current time
        now = timezone.now()

        # Define the time thresholds
        asked_threshold = now - timedelta(minutes=15)
        acknowledgement_threshold = now - timedelta(minutes=15)
        heartbeat_threshold = now - timedelta(minutes=3)

        # There's no point working with questions asked more recently than the most recent threshold
        latest_threshold = max(asked_threshold, acknowledgement_threshold, heartbeat_threshold)

        # Get a queryset of questions where:
        #   - Question is asked
        #   - It was asked before the most recent threshold (questions asked later wouldn't be updated in any circumstance)
        #   - It is marked as still in progress
        conditions = Q(asked__isnull=False) & Q(asked__lt=latest_threshold) & Q(status=IN_PROGRESS_STATUS)
        qs = PowerLossQuestion.objects.filter(conditions)
        logger.info("Verifying status of %s questions", qs.count())

        def verify_status(q):
            if q.answered is not None:
                # Answered
                # TODO REFACTOR REQUEST the answered field and the conditions under which it's populated
                # depend on the answer to the following question:
                #       Do we send any kind of a ‘finalise’ event, like “everything got done”
                #       at the end of a question? I’ve realised for django_twined I’m using the ‘result’
                #       event to populate an ‘answered’ field in the questions database table… but after
                #       the result is sent, the output manifest gets uploaded - so it’s possible for a
                #       question to have a result event but to still have failed.
                # Right now for the purposes of windquest, we need to manually pick over these questions to
                # determine whether their status should be error, timeout, or success
                if q.result is not None:
                    if len(q.exceptions) > 0:
                        # Occurs (possibly not exclusively) when a result is sent but an exception occurs
                        # during other operations that happen following that result but still within the
                        # scope of the service exception handler (for example in uploading files that form
                        # part of the output manifest)
                        return "answered_result_exceptions"
                    else:
                        # Not seen so far
                        # Could occur when a result is sent but an exception occurs
                        # during operations that happen after the scope of the service exception handler
                        # (for example in uploading files that form part of the output manifest)
                        return "answered_result_noexceptions"

                else:
                    # Not seen so far
                    return "answered_noresult"

            elif q.latest_heartbeat is not None:
                # Heartbeating
                if q.latest_heartbeat.publish_time < heartbeat_threshold:
                    # Heartbeating but heartbeat died more than heartbeat_threshold ago
                    if len(q.exceptions) > 0:
                        # Occurs (possibly not exclusively) when an exception is correctly handled in the
                        # question but the ServiceUsageEvent handler has a bug preventing the IN_PROGRESS_STATUS
                        # from being updated on receipt of the exception SUE.
                        return "heartbeat_timeout_exceptions"

                    else:
                        # Occurs (possibly not exclusively) when the question completes, but an exception
                        # occurs outside the question handler. This exception will appear
                        # in the log stream but not the structured exceptions (whose content
                        # should relate to service code only)
                        return "heartbeat_timeout_noexceptions"

                return "ok"

            elif q.delivery_acknowledgement is not None:
                # Acknowledged but not heartbeating (or crashed prior to first beat)
                if q.delivery_acknowledgement.publish_time < acknowledgement_threshold:
                    # Acknowledged but acknowledgement was more than acknowledgement_threshold ago
                    if len(q.exceptions) > 0:
                        # Not seen so far
                        return "ack_timeout_exceptions"
                    else:
                        # Occurs (possibly not exclusively) when there is a fatal error in the application
                        # (ie one not handled by the exception handler) occurring prior to the first heartbeat.
                        # This could be within the analysis or the wider application. This has been seen
                        # when attempting to load files at the beginning of an analysis, eg
                        # some kind of unhandled file io or memory error occurred
                        return "ack_timeout_noexceptions"

                # Acknowledged recently, so not yet timed out
                return "ok"

            elif q.asked < asked_threshold:
                # Asked but never acknowledged
                # Occurs when the question never makes it to a service, or there's an error in booting the
                # application prior to ack.
                if len(q.exceptions) > 0:
                    return "asked_timeout_exceptions"
                else:
                    return "asked_timeout_noexceptions"

            else:
                return "ok"

        # Iterate through the queryset to update questions sequentially
        for q in qs.all():
            scenario = verify_status(q)
            if scenario in ["asked_timeout_noexceptions", "ack_timeout_noexceptions", "heartbeat_timeout_noexceptions"]:
                logger.info(
                    "Question timed out with no exceptions (scenario %s) - set status TIMEOUT_STATUS on question %s",
                    scenario,
                    str(q.id),
                )
                q.status = TIMEOUT_STATUS
                q.save()

            if scenario in ["asked_timeout_exceptions", "heartbeat_timeout_exceptions", "ack_timeout_exceptions"]:
                logger.info(
                    "Question timed out with exceptions (scenario %s) - set status ERROR_STATUS on question %s",
                    scenario,
                    str(q.id),
                )
                q.status = ERROR_STATUS
                q.save()

            if scenario in ["answered_noresult", "answered_result_noexceptions", "answered_result_exceptions"]:
                logger.warning(
                    "Question answered with incomplete data (scenario %s) - manually check status of question %s",
                    scenario,
                    str(q.id),
                )
```
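The decision logic in `verify_status()` can be separated from the ORM, which makes the timeout rules unit-testable without a database. A minimal sketch, assuming a hypothetical `QuestionSnapshot` dataclass that carries plain timestamps in place of the `latest_heartbeat` and `delivery_acknowledgement` events, and hypothetical status strings. It collapses the scenarios into the status that would be set, returning `None` both for questions that are still healthy and for answered ones (which the task only flags for manual review). It assumes the question has already passed the queryset filter (asked, in progress, older than the latest threshold).

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional

# Hypothetical status values; the real ones come from django_twined.models.questions.
ERROR_STATUS = "error"
TIMEOUT_STATUS = "timeout"


@dataclass
class QuestionSnapshot:
    """Plain-data stand-in for the fields verify_status() reads (names are assumptions)."""

    asked: datetime
    answered: Optional[datetime] = None
    last_heartbeat: Optional[datetime] = None
    acknowledged: Optional[datetime] = None
    exceptions: List[str] = field(default_factory=list)


def classify(q: QuestionSnapshot, now: datetime) -> Optional[str]:
    """Return the status to set, or None to leave the question alone."""
    # Same thresholds as QuestionStatusTask
    asked_threshold = now - timedelta(minutes=15)
    ack_threshold = now - timedelta(minutes=15)
    heartbeat_threshold = now - timedelta(minutes=3)

    if q.answered is not None:
        return None  # answered questions are flagged for manual review, not updated
    # Use the most advanced lifecycle stage reached to decide whether it has gone stale
    if q.last_heartbeat is not None:
        stale = q.last_heartbeat < heartbeat_threshold
    elif q.acknowledged is not None:
        stale = q.acknowledged < ack_threshold
    else:
        stale = q.asked < asked_threshold
    if not stale:
        return None
    return ERROR_STATUS if q.exceptions else TIMEOUT_STATUS
```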

REFACTOR REQUEST

The `latest_heartbeat` and `delivery_acknowledgement` attributes should be 1:1 related fields, NOT properties, which would allow simpler querying than the above.

Part of the above could then be rewritten as (for example):

```python
qsa = qs.filter(Q(latest_heartbeat__isnull=False) & Q(latest_heartbeat__publish_time__lt=heartbeat_threshold))
qsb = qs.filter(
    Q(latest_heartbeat__isnull=True)
    & Q(delivery_acknowledgement__isnull=False)
    & Q(delivery_acknowledgement__publish_time__lt=acknowledgement_threshold)
)
qsc = qs.filter(
    Q(latest_heartbeat__isnull=True) & Q(delivery_acknowledgement__isnull=True) & Q(asked__lt=asked_threshold)
)
```
