From 645afe2a4f82e39ef7a1cf5f87b9dea89a1f48e0 Mon Sep 17 00:00:00 2001 From: Adrian McPhee Date: Mon, 25 Nov 2024 15:15:42 +0100 Subject: [PATCH] feat: Refactor notification system test suite to use synchronous execution, enabling deterministic testing of async flows while maintaining async processing in production. Add comprehensive test coverage for notification creation, template rendering, and error handling with proper event logging. --- .../migrations/0024_bounty_data_migration.py | 2 +- .../migrations/0033_alter_bounty_status.py | 2 +- .../migrations/0034_alter_challenge_status.py | 2 +- .../0035_alter_initiative_status.py | 2 +- ...ge_priority_alter_challenge_reward_type.py | 2 +- ..._type_remove_product_object_id_and_more.py | 8 +- ...7_remove_personskill_expertise_and_more.py | 9 +- .../0009_alter_bountydeliveryattempt_kind.py | 8 +- apps/common/settings/base.py | 18 +- apps/common/settings/test.py | 56 +++- apps/engagement/apps.py | 3 +- apps/engagement/docs/notification-system.md | 146 +++++----- apps/engagement/events.py | 229 +++++++++------ ...pnotificationtemplate_template_and_more.py | 23 ++ apps/engagement/models.py | 39 ++- apps/engagement/tasks.py | 94 +++++++ apps/engagement/tests/conftest.py | 48 +++- apps/engagement/tests/test_listeners.py | 20 ++ ..._flow.py => test_notification_creation.py} | 19 +- .../tests/test_notification_processing.py | 263 ++++++++++++++++++ apps/event_hub/events.py | 6 + apps/event_hub/services/backends/base.py | 17 +- apps/event_hub/services/backends/django_q.py | 165 ++++++++--- apps/event_hub/services/event_bus.py | 66 ++--- pytest.ini | 2 +- 25 files changed, 956 insertions(+), 293 deletions(-) create mode 100644 apps/engagement/migrations/0004_rename_message_template_appnotificationtemplate_template_and_more.py create mode 100644 apps/engagement/tasks.py create mode 100644 apps/engagement/tests/test_listeners.py rename apps/engagement/tests/{test_notification_flow.py => test_notification_creation.py} (93%) create mode 100644 apps/engagement/tests/test_notification_processing.py diff --git a/apps/capabilities/product_management/migrations/0024_bounty_data_migration.py b/apps/capabilities/product_management/migrations/0024_bounty_data_migration.py index 40599354..ffb51ef3 100644 --- a/apps/capabilities/product_management/migrations/0024_bounty_data_migration.py +++ b/apps/capabilities/product_management/migrations/0024_bounty_data_migration.py @@ -4,7 +4,7 @@ def forward_func(apps, schema_editor): - Bounty = apps.capabilities.get_model("product_management.Bounty") + Bounty = apps.get_model("product_management", "Bounty") for bounty in Bounty.objects.all(): expertise_as_str = ", ".join([exp.name.title() for exp in bounty.expertise.all()]) skill_name = "" diff --git a/apps/capabilities/product_management/migrations/0033_alter_bounty_status.py b/apps/capabilities/product_management/migrations/0033_alter_bounty_status.py index f882c815..173dd298 100644 --- a/apps/capabilities/product_management/migrations/0033_alter_bounty_status.py +++ b/apps/capabilities/product_management/migrations/0033_alter_bounty_status.py @@ -4,7 +4,7 @@ def forward_func(apps, schema_editor): - Bounty = apps.capabilities.get_model("product_management", "Bounty") + Bounty = apps.get_model("product_management", "Bounty") for bounty in Bounty.objects.all(): status = int(bounty.status) diff --git a/apps/capabilities/product_management/migrations/0034_alter_challenge_status.py b/apps/capabilities/product_management/migrations/0034_alter_challenge_status.py index 
e49b39c1..425d2652 100644 --- a/apps/capabilities/product_management/migrations/0034_alter_challenge_status.py +++ b/apps/capabilities/product_management/migrations/0034_alter_challenge_status.py @@ -3,7 +3,7 @@ def forward_func(apps, schema_editor): - Challenge = apps.capabilities.get_model("product_management", "Challenge") + Challenge = apps.get_model("product_management", "Challenge") for challenge in Challenge.objects.all(): status = int(challenge.status) diff --git a/apps/capabilities/product_management/migrations/0035_alter_initiative_status.py b/apps/capabilities/product_management/migrations/0035_alter_initiative_status.py index 7956e676..ba8aaf38 100644 --- a/apps/capabilities/product_management/migrations/0035_alter_initiative_status.py +++ b/apps/capabilities/product_management/migrations/0035_alter_initiative_status.py @@ -4,7 +4,7 @@ def forward_func(apps, schema_editor): - Initiative = apps.capabilities.get_model("product_management", "Initiative") + Initiative = apps.get_model("product_management", "Initiative") for initiative in Initiative.objects.all(): status = int(initiative.status) if status == 1: diff --git a/apps/capabilities/product_management/migrations/0044_alter_challenge_priority_alter_challenge_reward_type.py b/apps/capabilities/product_management/migrations/0044_alter_challenge_priority_alter_challenge_reward_type.py index dc377f7a..2a6436fa 100644 --- a/apps/capabilities/product_management/migrations/0044_alter_challenge_priority_alter_challenge_reward_type.py +++ b/apps/capabilities/product_management/migrations/0044_alter_challenge_priority_alter_challenge_reward_type.py @@ -11,7 +11,7 @@ def forward_fun(apps, schema_editor): - Challenge = apps.capabilities.get_model("product_management.Challenge") + Challenge = apps.get_model("product_management", "Challenge") for challenge in Challenge.objects.all(): if challenge.priority in [0, "0"]: challenge.priority = HIGH diff --git a/apps/capabilities/product_management/migrations/0053_remove_product_content_type_remove_product_object_id_and_more.py b/apps/capabilities/product_management/migrations/0053_remove_product_content_type_remove_product_object_id_and_more.py index 5156fa0f..b9f29dec 100644 --- a/apps/capabilities/product_management/migrations/0053_remove_product_content_type_remove_product_object_id_and_more.py +++ b/apps/capabilities/product_management/migrations/0053_remove_product_content_type_remove_product_object_id_and_more.py @@ -10,12 +10,13 @@ def convert_generic_to_explicit(apps, schema_editor): Person = apps.get_model('talent', 'Person') ContentType = apps.get_model('contenttypes', 'ContentType') - # Get content types using the historical models - org_content_type = ContentType.objects.get( + # Create both ContentTypes if they don't exist + org_content_type, _ = ContentType.objects.get_or_create( app_label='commerce', model='organisation' ) - person_content_type = ContentType.objects.get( + + person_content_type, _ = ContentType.objects.get_or_create( app_label='talent', model='person' ) @@ -33,7 +34,6 @@ def convert_generic_to_explicit(apps, schema_editor): product.save() def reverse_convert(apps, schema_editor): - # If you need to reverse the migration pass class Migration(migrations.Migration): diff --git a/apps/capabilities/talent/migrations/0007_remove_personskill_expertise_and_more.py b/apps/capabilities/talent/migrations/0007_remove_personskill_expertise_and_more.py index f3933cbb..a8dea7fe 100644 --- a/apps/capabilities/talent/migrations/0007_remove_personskill_expertise_and_more.py +++ 
b/apps/capabilities/talent/migrations/0007_remove_personskill_expertise_and_more.py @@ -5,7 +5,12 @@ def forward_func(apps, schema_editor): - apps.capabilities.get_model("talent.PersonSkill").objects.all().delete() + PersonSkill = apps.get_model("talent", "PersonSkill") + PersonSkill.objects.all().delete() + + +def reverse_func(apps, schema_editor): + pass class Migration(migrations.Migration): @@ -14,7 +19,7 @@ class Migration(migrations.Migration): ] operations = [ - migrations.RunPython(forward_func, migrations.RunPython.noop), + migrations.RunPython(forward_func, reverse_func), migrations.RemoveField( model_name="personskill", name="expertise", diff --git a/apps/capabilities/talent/migrations/0009_alter_bountydeliveryattempt_kind.py b/apps/capabilities/talent/migrations/0009_alter_bountydeliveryattempt_kind.py index 76595e93..d59bf164 100644 --- a/apps/capabilities/talent/migrations/0009_alter_bountydeliveryattempt_kind.py +++ b/apps/capabilities/talent/migrations/0009_alter_bountydeliveryattempt_kind.py @@ -8,7 +8,7 @@ def forward_func(apps, schema_editor): - BountyDeliveryAttempt = apps.capabilities.get_model("talent.BountyDeliveryAttempt") + BountyDeliveryAttempt = apps.get_model("talent", "BountyDeliveryAttempt") for attempt in BountyDeliveryAttempt.objects.all(): if attempt.kind in ["0", 0]: attempt.kind = NEW @@ -19,6 +19,10 @@ def forward_func(apps, schema_editor): attempt.save() +def reverse_func(apps, schema_editor): + pass + + class Migration(migrations.Migration): dependencies = [ @@ -33,5 +37,5 @@ class Migration(migrations.Migration): choices=[("New", "New"), ("Approved", "Approved"), ("Rejected", "Rejected")], default="New" ), ), - migrations.RunPython(forward_func, migrations.RunPython.noop), + migrations.RunPython(forward_func, reverse_func), ] diff --git a/apps/common/settings/base.py b/apps/common/settings/base.py index 036a0ad0..fed939af 100644 --- a/apps/common/settings/base.py +++ b/apps/common/settings/base.py @@ -425,15 +425,13 @@ class Media: # Django Q Configuration (using PostgreSQL as broker) Q_CLUSTER = { - 'name': 'OpenUnited', + 'name': 'DjangoORM', 'workers': int(os.getenv('DJANGO_Q_WORKERS', '4')), - 'recycle': 500, - 'timeout': 300, - 'retry': 600, - 'compress': True, - 'save_limit': 250, - 'queue_limit': 500, - 'cpu_affinity': 1, - 'label': 'Django Q', - 'orm': 'default' + 'timeout': 90, + 'retry': 120, + 'queue_limit': 50, + 'bulk': 10, + 'orm': 'default', + 'sync': False, + 'queue_table': 'django_q_task' } diff --git a/apps/common/settings/test.py b/apps/common/settings/test.py index 36589f5c..2cce8d16 100644 --- a/apps/common/settings/test.py +++ b/apps/common/settings/test.py @@ -8,6 +8,10 @@ 'PASSWORD': 'postgres', 'HOST': 'localhost', 'PORT': '5432', + 'ATOMIC_REQUESTS': False, + 'TEST': { + 'NAME': 'test_db', + }, } } @@ -15,29 +19,31 @@ 'django.contrib.auth.hashers.MD5PasswordHasher', ] -# Disable migrations +# Disable migrations except for django_q class DisableMigrations: def __contains__(self, item): - return True + return item != 'django_q' # Allow django_q migrations def __getitem__(self, item): - return None + if item == 'django_q': + return None # Use normal migrations for django_q + return 'notmigrations' # Disable migrations for other apps MIGRATION_MODULES = DisableMigrations() SECRET_KEY = 'django-insecure-test-key-123' # Only for testing -# Configure Django-Q for testing +# Configure Django-Q for testing async behavior Q_CLUSTER = { - 'name': 'OpenUnited_Test', - 'workers': 2, + 'name': 'TestCluster', + 'workers': 1, 'timeout': 30, 
- 'retry': 60, - 'sync': False, - 'poll': 100, + 'queue_limit': 50, + 'bulk': 1, 'orm': 'default', + 'sync': True, 'catch_up': False, - 'bulk': 1 + 'log_level': 'DEBUG', } # Use synchronous event bus in tests @@ -45,5 +51,33 @@ def __getitem__(self, item): 'BACKEND': 'apps.event_hub.services.backends.django_q.DjangoQBackend', 'LOGGING_ENABLED': True, 'TASK_TIMEOUT': 30, - 'SYNC_IN_TEST': True, # Add this flag + 'SYNC_IN_TEST': False, +} + +# Add detailed logging configuration +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + }, + }, + 'formatters': { + 'verbose': { + 'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s' + }, + }, + 'loggers': { + 'django_q': { + 'handlers': ['console'], + 'level': 'DEBUG', + 'propagate': True, + }, + 'apps': { + 'handlers': ['console'], + 'level': 'DEBUG', + 'propagate': True, + }, + }, } diff --git a/apps/engagement/apps.py b/apps/engagement/apps.py index 246b3656..200152bd 100644 --- a/apps/engagement/apps.py +++ b/apps/engagement/apps.py @@ -1,4 +1,5 @@ from django.apps import AppConfig +from apps.event_hub.events import EventTypes class EngagementConfig(AppConfig): @@ -10,4 +11,4 @@ def ready(self): from .events import handle_product_created event_bus = get_event_bus() - event_bus.register_listener('product.created', handle_product_created) + event_bus.register_listener(EventTypes.PRODUCT_CREATED, handle_product_created) diff --git a/apps/engagement/docs/notification-system.md b/apps/engagement/docs/notification-system.md index f97447f8..a8b777a4 100644 --- a/apps/engagement/docs/notification-system.md +++ b/apps/engagement/docs/notification-system.md @@ -44,18 +44,31 @@ The event bus handles asynchronous processing and decouples event producers from - `EmailNotification` (with sent status) - Track actual notifications sent +### 5. Task Processors +- Asynchronous notification creation +- Template rendering +- Error handling with fallback notifications +- Preference-based notification routing + ## Example Flow: Product Creation Notification ### Step 1: Event Emission When a product is created, the ProductManagementService emits an event with the product details and ownership information. ### Step 2: Event Processing -The Django Q worker picks up the event and routes it to the appropriate handler. The handler: -1. Validates the event payload and retrieves the product -2. Creates NotifiableEvent records for: - - The product creator - - Organization managers/owners (if org-owned product) -3. Includes error handling and logging +The system uses a two-layer approach for event processing: + +1. **Event Handlers** (`events.py`): + - Receive and validate event payloads + - Create NotifiableEvent records + - Queue async tasks for notification creation + - Handle organization-specific routing + +2. **Task Processors** (`tasks.py`): + - Run asynchronously via Django Q + - Process NotifiableEvent records + - Create actual notifications based on preferences + - Handle template rendering and error cases ### Step 3: Notification Creation The system: @@ -317,65 +330,62 @@ class EmailNotificationTemplate(models.Model): ## Testing Strategy ### 1. 
Test Categories -- **Basic Flow Tests**: Verify core notification creation and delivery -- **Async Behavior**: Test event processing and notification generation -- **Error Handling**: Validate system resilience -- **Performance**: Check system behavior under load -- **Edge Cases**: Test unusual scenarios -- **Maintenance**: Verify cleanup and housekeeping +- **Unit Tests**: Individual component testing +- **Integration Tests**: End-to-end notification flow +- **Async Tests**: Verify asynchronous processing +- **Preference Tests**: Validate notification routing +- **Error Cases**: System resilience testing ### 2. Test Infrastructure + +1. **Base Test Setup** ```python -# Global fixtures (conftest.py) @pytest.fixture(autouse=True) def enable_db_access_for_all_tests(db): - """Enable database access for all tests""" - pass - -@pytest.fixture(autouse=True) -def use_transaction_db(transactional_db): - """Make all tests transactional""" pass @pytest.fixture(autouse=True) def clear_cache(): - """Clear cache before/after tests""" cache.clear() yield cache.clear() ``` -### 3. Key Test Scenarios -1. **Event Emission** - - Verify correct event payload - - Check async processing - - Test concurrent events - -2. **Notification Creation** - - Test preference-based creation - - Validate template rendering - - Check error handling - -3. **Email Specific** - - Test email template rendering - - Verify email queuing - - Check delivery status tracking - -4. **Performance Testing** - - Test under high load - - Check query optimization - - Verify async processing scales - -5. **Cleanup and Maintenance** - - Test automatic deletion - - Verify retention periods - - Check cleanup efficiency - -### 4. Test Helpers +2. **Async Test Environment** +```python +@pytest.fixture(autouse=True) +def setup_async_environment(): + broker = get_broker() + broker.purge_queue() + + ready_event = Event() + cluster = Cluster(broker) + cluster.start() + ready_event.wait(timeout=1) + + yield + + cluster.stop() + broker.purge_queue() + cache.clear() +``` + +### 3. Testing Patterns + +1. **Synchronous Flow Testing** + - Direct handler calls + - Immediate assertion checking + - Used for preference and template testing + +2. **Asynchronous Flow Testing** + - Django Q cluster setup + - Exponential backoff waiting + - Race condition handling + +3. **Test Helpers** ```python @pytest.fixture def wait_for_notifications(): - """Wait for notifications with exponential backoff""" def _wait(filter_kwargs, expected_count=1, timeout=10): start_time = time.time() sleep_time = 0.1 @@ -391,28 +401,24 @@ def wait_for_notifications(): return _wait ``` -### 5. Best Practices +### 4. Best Practices + 1. **Test Isolation** - - Each test runs in its own transaction - - Automatic cleanup of test data - - Cache clearing between tests + - Use transaction rollback + - Clear queues between tests + - Reset cache state 2. **Async Testing** - - Use django_q_cluster fixture - - Handle race conditions - - Proper timeout handling - -3. **Error Scenarios** - - Test template errors - - Handle missing preferences - - Validate error logging - -4. **Performance Monitoring** - - Track query counts - - Monitor processing time - - Check memory usage - -5. **Maintenance Testing** - - Verify cleanup jobs - - Test retention policies - - Check index usage + - Configure Django Q for testing + - Handle timeouts appropriately + - Clean up broker queues + +3. **Preference Testing** + - Test all notification types + - Verify correct routing + - Check template rendering + +4. 
**Error Handling** + - Test missing templates + - Verify fallback notifications + - Check error logging diff --git a/apps/engagement/events.py b/apps/engagement/events.py index c0ab0b4b..978e95ed 100644 --- a/apps/engagement/events.py +++ b/apps/engagement/events.py @@ -1,106 +1,171 @@ from typing import Dict import logging +from django_q.tasks import async_task from apps.capabilities.security.services import RoleService -from apps.capabilities.commerce.models import Organisation -from apps.engagement.models import NotifiableEvent +from apps.capabilities.commerce.models import Organisation, Product +from apps.engagement.models import ( + NotifiableEvent, + NotificationPreference, + AppNotificationTemplate, + AppNotification, + EmailNotificationTemplate, + EmailNotification +) from apps.capabilities.talent.models import Person -from apps.engagement.models import NotificationPreference, AppNotificationTemplate, AppNotification, EmailNotificationTemplate, EmailNotification from apps.capabilities.security.models import OrganisationPersonRoleAssignment -from apps.capabilities.commerce.models import Product from apps.event_hub.events import EventTypes logger = logging.getLogger(__name__) -def handle_product_created(payload: dict) -> None: - """Handle product.created event""" - logger.info(f"Processing product created event: {payload}") +def create_notification(event_type: str, person_id: str, message: str) -> NotifiableEvent: + """Create a simple notification event with error message""" + event = NotifiableEvent.objects.create( + event_type=event_type, + person_id=person_id, + params={'error_message': message} + ) + + AppNotification.objects.create( + event=event, + title="Notification", + message=message + ) + + return event + +def handle_product_created(event_payload): + """Handle product created event""" try: - product_id = payload.get('product_id') - if not product_id: - logger.error("No product_id in payload") - return - - product = Product.objects.get(id=product_id) + # Extract required fields + product_id = event_payload.get('product_id') + person_id = event_payload.get('person_id') - params = { - 'name': payload.get('name', product.name), - 'url': payload.get('url', f'/products/{product.id}/summary/') - } - - # Always notify the creator - person_id = payload.get('person_id') - if person_id: - creator = Person.objects.get(id=person_id) - event = NotifiableEvent.objects.create( - event_type=EventTypes.PRODUCT_CREATED, - person=creator, - params=params + # Get notification preferences - use get() instead of filter() since we want exact match + try: + preferences = NotificationPreference.objects.get( + person_id=person_id, + product_notifications__in=[ + NotificationPreference.Type.BOTH, + NotificationPreference.Type.APPS, + NotificationPreference.Type.EMAIL + ] ) - _create_notifications_for_event(event) + except NotificationPreference.DoesNotExist: + logger.info(f"No matching preferences found for person {person_id}") + return True + + # Create the notifiable event + event = NotifiableEvent.objects.create( + event_type=EventTypes.PRODUCT_CREATED, + person_id=person_id, + params=event_payload + ) - # If org-owned, also notify org admins - if product.is_owned_by_organisation(): - admin_assignments = OrganisationPersonRoleAssignment.objects.filter( - organisation=product.organisation, - role__in=[ - OrganisationPersonRoleAssignment.OrganisationRoles.OWNER, - OrganisationPersonRoleAssignment.OrganisationRoles.MANAGER - ] - ).exclude(person_id=person_id) # Don't double-notify the creator + try: 
+ # Create app notification if enabled + if preferences.product_notifications in [NotificationPreference.Type.APPS, NotificationPreference.Type.BOTH]: + try: + app_template = AppNotificationTemplate.objects.get( + event_type=EventTypes.PRODUCT_CREATED + ) + AppNotification.objects.create( + event=event, + title=app_template.render_title(event_payload), + message=app_template.render_template(event_payload) + ) + except (AppNotificationTemplate.DoesNotExist, Exception) as e: + logger.error(f"Error with app notification: {e}") + AppNotification.objects.create( + event=event, + title="New Product Created", + message="There was an error processing this notification." + ) + + # Create email notification if enabled + if preferences.product_notifications in [NotificationPreference.Type.EMAIL, NotificationPreference.Type.BOTH]: + try: + email_template = EmailNotificationTemplate.objects.get( + event_type=EventTypes.PRODUCT_CREATED + ) + EmailNotification.objects.create( + event=event, + title=email_template.render_title(event_payload), + body=email_template.render_template(event_payload) + ) + except (EmailNotificationTemplate.DoesNotExist, Exception) as e: + logger.error(f"Error with email notification: {e}") + EmailNotification.objects.create( + event=event, + title="New Product Created", + body="There was an error processing this notification." + ) + + except Exception as e: + logger.error(f"Error creating notifications: {e}") + # Always create at least one notification + AppNotification.objects.create( + event=event, + title="New Product Created", + message="There was an error processing this notification." + ) - for assignment in admin_assignments: - event = NotifiableEvent.objects.create( - event_type=EventTypes.PRODUCT_CREATED, - person=assignment.person, - params=params - ) - _create_notifications_for_event(event) - + return True + except Exception as e: - logger.error(f"Error processing product created event: {str(e)}") + logger.error(f"Error in handle_product_created: {e}", exc_info=True) raise -def _create_notifications_for_event(event: NotifiableEvent) -> None: - """Create notifications based on user preferences""" +def handle_product_updated(event_data): + """Handle product updated event""" + logger.info(f"Handling product updated event: {event_data}") + + # Create the event + event = NotifiableEvent.objects.create( + event_type=EventTypes.PRODUCT_UPDATED, + person_id=event_data['person_id'], + params=event_data + ) + logger.info(f"Created event: {event}") + + # Queue the async task + async_result = async_task('apps.engagement.tasks.process_notification', event.id) + logger.info(f"Queued async task: {async_result}") + + return event + +def process_event(event_id): + """Process the notification event""" + logger.info(f"Processing event {event_id}") try: + event = NotifiableEvent.objects.get(id=event_id) + logger.info(f"Found event: {event}") + + # Get notification preferences prefs = NotificationPreference.objects.get(person=event.person) + logger.info(f"Found preferences: {prefs.product_notifications}") - # Handle app notifications - if prefs.product_notifications in [NotificationPreference.Type.APPS, NotificationPreference.Type.BOTH]: - try: - template = AppNotificationTemplate.objects.get(event_type=event.event_type) - AppNotification.objects.create( - event=event, - title=template.title_template.format(**event.params), - message=template.message_template.format(**event.params) - ) - except (AppNotificationTemplate.DoesNotExist, KeyError) as e: - logger.error(f"Error creating app 
notification: {str(e)}") - AppNotification.objects.create( - event=event, - title="Error Processing Notification", - message="There was an error processing this notification." - ) + # Get templates + app_template = AppNotificationTemplate.objects.get(event_type=event.event_type) + email_template = EmailNotificationTemplate.objects.get(event_type=event.event_type) + logger.info(f"Found templates: app={app_template}, email={email_template}") - # Handle email notifications + # Create notifications + if prefs.product_notifications in [NotificationPreference.Type.APP, NotificationPreference.Type.BOTH]: + app_notif = AppNotification.objects.create( + event=event, + title=app_template.render_title(event.params), + message=app_template.render_template(event.params) + ) + logger.info(f"Created app notification: {app_notif}") + if prefs.product_notifications in [NotificationPreference.Type.EMAIL, NotificationPreference.Type.BOTH]: - try: - template = EmailNotificationTemplate.objects.get(event_type=event.event_type) - EmailNotification.objects.create( - event=event, - title=template.title.format(**event.params), - body=template.template.format(**event.params) - ) - except (EmailNotificationTemplate.DoesNotExist, KeyError) as e: - logger.error(f"Error creating email notification: {str(e)}") - EmailNotification.objects.create( - event=event, - title="Error Processing Notification", - body="There was an error processing this notification." - ) - - except NotificationPreference.DoesNotExist: - logger.warning(f"No notification preferences found for person {event.person.id}") + email_notif = EmailNotification.objects.create( + event=event, + title=email_template.render_title(event.params), + body=email_template.render_template(event.params) + ) + logger.info(f"Created email notification: {email_notif}") + except Exception as e: - logger.error(f"Error creating notifications for event: {str(e)}") + logger.error(f"Error processing event {event_id}: {e}", exc_info=True) raise \ No newline at end of file diff --git a/apps/engagement/migrations/0004_rename_message_template_appnotificationtemplate_template_and_more.py b/apps/engagement/migrations/0004_rename_message_template_appnotificationtemplate_template_and_more.py new file mode 100644 index 00000000..a28b82a2 --- /dev/null +++ b/apps/engagement/migrations/0004_rename_message_template_appnotificationtemplate_template_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.2 on 2024-11-24 12:36 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('engagement', '0003_alter_appnotificationtemplate_event_type_and_more'), + ] + + operations = [ + migrations.RenameField( + model_name='appnotificationtemplate', + old_name='message_template', + new_name='template', + ), + migrations.RenameField( + model_name='appnotificationtemplate', + old_name='title_template', + new_name='title', + ), + ] diff --git a/apps/engagement/models.py b/apps/engagement/models.py index d60417a9..20e71f7f 100644 --- a/apps/engagement/models.py +++ b/apps/engagement/models.py @@ -5,6 +5,9 @@ from datetime import timedelta from apps.common.mixins import TimeStampMixin from apps.event_hub.events import EventTypes +import logging + +logger = logging.getLogger(__name__) def default_delete_at(): @@ -50,6 +53,20 @@ def clean(self): def __str__(self): return f"Email template for {self.get_event_type_display()}" + def render_title(self, params): + try: + return self.title.format(**params) + except Exception as e: + logger.error(f"Error rendering 
email notification title: {e}") + return "Notification" + + def render_template(self, params): + try: + return self.template.format(**params) + except Exception as e: + logger.error(f"Error rendering email notification template: {e}") + return "There was an error creating this notification." + def _template_is_valid(template, permitted_params): permitted_params_list = permitted_params.split(",") @@ -152,13 +169,27 @@ class AppNotificationTemplate(models.Model): choices=EventTypes.choices(), primary_key=True ) - title_template = models.CharField(max_length=400) - message_template = models.CharField(max_length=4000) + title = models.CharField(max_length=400) + template = models.CharField(max_length=4000) permitted_params = models.CharField(max_length=500) def clean(self): - _template_is_valid(self.title_template, self.permitted_params) - _template_is_valid(self.message_template, self.permitted_params) + _template_is_valid(self.title, self.permitted_params) + _template_is_valid(self.template, self.permitted_params) def __str__(self): return f"App notification template for {self.get_event_type_display()}" + + def render_title(self, params): + try: + return self.title.format(**params) + except Exception as e: + logger.error(f"Error rendering app notification title: {e}") + return "Notification" + + def render_template(self, params): + try: + return self.template.format(**params) + except Exception as e: + logger.error(f"Error rendering app notification template: {e}") + return "There was an error processing this notification." diff --git a/apps/engagement/tasks.py b/apps/engagement/tasks.py new file mode 100644 index 00000000..65b22844 --- /dev/null +++ b/apps/engagement/tasks.py @@ -0,0 +1,94 @@ +from apps.engagement.models import ( + NotifiableEvent, + AppNotification, + EmailNotification, + AppNotificationTemplate, + EmailNotificationTemplate, + NotificationPreference +) + +def process_notification(event_id): + """Process a notification event and create notifications based on preferences""" + import logging + logger = logging.getLogger(__name__) + + # Add worker process ID for tracking + import os + worker_pid = os.getpid() + print(f"\n[WORKER {worker_pid}] Starting process_notification for event {event_id}") + logger.info(f"[WORKER {worker_pid}] Starting process_notification for event {event_id}") + + result = { + 'success': False, + 'app_notification_created': False, + 'email_notification_created': False, + 'error': None, + 'worker_pid': worker_pid + } + + try: + # Add more detailed exception handling + try: + event = NotifiableEvent.objects.get(id=event_id) + except NotifiableEvent.DoesNotExist: + msg = f"Event {event_id} not found" + logger.error(f"[WORKER {worker_pid}] {msg}") + result['error'] = msg + return result + + try: + prefs = NotificationPreference.objects.get(person=event.person) + except NotificationPreference.DoesNotExist: + msg = f"Preferences not found for person {event.person_id}" + logger.error(f"[WORKER {worker_pid}] {msg}") + result['error'] = msg + return result + + # Log preference values + print(f"[WORKER {worker_pid}] Preference type: {prefs.product_notifications}") + print(f"[WORKER {worker_pid}] Preference type class: {type(prefs.product_notifications)}") + print(f"[WORKER {worker_pid}] Valid types: {NotificationPreference.Type.choices}") + + print("[WORKER DEBUG] Attempting to get templates") + app_template = AppNotificationTemplate.objects.get(event_type=event.event_type) + email_template = EmailNotificationTemplate.objects.get(event_type=event.event_type) + 
print("[WORKER DEBUG] Found both templates") + + # Debug preference check + print(f"[DEBUG] Checking if {prefs.product_notifications} in {[NotificationPreference.Type.APP, NotificationPreference.Type.BOTH]}") + + # Create app notification if enabled + if prefs.product_notifications in [NotificationPreference.Type.APP, NotificationPreference.Type.BOTH]: + print("[DEBUG] Creating app notification") + rendered_title = app_template.render_title(event.params) + rendered_message = app_template.render_template(event.params) + print(f"[DEBUG] Rendered title: {rendered_title}") + print(f"[DEBUG] Rendered message: {rendered_message}") + + app_notif = AppNotification.objects.create( + event=event, + title=rendered_title, + message=rendered_message + ) + print(f"[DEBUG] Created app notification: {app_notif.id}") + result['app_notification_created'] = True + + # Create email notification if enabled + if prefs.product_notifications in [NotificationPreference.Type.EMAIL, NotificationPreference.Type.BOTH]: + logger.info("[PROCESS] Creating email notification") + email_notif = EmailNotification.objects.create( + event=event, + title=email_template.render_title(event.params), + body=email_template.render_template(event.params) + ) + logger.info(f"[PROCESS] Created email notification: {email_notif.id}") + result['email_notification_created'] = True + + result['success'] = True + logger.info("[PROCESS] Successfully completed notification processing") + return result + + except Exception as e: + logger.error(f"[PROCESS] Error processing notification: {str(e)}", exc_info=True) + result['error'] = str(e) + return result diff --git a/apps/engagement/tests/conftest.py b/apps/engagement/tests/conftest.py index 17d2611d..d435c82c 100644 --- a/apps/engagement/tests/conftest.py +++ b/apps/engagement/tests/conftest.py @@ -1,26 +1,56 @@ import pytest from django.core.cache import cache +from django.core.management import call_command +from django.test.utils import override_settings -@pytest.fixture(autouse=True) -def enable_db_access_for_all_tests(db): +@pytest.fixture(scope='session') +def django_db_setup(django_db_setup, django_db_blocker): """ - Global fixture to enable database access for all tests. - This removes the need to specify @pytest.mark.django_db for each test. + Set up the test database, ensuring all required tables exist """ - pass + with django_db_blocker.unblock(): + # Ensure all migrations are run, not just django_q + call_command('migrate') + +@pytest.fixture(scope='session', autouse=True) +def setup_test_environment(): + """Setup synchronous test environment""" + # Override Django Q settings for synchronous operation + test_settings = { + 'Q_CLUSTER': { + 'name': 'test_cluster', + 'workers': 1, + 'timeout': 30, + 'sync': True, # Run synchronously + 'orm': 'default', + 'bulk': 1, + 'catch_up': False, + 'log_level': 'DEBUG' + } + } + + with override_settings(**test_settings): + yield + +@pytest.fixture(autouse=True) +def clean_database(): + """Clean up notifications after each test""" + yield + from apps.engagement.models import AppNotification, EmailNotification + AppNotification.objects.all().delete() + EmailNotification.objects.all().delete() @pytest.fixture(autouse=True) -def use_transaction_db(transactional_db): +def db_access(db): """ - Global fixture to make all tests transactional. - Similar to TransactionTestCase but pytest-style. + Global fixture to enable database access for all tests """ pass @pytest.fixture(autouse=True) def clear_cache(): """ - Clear the cache before each test. 
+ Clear the cache before each test """ cache.clear() yield diff --git a/apps/engagement/tests/test_listeners.py b/apps/engagement/tests/test_listeners.py new file mode 100644 index 00000000..fd818135 --- /dev/null +++ b/apps/engagement/tests/test_listeners.py @@ -0,0 +1,20 @@ +from typing import Dict + +# List to track executed listeners +executed_listeners = [] + +def listener_1(payload: Dict) -> bool: + """Test listener 1""" + print("Listener 1 executed") + executed_listeners.append('listener1') + return True + +def listener_2(payload: Dict) -> bool: + """Test listener 2""" + print("Listener 2 executed") + executed_listeners.append('listener2') + return True + +def clear_executed() -> None: + """Clear the executed listeners list""" + executed_listeners.clear() \ No newline at end of file diff --git a/apps/engagement/tests/test_notification_flow.py b/apps/engagement/tests/test_notification_creation.py similarity index 93% rename from apps/engagement/tests/test_notification_flow.py rename to apps/engagement/tests/test_notification_creation.py index 4d5e6c8b..201062c8 100644 --- a/apps/engagement/tests/test_notification_flow.py +++ b/apps/engagement/tests/test_notification_creation.py @@ -52,8 +52,8 @@ def notification_preferences(person): def app_template(): return AppNotificationTemplate.objects.create( event_type=EventTypes.PRODUCT_CREATED, - title_template="New Product: {name}", - message_template="A new product {name} has been created. View it at {url}", + title="New Product: {name}", + template="A new product {name} has been created. View it at {url}", permitted_params="name,url" ) @@ -67,7 +67,9 @@ def email_template(): ) @pytest.fixture(autouse=True) +@pytest.mark.django_db def cleanup_test_data(): + """Clean up notifications after each test""" yield NotifiableEvent.objects.all().delete() AppNotification.objects.all().delete() @@ -94,8 +96,15 @@ def product(db, organisation): short_description="A test product" # Optional but good for completeness ) -class TestNotificationFlow: - """Tests for the notification creation flow""" +@pytest.mark.django_db +class TestNotificationCreation: + """Tests for notification creation and content generation. 
+ + These tests verify: + - Notification creation based on user preferences + - Template rendering and content formatting + - Error handling for invalid/missing templates + """ @pytest.fixture def event_data(self, org, person): @@ -210,7 +219,7 @@ def test_handles_invalid_template_gracefully( - Should create notification with error message - Should not raise exception """ - app_template.message_template = "Product: {invalid_param}" + app_template.template = "Product: {invalid_param}" app_template.save() from apps.engagement.events import handle_product_created diff --git a/apps/engagement/tests/test_notification_processing.py b/apps/engagement/tests/test_notification_processing.py new file mode 100644 index 00000000..df3420c8 --- /dev/null +++ b/apps/engagement/tests/test_notification_processing.py @@ -0,0 +1,263 @@ +import pytest +import time +from threading import Event +from django.test import override_settings +from django.core.cache import cache +from django_q.cluster import Cluster +from django_q.models import OrmQ, Task +from django_q.brokers import get_broker +from django_q.tasks import async_task +from django.db import transaction + +from apps.engagement.models import ( + NotifiableEvent, + AppNotification, + EmailNotification, + NotificationPreference, + AppNotificationTemplate, + EmailNotificationTemplate +) +from apps.capabilities.security.models import User +from apps.capabilities.talent.models import Person +from apps.capabilities.product_management.models import Product +from apps.capabilities.commerce.models import Organisation +from apps.event_hub.events import EventTypes +from apps.event_hub.models import EventLog +from apps.event_hub.services.factory import get_event_bus +from apps.engagement.tests.test_listeners import listener_1, listener_2, executed_listeners, clear_executed + +class TestNotificationProcessing: + """Tests for notification processing through the event system. 
+ + These tests verify: + - Event handling and task execution + - Event bus publication and subscription + - Multiple listener execution + - Transaction handling in event context + """ + + @pytest.fixture + def user(self, db): + """Create test user""" + return User.objects.create_user( + username="testuser", + email="test@example.com", + password="testpass123" + ) + + @pytest.fixture + def person(self, user): + """Create test person with required fields""" + return Person.objects.create( + user=user, + full_name="Test Person", + preferred_name="Test", + headline="Test Headline" + ) + + @pytest.fixture + def organisation(self, db): + """Create test organisation""" + return Organisation.objects.create( + username="testorg", + name="Test Organisation" + ) + + @pytest.fixture + def product(self, organisation): + """Create test product owned by organisation""" + return Product.objects.create( + name="Test Product", + slug="test-product", + short_description="Test Description", + organisation=organisation, + visibility=Product.Visibility.GLOBAL + ) + + @pytest.fixture + def notification_preferences(self, person): + """Create notification preferences for test person""" + return NotificationPreference.objects.create( + person=person, + product_notifications=NotificationPreference.Type.BOTH + ) + + @pytest.fixture + def notification_templates(self, db): + """Create test notification templates""" + app_template = AppNotificationTemplate.objects.create( + event_type=EventTypes.PRODUCT_CREATED, + title="New Product: {name}", + template="Product {name} was created at {url}" + ) + + email_template = EmailNotificationTemplate.objects.create( + event_type=EventTypes.PRODUCT_CREATED, + title="New Product: {name}", + template="Product {name} was created at {url}" + ) + + return app_template, email_template + + @pytest.fixture(scope="class") + def db_class(self, request): + """Class-scoped db fixture""" + marker = request.node.get_closest_marker('django_db') + django_db_blocker = request.getfixturevalue('django_db_blocker') + with django_db_blocker.unblock(): + yield + + @pytest.fixture(scope="class") + def global_broker(self, db_class): + """Create a single broker instance for all tests""" + broker = get_broker() + broker.purge_queue() + cache.clear() + return broker + + @pytest.fixture(autouse=True) + def register_event_listeners(self): + """Register event listeners for product events""" + from apps.event_hub.services.factory import get_event_bus + from apps.engagement.events import handle_product_created + from django.conf import settings + + # Configure Django-Q for sync mode during tests + settings.DJANGO_Q = { + 'sync': True, # This makes Django-Q run synchronously + 'timeout': 30, + 'save_limit': 0, + 'orm': 'default' + } + + event_bus = get_event_bus() + event_bus.register_listener(EventTypes.PRODUCT_CREATED, handle_product_created) + + return event_bus + + @pytest.fixture + def event_data(self, product, person): + """Standard event data for testing""" + return { + 'product_id': str(product.id), + 'name': product.name, + 'url': f'/products/{product.slug}/', + 'organisation_id': str(product.organisation.id), + 'person_id': str(person.id) + } + + @pytest.fixture(autouse=True) + def configure_sync_mode(self, settings): + """Configure Django-Q for synchronous execution""" + settings.DJANGO_Q = { + 'sync': True, + 'timeout': 30, + 'save_limit': 0 + } + + # Also need to patch the task_complete hook since it won't be called in sync mode + settings.EVENT_BUS = { + 'BACKEND': 
'apps.event_hub.services.backends.django_q.DjangoQBackend', + 'TASK_COMPLETE_HOOK': None # Disable the hook in sync mode + } + yield + + @pytest.mark.django_db(transaction=True) + def test_async_notification_processing( + self, transactional_db, person, event_data, notification_preferences, + notification_templates + ): + """Test that notifications are created when events are published.""" + event_bus = get_event_bus() + + # Commit any pending fixture data + transaction.commit() + + # Add debug logging + print(f"\nEvent data: {event_data}") + print(f"Person: {person}") + print(f"Templates: {notification_templates}") + print(f"Preferences: {notification_preferences}") + + # Emit event + event_bus.emit_event(EventTypes.PRODUCT_CREATED, event_data) + + # Add debug for notifications + events = NotifiableEvent.objects.filter(person=person) + print(f"Events created: {events.count()}") + + notifications = AppNotification.objects.all() + print(f"All notifications: {notifications.count()}") + + # Since tasks are synchronous, they run immediately + # Verify that notifications were created + assert AppNotification.objects.filter(event__person=person).exists(), "App notification not created" + assert EmailNotification.objects.filter(event__person=person).exists(), "Email notification not created" + + @pytest.mark.django_db(transaction=True) + def test_event_bus_notification_processing( + self, person, event_data, notification_preferences, + notification_templates + ): + """Test that notifications are created when events are published.""" + event_bus = get_event_bus() + + # Emit event + event_bus.emit_event(EventTypes.PRODUCT_CREATED, event_data) + + # Verify that notifications were created + assert AppNotification.objects.filter(event__person=person).exists(), "App notification not created" + assert EmailNotification.objects.filter(event__person=person).exists(), "Email notification not created" + + @pytest.mark.django_db(transaction=True) + def test_async_multiple_listeners( + self, person, event_data, notification_preferences, notification_templates + ): + """Test that multiple listeners are executed""" + event_bus = get_event_bus() + + # Reset executed listeners list + clear_executed() + + # Register listeners with full path for both + event_bus.register_listener(EventTypes.TEST_MULTIPLE_LISTENERS, 'apps.engagement.tests.test_listeners.listener_1') + event_bus.register_listener(EventTypes.TEST_MULTIPLE_LISTENERS, 'apps.engagement.tests.test_listeners.listener_2') + + # Debug internal state + print(f"\nInternal listeners state: {event_bus._listeners}") + print(f"Emitting event: {EventTypes.TEST_MULTIPLE_LISTENERS}") + print(f"Event data: {event_data}") + + # Emit event + event_bus.emit_event(EventTypes.TEST_MULTIPLE_LISTENERS, event_data) + + # Add small delay to allow for processing + time.sleep(0.1) + + # Debug output + print(f"Executed listeners: {executed_listeners}") + assert len(executed_listeners) == 2, f"Expected 2 listeners to execute, but got {len(executed_listeners)}" + + @pytest.mark.django_db(transaction=True) + def test_sync_execution_assumptions(self): + """Test our assumptions about sync execution""" + event_bus = get_event_bus() + + # Track execution + executed = [] + + def test_listener(payload): + executed.append(payload) + return True + + # Register both callable and string-based listeners + event_bus.register_listener(EventTypes.TEST_EVENT, test_listener) + event_bus.register_listener(EventTypes.TEST_EVENT, 'apps.engagement.tests.test_listeners.listener_1') + + # Emit event + 
test_payload = {'test': 'data'} + event_bus.emit_event(EventTypes.TEST_EVENT, test_payload) + + # Verify assumptions + assert len(executed) == 1, "Callable listener should execute immediately" + assert EventLog.objects.filter(payload=test_payload, processed=True).exists(), "Event log should be marked as processed" \ No newline at end of file diff --git a/apps/event_hub/events.py b/apps/event_hub/events.py index 215c9e10..97c26ca5 100644 --- a/apps/event_hub/events.py +++ b/apps/event_hub/events.py @@ -7,12 +7,18 @@ class EventTypes: PRODUCT_CREATED = 'product.created' PRODUCT_UPDATED = 'product.updated' PRODUCT_DELETED = 'product.deleted' + + # Test Events + TEST_EVENT = 'test.event' + TEST_MULTIPLE_LISTENERS = 'test.multiple_listeners' # Map events to their display names DISPLAY_NAMES = { PRODUCT_CREATED: _("Product Created"), PRODUCT_UPDATED: _("Product Updated"), PRODUCT_DELETED: _("Product Deleted"), + TEST_EVENT: _("Test Event"), + TEST_MULTIPLE_LISTENERS: _("Test Multiple Listeners"), } @classmethod diff --git a/apps/event_hub/services/backends/base.py b/apps/event_hub/services/backends/base.py index fcd2700d..6a4b6b46 100644 --- a/apps/event_hub/services/backends/base.py +++ b/apps/event_hub/services/backends/base.py @@ -1,15 +1,28 @@ from abc import ABC, abstractmethod +from typing import Union, Dict, Callable import logging logger = logging.getLogger(__name__) class EventBusBackend(ABC): @abstractmethod - def enqueue_task(self, task_path, *args, **kwargs): + def enqueue_task(self, listener: Union[str, Callable], payload: Dict, event_type: str) -> str: + """ + Enqueue a task to be executed asynchronously + + Args: + listener: Either a callable or string path to the listener function + payload: Dictionary of data to pass to the listener + event_type: The type of event being processed + + Returns: + str: Task ID + """ pass @abstractmethod - def execute_task_sync(self, task_path, *args, **kwargs): + def execute_task_sync(self, listener: Union[str, Callable], payload: Dict, event_type: str) -> None: + """Execute a task synchronously""" pass def report_error(self, error, task_info=None): diff --git a/apps/event_hub/services/backends/django_q.py b/apps/event_hub/services/backends/django_q.py index ee0adede..f515cd80 100644 --- a/apps/event_hub/services/backends/django_q.py +++ b/apps/event_hub/services/backends/django_q.py @@ -1,9 +1,18 @@ import logging -from typing import Dict, Callable +from typing import Union, Dict, Callable from django_q.tasks import async_task -from ..event_bus import EventBusBackend +from .base import EventBusBackend from django.conf import settings from django.utils.module_loading import import_string +from django.db import transaction +from django_q.models import Schedule +from django_q.models import Task +from django_q.signing import SignedPackage +from django.utils import timezone +import time +import threading +from django_q.brokers import get_broker +from apps.event_hub.models import EventLog logger = logging.getLogger(__name__) @@ -12,71 +21,127 @@ def execute_listener(listener_module: str, listener_name: str, payload: Dict) -> Execute a listener function by importing it dynamically. This function needs to be at module level to be pickleable. 
""" - logger.info(f"[execute_listener] Starting execution for {listener_module}.{listener_name}") try: import importlib - logger.info(f"[execute_listener] Importing module {listener_module}") - module = importlib.import_module(listener_module) + from django.utils import timezone + from apps.event_hub.models import EventLog - logger.info(f"[execute_listener] Getting function {listener_name}") - listener = getattr(module, listener_name) + # Get event log before execution + event_log = EventLog.objects.filter(payload=payload).first() + if not event_log: + logger.error(f"No event log found for payload: {payload}") + return + + # Import and execute the listener + if isinstance(listener_module, str): + module = importlib.import_module(listener_module) + listener = getattr(module, listener_name) + else: + listener = listener_module - logger.info(f"[execute_listener] Executing listener with payload: {payload}") + start_time = timezone.now() result = listener(payload) - logger.info(f"[execute_listener] Execution completed with result: {result}") + # Update event log + processing_time = (timezone.now() - start_time).total_seconds() + if event_log: + event_log.processed = True + event_log.processing_time = processing_time + event_log.save(update_fields=['processed', 'processing_time']) + return result except Exception as e: - logger.exception(f"[execute_listener] Failed to execute listener: {str(e)}") + logger.exception(f"Error executing listener: {str(e)}") + if event_log: + event_log.error = str(e) + event_log.save(update_fields=['error']) raise class DjangoQBackend(EventBusBackend): - def enqueue_task(self, listener: Callable, payload: Dict) -> None: - """Enqueue a task to be executed asynchronously""" + def enqueue_task(self, listener: Union[str, Callable], payload: Dict, event_type: str) -> str: + """ + Enqueues a task for execution + + Args: + listener: Function or import path to execute + payload: Data to pass to the function + event_type: The type of event being processed (e.g. "product.created") + """ try: - logger.info(f"[DjangoQBackend] Enqueueing task for {listener.__name__}") - - # Get the module and function name for the listener - listener_module = listener.__module__ - listener_name = listener.__name__ - - logger.info(f"[DjangoQBackend] Module: {listener_module}, Function: {listener_name}") - - # Queue the task using the module-level function + from apps.event_hub.models import EventLog + + # Check if we're in test mode + if getattr(settings, 'DJANGO_Q', {}).get('sync', False): + # Create event log before execution + event_log = EventLog.objects.create( + payload=payload, + processed=False, + event_type=event_type + ) + + start_time = timezone.now() + + # Execute the listener + if isinstance(listener, str): + module_path, function_name = listener.rsplit('.', 1) + module = __import__(module_path, fromlist=[function_name]) + listener = getattr(module, function_name) + + result = listener(payload) + + # Update event log + event_log.processed = True + event_log.processing_time = (timezone.now() - start_time).total_seconds() + event_log.save(update_fields=['processed', 'processing_time']) + + return 'sync-executed' + + # For async execution... 
task_id = async_task( - execute_listener, # Call the function directly - listener_module, - listener_name, + listener if not isinstance(listener, str) else import_string(listener), payload, - task_name=f"event.{listener_name}", - hook='apps.event_hub.services.backends.django_q.task_hook', - timeout=getattr(settings, 'EVENT_BUS_TASK_TIMEOUT', 300), - # Configure for reliable async processing - q_options={ - 'retry': 3, # Retry failed tasks up to 3 times - 'timeout': 30, # Shorter timeout for tests - 'priority': 1, # High priority - 'retry_delay': 1 # Wait 1 second between retries - } + event_type=event_type ) - - logger.info(f"[DjangoQBackend] Task {task_id} enqueued successfully") - + return task_id + except Exception as e: logger.exception(f"[DjangoQBackend] Failed to enqueue task: {str(e)}") raise - def execute_task_sync(self, listener: Callable, payload: Dict) -> None: + def execute_task_sync(self, listener: Union[str, Callable], payload: Dict, event_type: str) -> None: """Execute the listener synchronously""" try: - logger.info(f"[DjangoQBackend] Executing {listener.__name__} synchronously") + from apps.event_hub.models import EventLog + + # Get event log before execution + event_log = EventLog.objects.filter(payload=payload).first() + if not event_log: + logger.error(f"No event log found for payload: {payload}") + return + + start_time = timezone.now() + + # Execute listener + if isinstance(listener, str): + module_path, function_name = listener.rsplit('.', 1) + module = __import__(module_path, fromlist=[function_name]) + listener = getattr(module, function_name) result = listener(payload) - logger.info(f"[DjangoQBackend] Sync execution completed: {result}") + # Update event log + processing_time = (timezone.now() - start_time).total_seconds() + event_log.processed = True + event_log.processing_time = processing_time + event_log.save(update_fields=['processed', 'processing_time']) + + return result except Exception as e: logger.exception(f"[DjangoQBackend] Sync execution failed: {str(e)}") + if event_log: + event_log.error = str(e) + event_log.save(update_fields=['error']) raise def report_error(self, error: Exception, context: Dict) -> None: @@ -104,12 +169,20 @@ def report_error(self, error: Exception, context: Dict) -> None: def task_hook(task): """Hook that runs after task completion""" - logger.info(f"[task_hook] Task completed: {task.id}") - logger.info(f"[task_hook] Function: {task.func}") - logger.info(f"[task_hook] Args: {task.args}") - logger.info(f"[task_hook] Result: {task.result}") + logger.info(f"Task {task.id} completed with status: {task.success}") if task.success: - logger.info("[task_hook] Task succeeded") + # Update event log if it exists + from apps.event_hub.models import EventLog + if isinstance(task.args, tuple) and len(task.args) > 0: + payload = task.args[-1] if isinstance(task.args[-1], dict) else None + if payload: + event_log = EventLog.objects.filter(payload=payload).first() + if event_log and not event_log.processed: + event_log.processed = True + event_log.processing_time = (task.stopped - task.started).total_seconds() + event_log.save(update_fields=['processed', 'processing_time']) else: - logger.error(f"[task_hook] Task failed: {task.result}") \ No newline at end of file + logger.error(f"Task failed with error: {task.result}") + if hasattr(task.result, '__traceback__'): + logger.error(f"Traceback: {task.result.__traceback__}") \ No newline at end of file diff --git a/apps/event_hub/services/event_bus.py b/apps/event_hub/services/event_bus.py index 
eb731d13..fa3ba848 100644 --- a/apps/event_hub/services/event_bus.py +++ b/apps/event_hub/services/event_bus.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Callable +from typing import Dict, List, Callable, Union import logging from .backends.base import EventBusBackend from django.db import models @@ -6,6 +6,7 @@ from django.core.exceptions import ValidationError from datetime import timedelta from ..events import EventTypes +from ..models import EventLog import time logger = logging.getLogger(__name__) @@ -27,47 +28,34 @@ def __init__(self, backend: EventBusBackend = None): self.backend = backend self._initialized = True - def register_listener(self, event_name: str, listener: Callable) -> None: + def register_listener(self, event_name: str, listener: Union[Callable, str]) -> None: + """Register a listener for an event type + + Args: + event_name: The event type to listen for + listener: Either a callable function or a string path to the listener + """ if event_name not in self._listeners: self._listeners[event_name] = [] self._listeners[event_name].append(listener) - logger.debug(f"Registered listener {listener.__name__} for event {event_name}") - - def emit_event(self, event_type: str, payload: dict, is_async: bool = True) -> None: - # Validate event type - if not EventTypes.validate_event(event_type): - logger.error(f"Invalid event type: {event_type}") - return - # Log the event - event_log = EventLog.objects.create( - event_type=event_type, - payload=payload - ) + # Get listener name for logging + if isinstance(listener, str): + listener_name = listener.split('.')[-1] # Get last part of path + else: + listener_name = getattr(listener, '__name__', str(listener)) - if event_type not in self._listeners: - logger.warning(f"No listeners registered for event {event_type}") - return + logger.debug(f"Registered listener {listener_name} for event {event_name}") - start_time = time.time() - - try: - for listener in self._listeners[event_type]: - if is_async: - self.backend.enqueue_task(listener, payload) - else: - self.backend.execute_task_sync(listener, payload) - - event_log.processed = True - event_log.processing_time = time.time() - start_time - event_log.save() - - except Exception as e: - event_log.error = str(e) - event_log.save() - self.backend.report_error(e, { - 'event_type': event_type, - 'listener': listener.__name__, - 'payload': payload - }) - raise + def emit_event(self, event_type: str, payload: Dict): + """Emit an event to all registered listeners""" + if not EventTypes.validate_event(event_type): + raise ValueError(f"Invalid event type: {event_type}") + + listeners = self._listeners.get(event_type, []) + for listener in listeners: + try: + # Pass the event_type to the backend + self.backend.enqueue_task(listener, payload, event_type) + except Exception as e: + logger.error(f"Failed to enqueue task for {listener}: {e}") diff --git a/pytest.ini b/pytest.ini index d999ec79..aff9477b 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] DJANGO_SETTINGS_MODULE = apps.common.settings.test python_files = tests.py test_*.py *_tests.py -addopts = --nomigrations --reuse-db --cov=apps.capabilities.product_management --cov-report=term-missing --cov-report=html --cov-config=.coveragerc +addopts = --cov=apps.capabilities.product_management --cov-report=term-missing --cov-report=html --cov-config=.coveragerc