Skip to content

Commit

Permalink
feat: improve event bus and notification system
Browse files Browse the repository at this point in the history
- Refactor event bus to use atomic transactions and improve error handling
- Update notification system to use proper URL reversing
- Add transaction safety to event processing
- Improve template handling with fallbacks
- Add migration for notification templates
- Clean up event logging and processing
- Update tests for transaction safety
- Update documentation for notification system architecture
  • Loading branch information
adrianmcphee committed Nov 26, 2024
1 parent ed94970 commit ad7a522
Show file tree
Hide file tree
Showing 27 changed files with 993 additions and 450 deletions.
68 changes: 34 additions & 34 deletions apps/capabilities/product_management/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,31 +309,36 @@ def get_product_ideas_and_bugs(product: Product) -> Tuple[QuerySet[Idea], QueryS
class ProductManagementService:
@staticmethod
def create_product(form_data: dict, person: Person, organisation: Organisation = None) -> Product:
"""
Create a product and assign the creator as admin
"""Create a new product and assign initial roles.
Args:
form_data: Cleaned form data
person: Person creating the product
organisation: Optional organisation to own the product
form_data (dict): Product data from form submission
person (Person): Person creating the product
organisation (Organisation, optional): Organisation owning the product
Returns:
Created Product instance
Product: The newly created product instance
Creates a new product and:
- Assigns the creator as an admin
- Sets ownership (org or personal)
- Publishes product.created event
"""
logger.info(f"Creating product - person: {person}, org: {organisation}")

if not form_data.get('name'):
raise InvalidInputError("Product name is required")

try:
product_data = form_data.copy()
if organisation:
product_data['organisation'] = organisation

with transaction.atomic():
# Create the product
product = Product.objects.create(**product_data)
product = Product.objects.create(
name=form_data['name'],
slug=form_data['slug'],
short_description=form_data['short_description'],
photo=form_data.get('photo'),
visibility=form_data['visibility'],
organisation=organisation,
person=None if organisation else person
)
logger.info(f"Product created: {product.id} - {product.name}")

# Assign creator as admin
RoleService.assign_product_role(
person=person,
Expand All @@ -342,25 +347,20 @@ def create_product(form_data: dict, person: Person, organisation: Organisation =
)
logger.info(f"Assigned {person} as ADMIN for product {product.id}")

# Publish event
# Publish event ONCE after everything is set up
event_bus = get_event_bus()
event_bus.publish(EventTypes.PRODUCT_CREATED, {
'organisation_id': product.organisation_id if product.organisation else None,
'person_id': product.person_id if product.person else None,
event_payload = {
'productId': product.id,
'organisationId': product.organisation_id,
'personId': person.id,
'name': product.name,
'url': product.get_absolute_url(),
'product_id': product.id
})
'url': product.get_absolute_url()
}
# Use on_commit to ensure the event is only published after transaction commits
transaction.on_commit(lambda: event_bus.publish(EventTypes.PRODUCT_CREATED, event_payload))
logger.info(f"Published product.created event for product {product.id}")

return product

except ValidationError as e:
logger.error(f"Validation error creating product: {str(e)}")
raise InvalidInputError(str(e))
except Exception as e:
logger.error(f"Error creating product: {str(e)}", exc_info=True)
raise InvalidInputError(f"Failed to create product: {str(e)}")

@classmethod
@transaction.atomic
Expand All @@ -373,10 +373,10 @@ def update_product(cls, product: Product, data: Dict, person: Person) -> Product
# After successful update
event_bus = get_event_bus()
event_bus.publish(EventTypes.PRODUCT_UPDATED, {
'organisation_id': product.organisation_id if product.organisation else None,
'person_id': product.person_id if product.person else None,
'organisationId': product.organisation_id,
'personId': product.person_id,
'name': product.name,
'product_id': product.id,
'productId': product.id,
'url': product.get_absolute_url()
})

Expand Down
2 changes: 1 addition & 1 deletion apps/common/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ class Media:

# Django Q Configuration (using PostgreSQL as broker)
Q_CLUSTER = {
'name': 'DjangoORM',
'name': 'openunited',
'workers': int(os.getenv('DJANGO_Q_WORKERS', '4')),
'timeout': 90,
'retry': 120,
Expand Down
27 changes: 15 additions & 12 deletions apps/engagement/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
class EngagementConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "apps.engagement"
has_run_ready = False

def ready(self):
from apps.event_hub.services.factory import get_event_bus
from .events import (
handle_product_created,
handle_product_updated,
# Add other handlers as needed
)
if not self.has_run_ready:
self.has_run_ready = True
from apps.event_hub.services.factory import get_event_bus
from .events import (
handle_product_created,
handle_product_updated,
# Add other handlers as needed
)

event_bus = get_event_bus()

# Register all event handlers
event_bus.register_listener(EventTypes.PRODUCT_CREATED, handle_product_created)
event_bus.register_listener(EventTypes.PRODUCT_UPDATED, handle_product_updated)
# Add other event registrations as needed
event_bus = get_event_bus()
# Register all event handlers
event_bus.register_listener(EventTypes.PRODUCT_CREATED, handle_product_created)
event_bus.register_listener(EventTypes.PRODUCT_UPDATED, handle_product_updated)
# Add other event registrations as needed
Loading

0 comments on commit ad7a522

Please sign in to comment.