Skip to content

Commit

Permalink
feat: refine the event bus implementation to use a pluggable backend-…
Browse files Browse the repository at this point in the history
…based architecture that supports both synchronous and asynchronous execution modes, while adding comprehensive error handling, event logging, and improving the test infrastructure with direct listener registration.
  • Loading branch information
adrianmcphee committed Nov 25, 2024
1 parent 5ba59e2 commit f116cbd
Show file tree
Hide file tree
Showing 12 changed files with 721 additions and 416 deletions.
3 changes: 2 additions & 1 deletion apps/capabilities/product_management/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ def create_product(form_data: dict, person: Person, organisation: Organisation =
)
logger.info(f"Assigned {person} as ADMIN for product {product.id}")

# Publish event
event_bus = get_event_bus()
event_bus.emit_event('product.created', {
event_bus.publish('product.created', {
'organisation_id': product.organisation_id if product.organisation else None,
'person_id': product.person_id if product.person else None,
'name': product.name,
Expand Down
156 changes: 100 additions & 56 deletions apps/engagement/docs/notification-system.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
## Core Concepts

### Event-Driven Architecture
- Events are emitted when significant actions occur
- Events are processed asynchronously via Django Q
- Events are published when significant actions occur
- Events are processed via Django Q (configurable sync/async)
- Events are logged in EventLog model with retention period
- Handlers create notifications based on event type and user preferences

### Notification Preferences
Expand Down Expand Up @@ -50,25 +51,38 @@ The event bus handles asynchronous processing and decouples event producers from
- Error handling with fallback notifications
- Preference-based notification routing

### 6. EventTypes Registry
- Centralized registry of all application events
- Provides validation and display names
- Used by EventLog and templates
- Supports test events and product events

## Example Flow: Product Creation Notification

### Step 1: Event Emission
When a product is created, the ProductManagementService emits an event with the product details and ownership information.
### Step 1: Event Publication
When a product is created, the ProductManagementService publishes an event with the product details and ownership information.

### Step 2: Event Processing
The system uses a two-layer approach for event processing:
The system uses a multi-layer approach:

1. **Event Bus**:
- Validates events against EventTypes registry
- Logs events in EventLog model
- Handles sync/async execution configuration
- Manages task queuing and execution

1. **Event Handlers** (`events.py`):
- Receive and validate event payloads
2. **Event Handlers** (`events.py`):
- Handle specific event types (e.g., handle_product_created)
- Identify relevant stakeholders
- Create NotifiableEvent records
- Queue async tasks for notification creation
- Handle organization-specific routing
- Queue notification processing tasks

2. **Task Processors** (`tasks.py`):
- Run asynchronously via Django Q
3. **Task Processors** (`tasks.py`):
- Process NotifiableEvent records
- Create actual notifications based on preferences
- Handle template rendering and error cases
- Check user notification preferences
- Handle template rendering
- Create AppNotification and EmailNotification records
- Include error handling and fallback notifications

### Step 3: Notification Creation
The system:
Expand Down Expand Up @@ -144,9 +158,9 @@ def create_product(form_data: dict, person: Person, organisation: Organisation =
# Create the product
product = Product.objects.create(**product_data)

# Emit event
# Publish event
event_bus = get_event_bus()
event_bus.emit_event('product.created', {
event_bus.publish('product.created', {
'organisation_id': product.organisation_id if product.organisation else None,
'person_id': product.person_id if product.person else None,
'name': product.name,
Expand All @@ -160,50 +174,72 @@ def create_product(form_data: dict, person: Person, organisation: Organisation =
````python`
# apps/engagement/events.py
def handle_product_created(payload: dict) -> None:
"""Handle product.created event"""
logger.info(f"Processing product created event: {payload}")
AppNotification.objects.create(
event=event,
title="Notification",
message=message
)
return event
def handle_product_created(event_data):
"""Handle product created event by creating NotifiableEvents for relevant stakeholders"""
try:
product_id = payload.get('product_id')
product_id = event_data.get('product_id')
if not product_id:
logger.error("No product_id in payload")
return
logger.error("No product_id in event payload")
return False
product = Product.objects.get(id=product_id)
try:
product = Product.objects.get(id=product_id)
except Product.DoesNotExist:
logger.error(f"Product {product_id} not found")
return False
# Get all stakeholders to notify
people_to_notify = set()
params = {
'name': payload.get('name', product.name),
'url': payload.get('url', f'/products/{product.id}/summary/')
}
if product.organisation:
people_to_notify.update(RoleService.get_organisation_managers(product.organisation))
people_to_notify.update(RoleService.get_product_managers(product))
elif product.person:
people_to_notify.add(product.person)
if not people_to_notify:
person_id = event_data.get('person_id')
if person_id:
try:
person = Person.objects.get(id=person_id)
people_to_notify.add(person)
except Person.DoesNotExist:
logger.error(f"Person {person_id} not found")
# Always notify the creator
person_id = payload.get('person_id')
if person_id:
creator = Person.objects.get(id=person_id)
# Create events for each person
events = []
for person in people_to_notify:
event = NotifiableEvent.objects.create(
event_type=NotifiableEvent.EventType.PRODUCT_CREATED,
person=creator,
params=params
event_type=EventTypes.PRODUCT_CREATED,
person=person,
params=event_data
)
_create_notifications_for_event(event)
# If org-owned, also notify org admins
if product.is_owned_by_organisation():
admin_assignments = OrganisationPersonRoleAssignment.objects.filter(
organisation=product.organisation,
role__in=[
OrganisationPersonRoleAssignment.OrganisationRoles.OWNER,
OrganisationPersonRoleAssignment.OrganisationRoles.MANAGER
]
).exclude(person_id=person_id) # Don't double-notify the creator
events.append(event)
for assignment in admin_assignments:
event = NotifiableEvent.objects.create(
event_type=NotifiableEvent.EventType.PRODUCT_CREATED,
person=assignment.person,
params=params
)
_create_notifications_for_event(event)
EventBus().enqueue_task(
'apps.engagement.tasks.process_notification',
{'event_id': event.id},
EventTypes.PRODUCT_CREATED
)
return len(events) > 0
except Exception as e:
logger.error(f"Error in handle_product_created: {e}", exc_info=True)
raise
def handle_product_updated(event_data):
"""Handle product updated event"""
logger.info(f"Handling product updated event: {event_data}")
event = NotifiableEvent.objects.create(
event_type=EventTypes.PRODUCT_UPDATED,
`````

### 3. Creating Notifications
Expand Down Expand Up @@ -279,7 +315,7 @@ class AppNotificationTemplate(models.Model):
1. **Product Creation**
- Product is created via ProductManagementService
- Creator is assigned as admin
- Event is emitted with product details
- Event is published with product details

2. **Event Processing**
- Event is picked up by handler
Expand Down Expand Up @@ -330,10 +366,10 @@ class EmailNotificationTemplate(models.Model):
## Testing Strategy

### 1. Test Categories
- **Unit Tests**: Individual component testing
- **Integration Tests**: End-to-end notification flow
- **Async Tests**: Verify asynchronous processing
- **Preference Tests**: Validate notification routing
- **Event Processing Tests**: Verify event handling and task execution
- **Event Bus Tests**: Test publication and subscription
- **Multiple Listener Tests**: Verify parallel execution
- **Transaction Tests**: Ensure proper transaction handling
- **Error Cases**: System resilience testing

### 2. Test Infrastructure
Expand Down Expand Up @@ -422,3 +458,11 @@ def wait_for_notifications():
- Test missing templates
- Verify fallback notifications
- Check error logging

### Event Logging
Events are logged in the EventLog model with:
- Event type validation against central registry
- JSON payload storage
- Processing status tracking
- Configurable retention period
- Automatic cleanup via delete_at field
Loading

0 comments on commit f116cbd

Please sign in to comment.