diff --git a/commcare_connect/events/__init__.py b/commcare_connect/events/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/commcare_connect/events/models..py b/commcare_connect/events/models..py new file mode 100644 index 00000000..1c135cbb --- /dev/null +++ b/commcare_connect/events/models..py @@ -0,0 +1,151 @@ +from django.db import models + +from commcare_connect.users.models import User + + +class Event(models.Model): + + class Type(models.TextChoices): + INVITE_SENT = "is", gettext("Invite Sent") + RECORDS_APPROVED = "rp", gettext("Records Approved") + RECORDS_FLAGGED = "rf", gettext("Records Flagged") + RECORDS_REJECTED = "rj", gettext("Records Rejected") + PAYMENT_APPROVED = "pp", gettext("Payment Approved") + PAYMENT_ACCRUED = "pa", gettext("Payment Accrued") + PAYMENT_TRANSFERRED = "pf", gettext("Payment Transferred") + NOTIFICATIONS_SENT = "nf", gettext("Notification Sent") + ADDITIONAL_BUDGET_ADDED = "ab", gettext("Additional Budget Added") + + date_created = models.DateTimeField(auto_now_add=True, db_index=True) + event_type = models.CharField(max_length='2', choices=Type.choices) + user = models.ForeignKey(User, on_delete=models.CASCADE, null=True) + opportunity = models.ForeignKey(Opportunity, on_delete=models.PROTECT, null=True) + + @classmethod + def track(cls, use_async=True): + """ + To track an event instantiate the object and call this method, + instead of calling save directly. + + If use_async is True, the event is queued in Redis and saved + via celery, otherwise it's saved directly. + """ + from commcare_connect.events.tasks import track_event + track_event(self, use_async=use_async) + + + +@dataclass +class InferredEvent: + user: User + opportunity: Opportunity + date_created: datetime + event_type: str + + +class InferredEventSpec(ABCMeta): + """ + Use this to define an Event that can be inferred + based on other models. + """ + + @abstractproperty + def model_cls(self): + """ + The source model class to infer the event from + for e.g. UserVisit + """ + raise NotImplementedError + + @abstractproperty + def event_type(self): + """ + Should be a tuple to indicate the name + for e.g. "RECORDS_FLAGGED", gettext("Records Flagged") + """ + raise NotImplementedError + + @abstractproperty + def event_filters(self): + """ + Should be a dict of the queryset filters + for e.g. {'flagged': True} for RecordsFlagged for UserVisit + """ + raise NotImplementedError + + + @abstractproperty + def user(self): + """ + The field corresponding to user on source model. + """ + raise NotImplementedError + + @abstractproperty + def date_created(self): + """ + The field corresponding to user date_created. + """ + raise NotImplementedError + + @abstractproperty + def opportunity(self): + """ + The field corresponding to user opportunity, could be None. + """ + raise NotImplementedError + + def get_events(self, user=None, from_date=None, to_date=None): + filters = {} + filters.update(event_filters) + + if user: + filters.update({self.user: user}) + if from_date: + filters.update({f"{self.date_created}__gte": from_date}) + if from_date: + filters.update({f"{self.date_created}__lte": to_date}) + + events = self.model_cls.objects.filter( + **filters + ).values( + self.user, self.opportunity, self.date_created + ).iterator() + for event in events: + yield InferredEvent( + event_type=self.event_type[0], + user=event[self.user], + date_created=event[self.date_created], + opportunity=event.get(self.opportunity, None) + ) + + +class RecordsFlagged(InferredEventSpec): + + event_type = "RECORDS_FLAGGED", gettext("Records Flagged") + model = UserVisit + event_filters = {"flagged": True} + field_mapping = { + "user": "user", + "date_created": "visit_date", + "opportunity": "opportunity", + } + + +INFERRED_EVENT_SPECS = [ + RecordsFlagged +] + + +def get_events(user=None, from_date=None, to_date=None): + filters = { + "user": user, + "date_created__gte": from_date, + "date_created__lte": to_date, + } + filters = {k:v for k,v in filters.items() if v is not None} + raw_events = Events.objects.filter(**filters).all() + inferred_events = [] + for event_spec in INFERRED_EVENT_SPECS: + inferred_events += event_spec.get_events() + return raw_events + inferred_events diff --git a/commcare_connect/events/tasks.py b/commcare_connect/events/tasks.py new file mode 100644 index 00000000..4a408467 --- /dev/null +++ b/commcare_connect/events/tasks.py @@ -0,0 +1,37 @@ +import pickle +from datetime import datetime + +from django.db import transaction +from django_redis import get_redis_connection + +from config import celery_app + +from .models import Event + +REDIS_EVENTS_QUEUE = "events_queue" + + +@celery_app.task +def process_events_batch(): + redis_conn = get_redis_connection("default") + events = redis_conn.lrange(REDIS_EVENTS_QUEUE, 0, -1) + if not events: + return + + with transaction.atomic(): + event_objs = [] + for event in events: + event_objs.append(pickle.loads(event)) + Event.objects.bulk_create(event_objs) + + redis_conn.ltrim(REDIS_EVENTS_QUEUE, len(events), -1) + + +def track_event(event_obj, use_async=True): + event_obj.date_created = datetime.now() + if use_async: + redis_conn = get_redis_connection("default") + serialized_event = pickle.dumps(event_obj) + redis_conn.rpush(REDIS_EVENTS_QUEUE, serialized_event) + else: + event_obj.save() diff --git a/commcare_connect/opportunity/tasks.py b/commcare_connect/opportunity/tasks.py index 86265516..84eafe77 100644 --- a/commcare_connect/opportunity/tasks.py +++ b/commcare_connect/opportunity/tasks.py @@ -14,6 +14,7 @@ from commcare_connect.connect_id_client import fetch_users, send_message, send_message_bulk from commcare_connect.connect_id_client.models import Message +from commcare_connect.events.models import Event from commcare_connect.opportunity.app_xml import get_connect_blocks_for_app, get_deliver_units_for_app from commcare_connect.opportunity.export import ( export_deliver_status_table, @@ -101,6 +102,9 @@ def invite_user(user_id, opportunity_access_id): ), ) send_message(message) + Event(event_type=Event.Type.INVITE_SENT, user=user, opportunity=opportunity_access.opportunity).track( + use_async=False + ) @celery_app.task() diff --git a/commcare_connect/opportunity/views.py b/commcare_connect/opportunity/views.py index 42192784..0d7b0cd1 100644 --- a/commcare_connect/opportunity/views.py +++ b/commcare_connect/opportunity/views.py @@ -20,6 +20,7 @@ from django_tables2.export import TableExport from geopy import distance +from commcare_connect.events.models import Event from commcare_connect.form_receiver.serializers import XFormSerializer from commcare_connect.opportunity.forms import ( AddBudgetExistingUsersForm, @@ -382,6 +383,7 @@ def add_budget_existing_users(request, org_slug=None, pk=None): opportunity.total_budget += ocl.payment_unit.amount * additional_visits opportunity.save() return redirect("opportunity:detail", org_slug, pk) + Event(event_type=Event.Type.ADDITIONAL_BUDGET_ADDED, opportunity=opportunity).track() return render( request, @@ -750,6 +752,7 @@ def approve_visit(request, org_slug=None, pk=None): opp_id = user_visit.opportunity_id access = OpportunityAccess.objects.get(user_id=user_visit.user_id, opportunity_id=opp_id) update_payment_accrued(opportunity=access.opportunity, users=[access.user]) + Event(event_type=Event.Type.RECORDS_APPROVED, user=user_visit.user, opportunity=access.opportunity).track() return redirect("opportunity:user_visits_list", org_slug=org_slug, opp_id=user_visit.opportunity.id, pk=access.id) @@ -760,6 +763,7 @@ def reject_visit(request, org_slug=None, pk=None): user_visit.save() access = OpportunityAccess.objects.get(user_id=user_visit.user_id, opportunity_id=user_visit.opportunity_id) update_payment_accrued(opportunity=access.opportunity, users=[access.user]) + Event(event_type=Event.Type.RECORDS_REJECTED, user=user_visit.user, opportunity=access.opportunity).track() return redirect("opportunity:user_visits_list", org_slug=org_slug, opp_id=user_visit.opportunity_id, pk=access.id) diff --git a/commcare_connect/opportunity/visit_import.py b/commcare_connect/opportunity/visit_import.py index d7e50e43..930d34fa 100644 --- a/commcare_connect/opportunity/visit_import.py +++ b/commcare_connect/opportunity/visit_import.py @@ -7,6 +7,7 @@ from django.db import transaction from tablib import Dataset +from commcare_connect.events.models import Event from commcare_connect.opportunity.models import ( CompletedWork, CompletedWorkStatus, @@ -143,6 +144,7 @@ def update_payment_accrued(opportunity: Opportunity, users): access.payment_accrued += approved_count * completed_work.payment_unit.amount completed_work.save() access.save() + Event(event_type=Event.Type.PAYMENT_ACCRUED, user=access.user, opportunity=access.opportunity).track() def get_status_by_visit_id(dataset) -> dict[int, VisitValidationStatus]: @@ -238,6 +240,7 @@ def _bulk_update_payments(opportunity: Opportunity, imported_data: Dataset) -> P payment = Payment.objects.create(opportunity_access=access, amount=amount) seen_users.add(username) payment_ids.append(payment.pk) + Event(event_type=Event.Type.PAYMENT_TRANSFERRED, user=access.user, opportunity=opportunity).track() missing_users = set(usernames) - seen_users send_payment_notification.delay(opportunity.id, payment_ids) return PaymentImportStatus(seen_users, missing_users) diff --git a/config/settings/base.py b/config/settings/base.py index 50175ddc..83c4db44 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -69,6 +69,7 @@ LOCAL_APPS = [ "commcare_connect.commcarehq_provider", + "commcare_connect.events", "commcare_connect.form_receiver", "commcare_connect.opportunity", "commcare_connect.organization",