From 98433617b8aa5aeffcb51a68c8a95d9f4e961c09 Mon Sep 17 00:00:00 2001 From: pablohashescobar <nikhilschacko@gmail.com> Date: Tue, 20 Feb 2024 16:57:40 +0530 Subject: [PATCH] fix: email notification duplicates --- .../plane/bgtasks/email_notification_task.py | 263 ++++++++++-------- 1 file changed, 146 insertions(+), 117 deletions(-) diff --git a/apiserver/plane/bgtasks/email_notification_task.py b/apiserver/plane/bgtasks/email_notification_task.py index 9e9b348e197..617bfcfdc60 100644 --- a/apiserver/plane/bgtasks/email_notification_task.py +++ b/apiserver/plane/bgtasks/email_notification_task.py @@ -1,9 +1,9 @@ from datetime import datetime from bs4 import BeautifulSoup - # Third party imports from celery import shared_task +from sentry_sdk import capture_exception # Django imports from django.utils import timezone @@ -16,6 +16,17 @@ from plane.license.utils.instance_value import get_email_configuration from plane.settings.redis import redis_instance +# acquire and delete redis lock +def acquire_lock(lock_id, expire_time=300): + redis_client = redis_instance() + """Attempt to acquire a lock with a specified expiration time.""" + return redis_client.set(lock_id, 'true', nx=True, ex=expire_time) + +def release_lock(lock_id): + """Release a lock.""" + redis_client = redis_instance() + redis_client.delete(lock_id) + @shared_task def stack_email_notification(): # get all email notifications @@ -142,135 +153,153 @@ def process_html_content(content): processed_content_list.append(processed_content) return processed_content_list + @shared_task def send_email_notification( issue_id, notification_data, receiver_id, email_notification_ids ): + # Convert UUIDs to a sorted, concatenated string + sorted_ids = sorted(email_notification_ids) + ids_str = "_".join(str(id) for id in sorted_ids) + lock_id = f"send_email_notif_{issue_id}_{receiver_id}_{ids_str}" + + # acquire the lock for sending emails try: - ri = redis_instance() - base_api = (ri.get(str(issue_id)).decode()) - data = create_payload(notification_data=notification_data) + if acquire_lock(lock_id=lock_id): + # get the redis instance + ri = redis_instance() + base_api = (ri.get(str(issue_id)).decode()) + data = create_payload(notification_data=notification_data) - # Get email configurations - ( - EMAIL_HOST, - EMAIL_HOST_USER, - EMAIL_HOST_PASSWORD, - EMAIL_PORT, - EMAIL_USE_TLS, - EMAIL_FROM, - ) = get_email_configuration() + # Get email configurations + ( + EMAIL_HOST, + EMAIL_HOST_USER, + EMAIL_HOST_PASSWORD, + EMAIL_PORT, + EMAIL_USE_TLS, + EMAIL_FROM, + ) = get_email_configuration() - receiver = User.objects.get(pk=receiver_id) - issue = Issue.objects.get(pk=issue_id) - template_data = [] - total_changes = 0 - comments = [] - actors_involved = [] - for actor_id, changes in data.items(): - actor = User.objects.get(pk=actor_id) - total_changes = total_changes + len(changes) - comment = changes.pop("comment", False) - mention = changes.pop("mention", False) - actors_involved.append(actor_id) - if comment: - comments.append( - { - "actor_comments": comment, - "actor_detail": { - "avatar_url": actor.avatar, - "first_name": actor.first_name, - "last_name": actor.last_name, - }, - } - ) - if mention: - mention["new_value"] = process_html_content(mention.get("new_value")) - mention["old_value"] = process_html_content(mention.get("old_value")) - comments.append( - { - "actor_comments": mention, - "actor_detail": { - "avatar_url": actor.avatar, - "first_name": actor.first_name, - "last_name": actor.last_name, - }, - } - ) - activity_time = changes.pop("activity_time") - # Parse the input string into a datetime object - formatted_time = datetime.strptime(activity_time, "%Y-%m-%d %H:%M:%S").strftime("%H:%M %p") + receiver = User.objects.get(pk=receiver_id) + issue = Issue.objects.get(pk=issue_id) + template_data = [] + total_changes = 0 + comments = [] + actors_involved = [] + for actor_id, changes in data.items(): + actor = User.objects.get(pk=actor_id) + total_changes = total_changes + len(changes) + comment = changes.pop("comment", False) + mention = changes.pop("mention", False) + actors_involved.append(actor_id) + if comment: + comments.append( + { + "actor_comments": comment, + "actor_detail": { + "avatar_url": actor.avatar, + "first_name": actor.first_name, + "last_name": actor.last_name, + }, + } + ) + if mention: + mention["new_value"] = process_html_content(mention.get("new_value")) + mention["old_value"] = process_html_content(mention.get("old_value")) + comments.append( + { + "actor_comments": mention, + "actor_detail": { + "avatar_url": actor.avatar, + "first_name": actor.first_name, + "last_name": actor.last_name, + }, + } + ) + activity_time = changes.pop("activity_time") + # Parse the input string into a datetime object + formatted_time = datetime.strptime(activity_time, "%Y-%m-%d %H:%M:%S").strftime("%H:%M %p") - if changes: - template_data.append( - { - "actor_detail": { - "avatar_url": actor.avatar, - "first_name": actor.first_name, - "last_name": actor.last_name, - }, - "changes": changes, - "issue_details": { - "name": issue.name, - "identifier": f"{issue.project.identifier}-{issue.sequence_id}", - }, - "activity_time": str(formatted_time), - } - ) + if changes: + template_data.append( + { + "actor_detail": { + "avatar_url": actor.avatar, + "first_name": actor.first_name, + "last_name": actor.last_name, + }, + "changes": changes, + "issue_details": { + "name": issue.name, + "identifier": f"{issue.project.identifier}-{issue.sequence_id}", + }, + "activity_time": str(formatted_time), + } + ) - summary = "Updates were made to the issue by" + summary = "Updates were made to the issue by" - # Send the mail - subject = f"{issue.project.identifier}-{issue.sequence_id} {issue.name}" - context = { - "data": template_data, - "summary": summary, - "actors_involved": len(set(actors_involved)), - "issue": { - "issue_identifier": f"{str(issue.project.identifier)}-{str(issue.sequence_id)}", - "name": issue.name, + # Send the mail + subject = f"{issue.project.identifier}-{issue.sequence_id} {issue.name}" + context = { + "data": template_data, + "summary": summary, + "actors_involved": len(set(actors_involved)), + "issue": { + "issue_identifier": f"{str(issue.project.identifier)}-{str(issue.sequence_id)}", + "name": issue.name, + "issue_url": f"{base_api}/{str(issue.project.workspace.slug)}/projects/{str(issue.project.id)}/issues/{str(issue.id)}", + }, + "receiver": { + "email": receiver.email, + }, "issue_url": f"{base_api}/{str(issue.project.workspace.slug)}/projects/{str(issue.project.id)}/issues/{str(issue.id)}", - }, - "receiver": { - "email": receiver.email, - }, - "issue_url": f"{base_api}/{str(issue.project.workspace.slug)}/projects/{str(issue.project.id)}/issues/{str(issue.id)}", - "project_url": f"{base_api}/{str(issue.project.workspace.slug)}/projects/{str(issue.project.id)}/issues/", - "workspace":str(issue.project.workspace.slug), - "project": str(issue.project.name), - "user_preference": f"{base_api}/profile/preferences/email", - "comments": comments, - } - html_content = render_to_string( - "emails/notifications/issue-updates.html", context - ) - text_content = strip_tags(html_content) - - try: - connection = get_connection( - host=EMAIL_HOST, - port=int(EMAIL_PORT), - username=EMAIL_HOST_USER, - password=EMAIL_HOST_PASSWORD, - use_tls=EMAIL_USE_TLS == "1", + "project_url": f"{base_api}/{str(issue.project.workspace.slug)}/projects/{str(issue.project.id)}/issues/", + "workspace":str(issue.project.workspace.slug), + "project": str(issue.project.name), + "user_preference": f"{base_api}/profile/preferences/email", + "comments": comments, + } + html_content = render_to_string( + "emails/notifications/issue-updates.html", context ) + text_content = strip_tags(html_content) - msg = EmailMultiAlternatives( - subject=subject, - body=text_content, - from_email=EMAIL_FROM, - to=[receiver.email], - connection=connection, - ) - msg.attach_alternative(html_content, "text/html") - msg.send() + try: + connection = get_connection( + host=EMAIL_HOST, + port=int(EMAIL_PORT), + username=EMAIL_HOST_USER, + password=EMAIL_HOST_PASSWORD, + use_tls=EMAIL_USE_TLS == "1", + ) - EmailNotificationLog.objects.filter( - pk__in=email_notification_ids - ).update(sent_at=timezone.now()) - return - except Exception as e: - print(e) + msg = EmailMultiAlternatives( + subject=subject, + body=text_content, + from_email=EMAIL_FROM, + to=[receiver.email], + connection=connection, + ) + msg.attach_alternative(html_content, "text/html") + msg.send() + + EmailNotificationLog.objects.filter( + pk__in=email_notification_ids + ).update(sent_at=timezone.now()) + + # release the lock + release_lock(lock_id=lock_id) + return + except Exception as e: + capture_exception(e) + # release the lock + release_lock(lock_id=lock_id) + return + else: + print("Duplicate task recived. Skipping...") return - except Issue.DoesNotExist: + except (Issue.DoesNotExist, User.DoesNotExist) as e: + release_lock(lock_id=lock_id) return