diff --git a/backend/apps/ai/Makefile b/backend/apps/ai/Makefile
index 2a4714f2e6..85577183f8 100644
--- a/backend/apps/ai/Makefile
+++ b/backend/apps/ai/Makefile
@@ -2,6 +2,10 @@ ai-create-chapter-chunks:
 	@echo "Creating chapter chunks"
 	@CMD="python manage.py ai_create_chapter_chunks" $(MAKE) exec-backend-command
 
+ai-create-event-chunks:
+	@echo "Creating event chunks"
+	@CMD="python manage.py ai_create_event_chunks" $(MAKE) exec-backend-command
+
 ai-create-slack-message-chunks:
 	@echo "Creating Slack message chunks"
 	@CMD="python manage.py ai_create_slack_message_chunks" $(MAKE) exec-backend-command
diff --git a/backend/apps/ai/common/utils.py b/backend/apps/ai/common/utils.py
new file mode 100644
index 0000000000..c0824760a1
--- /dev/null
+++ b/backend/apps/ai/common/utils.py
@@ -0,0 +1,71 @@
+"""AI utils."""
+
+import logging
+import time
+from datetime import UTC, datetime, timedelta
+
+import openai
+
+from apps.ai.common.constants import (
+    DEFAULT_LAST_REQUEST_OFFSET_SECONDS,
+    MIN_REQUEST_INTERVAL_SECONDS,
+)
+from apps.ai.models.chunk import Chunk
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+# Module-level so the throttle state persists across calls; a local
+# variable here would make the interval check a no-op.
+_last_request_time = datetime.now(UTC) - timedelta(
+    seconds=DEFAULT_LAST_REQUEST_OFFSET_SECONDS
+)
+
+
+def create_chunks_and_embeddings(
+    all_chunk_texts: list[str],
+    content_object,
+    openai_client,
+) -> list[Chunk]:
+    """Create chunks and embeddings from given texts using OpenAI embeddings.
+
+    Args:
+        all_chunk_texts (list[str]): List of text chunks to embed.
+        content_object: The object to associate the chunks with.
+        openai_client: Initialized OpenAI client instance.
+
+    Returns:
+        list[Chunk]: List of Chunk instances (not saved).
+
+    """
+    global _last_request_time
+
+    try:
+        # Sleep just long enough to honor the minimum request interval.
+        time_since_last_request = datetime.now(UTC) - _last_request_time
+        if time_since_last_request < timedelta(seconds=MIN_REQUEST_INTERVAL_SECONDS):
+            time.sleep(MIN_REQUEST_INTERVAL_SECONDS - time_since_last_request.total_seconds())
+
+        response = openai_client.embeddings.create(
+            input=all_chunk_texts,
+            model="text-embedding-3-small",
+        )
+        _last_request_time = datetime.now(UTC)
+
+        return [
+            chunk
+            for text, embedding in zip(
+                all_chunk_texts,
+                [d.embedding for d in response.data],
+                strict=True,
+            )
+            if (
+                chunk := Chunk.update_data(
+                    text=text,
+                    content_object=content_object,
+                    embedding=embedding,
+                    save=False,
+                )
+            )
+        ]
+    except openai.OpenAIError:
+        logger.exception("OpenAI API error")
+        return []
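Note: a minimal usage sketch of the shared helper above, assuming a Django shell session. The chapter lookup and placeholder text are illustrative; the helper deliberately returns unsaved chunks, so the caller still owns persistence:

    import os

    import openai

    from apps.ai.common.utils import create_chunks_and_embeddings
    from apps.ai.models.chunk import Chunk
    from apps.owasp.models.chapter import Chapter

    # Same client setup the management commands use.
    client = openai.OpenAI(api_key=os.getenv("DJANGO_OPEN_AI_SECRET_KEY"))
    chapter = Chapter.objects.first()  # any content object works the same way

    # Returns unsaved Chunk instances; the caller batch-persists them.
    chunks = create_chunks_and_embeddings(
        all_chunk_texts=["example chunk text"],
        content_object=chapter,
        openai_client=client,
    )
    Chunk.bulk_save(chunks)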
diff --git a/backend/apps/ai/management/commands/ai_create_chapter_chunks.py b/backend/apps/ai/management/commands/ai_create_chapter_chunks.py
index bd80c3ea7d..8b73079e64 100644
--- a/backend/apps/ai/management/commands/ai_create_chapter_chunks.py
+++ b/backend/apps/ai/management/commands/ai_create_chapter_chunks.py
@@ -1,17 +1,12 @@
 """A command to create chunks of OWASP chapter data for RAG."""
 
 import os
-import time
-from datetime import UTC, datetime, timedelta
 
 import openai
 from django.core.management.base import BaseCommand
 
-from apps.ai.common.constants import (
-    DEFAULT_LAST_REQUEST_OFFSET_SECONDS,
-    DELIMITER,
-    MIN_REQUEST_INTERVAL_SECONDS,
-)
+from apps.ai.common.constants import DELIMITER
+from apps.ai.common.utils import create_chunks_and_embeddings
 from apps.ai.models.chunk import Chunk
 from apps.owasp.models.chapter import Chapter
 
@@ -65,7 +60,7 @@ def handle(self, *args, **options):
             batch_chunks = []
             for chapter in batch_chapters:
-                batch_chunks.extend(self.create_chunks(chapter))
+                batch_chunks.extend(self.handle_chunks(chapter))
 
             if batch_chunks:
                 Chunk.bulk_save(batch_chunks)
@@ -73,7 +68,7 @@ def handle(self, *args, **options):
 
         self.stdout.write(f"Completed processing all {total_chapters} chapters")
 
-    def create_chunks(self, chapter: Chapter) -> list[Chunk]:
+    def handle_chunks(self, chapter: Chapter) -> list[Chunk]:
         """Create chunks from a chapter's data."""
         prose_content, metadata_content = self.extract_chapter_content(chapter)
 
@@ -89,41 +84,11 @@ def create_chunks(self, chapter: Chapter) -> list[Chunk]:
             self.stdout.write(f"No content to chunk for chapter {chapter.key}")
             return []
 
-        try:
-            time_since_last_request = datetime.now(UTC) - getattr(
-                self,
-                "last_request_time",
-                datetime.now(UTC) - timedelta(seconds=DEFAULT_LAST_REQUEST_OFFSET_SECONDS),
-            )
-
-            if time_since_last_request < timedelta(seconds=MIN_REQUEST_INTERVAL_SECONDS):
-                time.sleep(MIN_REQUEST_INTERVAL_SECONDS - time_since_last_request.total_seconds())
-
-            response = self.openai_client.embeddings.create(
-                input=all_chunk_texts,
-                model="text-embedding-3-small",
-            )
-            self.last_request_time = datetime.now(UTC)
-
-            return [
-                chunk
-                for text, embedding in zip(
-                    all_chunk_texts,
-                    [d.embedding for d in response.data],
-                    strict=True,
-                )
-                if (
-                    chunk := Chunk.update_data(
-                        text=text,
-                        content_object=chapter,
-                        embedding=embedding,
-                        save=False,
-                    )
-                )
-            ]
-        except openai.OpenAIError as e:
-            self.stdout.write(self.style.ERROR(f"OpenAI API error for chapter {chapter.key}: {e}"))
-            return []
+        return create_chunks_and_embeddings(
+            all_chunk_texts=all_chunk_texts,
+            content_object=chapter,
+            openai_client=self.openai_client,
+        )
 
     def extract_chapter_content(self, chapter: Chapter) -> tuple[str, str]:
         """Extract and separate prose content from metadata for a chapter.
@@ -164,9 +129,6 @@ def extract_chapter_content(self, chapter: Chapter) -> tuple[str, str]:
         if location_parts:
             metadata_parts.append(f"Location Information: {', '.join(location_parts)}")
 
-        if chapter.level:
-            metadata_parts.append(f"Chapter Level: {chapter.level}")
-
         if chapter.currency:
             metadata_parts.append(f"Currency: {chapter.currency}")
 
@@ -180,19 +142,7 @@ def extract_chapter_content(self, chapter: Chapter) -> tuple[str, str]:
             metadata_parts.append(f"Topics: {', '.join(chapter.topics)}")
 
         if chapter.leaders_raw:
-            leaders_info = []
-            for leader in chapter.leaders_raw:
-                if isinstance(leader, dict):
-                    leader_name = leader.get("name", "")
-                    leader_email = leader.get("email", "")
-                    if leader_name:
-                        leader_text = f"Leader: {leader_name}"
-                        if leader_email:
-                            leader_text += f" ({leader_email})"
-                        leaders_info.append(leader_text)
-
-            if leaders_info:
-                metadata_parts.append(f"Chapter Leaders: {', '.join(leaders_info)}")
+            metadata_parts.append(f"Chapter Leaders: {', '.join(chapter.leaders_raw)}")
 
         if chapter.related_urls:
             valid_urls = [
action="store_true", + help="Process all the events", + ) + parser.add_argument( + "--batch-size", + type=int, + default=50, + help="Number of events to process in each batch", + ) + + def handle(self, *args, **options): + if not (openai_api_key := os.getenv("DJANGO_OPEN_AI_SECRET_KEY")): + self.stdout.write( + self.style.ERROR("DJANGO_OPEN_AI_SECRET_KEY environment variable not set") + ) + return + + self.openai_client = openai.OpenAI(api_key=openai_api_key) + + if event := options["event"]: + queryset = Event.objects.filter(key=event) + elif options["all"]: + queryset = Event.objects.all() + else: + queryset = Event.upcoming_events() + + if not (total_events := queryset.count()): + self.stdout.write("No events found to process") + return + + self.stdout.write(f"Found {total_events} events to process") + + batch_size = options["batch_size"] + for offset in range(0, total_events, batch_size): + batch_events = queryset[offset : offset + batch_size] + + batch_chunks = [] + for event in batch_events: + batch_chunks.extend(self.handle_chunks(event)) + + if batch_chunks: + Chunk.bulk_save(batch_chunks) + self.stdout.write(f"Saved {len(batch_chunks)} chunks") + + self.stdout.write(f"Completed processing all {total_events} events") + + def handle_chunks(self, event: Event) -> list[Chunk]: + """Create chunks from an event's data.""" + prose_content, metadata_content = self.extract_event_content(event) + + all_chunk_texts = [] + + if metadata_content.strip(): + all_chunk_texts.append(metadata_content) + + if prose_content.strip(): + all_chunk_texts.extend(Chunk.split_text(prose_content)) + + if not all_chunk_texts: + self.stdout.write(f"No content to chunk for event {event.key}") + return [] + + return create_chunks_and_embeddings( + all_chunk_texts, + content_object=event, + openai_client=self.openai_client, + ) + + def extract_event_content(self, event: Event) -> tuple[str, str]: + """Extract and separate prose content from metadata for an event. 
diff --git a/backend/apps/ai/management/commands/ai_create_slack_message_chunks.py b/backend/apps/ai/management/commands/ai_create_slack_message_chunks.py
index 6eeb90dda6..30b20e0f39 100644
--- a/backend/apps/ai/management/commands/ai_create_slack_message_chunks.py
+++ b/backend/apps/ai/management/commands/ai_create_slack_message_chunks.py
@@ -1,16 +1,11 @@
 """A command to create chunks of Slack messages."""
 
 import os
-import time
-from datetime import UTC, datetime, timedelta
 
 import openai
 from django.core.management.base import BaseCommand
 
-from apps.ai.common.constants import (
-    DEFAULT_LAST_REQUEST_OFFSET_SECONDS,
-    MIN_REQUEST_INTERVAL_SECONDS,
-)
+from apps.ai.common.utils import create_chunks_and_embeddings
 from apps.ai.models.chunk import Chunk
 from apps.slack.models.message import Message
 
@@ -36,13 +31,13 @@ def handle(self, *args, **options):
             [
                 chunk
                 for message in Message.objects.all()[offset : offset + batch_size]
-                for chunk in self.create_chunks(message)
+                for chunk in self.handle_chunks(message)
             ]
         )
 
         self.stdout.write(f"Completed processing all {total_messages} messages")
 
-    def create_chunks(self, message: Message) -> list[Chunk]:
+    def handle_chunks(self, message: Message) -> list[Chunk]:
         """Create chunks from a message."""
         if message.subtype in {"channel_join", "channel_leave"}:
             return []
@@ -54,40 +49,8 @@ def handle_chunks(self, message: Message) -> list[Chunk]:
             )
             return []
 
-        try:
-            time_since_last_request = datetime.now(UTC) - getattr(
-                self,
-                "last_request_time",
-                datetime.now(UTC) - timedelta(seconds=DEFAULT_LAST_REQUEST_OFFSET_SECONDS),
-            )
-
-            if time_since_last_request < timedelta(seconds=MIN_REQUEST_INTERVAL_SECONDS):
-                time.sleep(MIN_REQUEST_INTERVAL_SECONDS - time_since_last_request.total_seconds())
-
-            response = self.openai_client.embeddings.create(
-                input=chunk_text,
-                model="text-embedding-3-small",
-            )
-            self.last_request_time = datetime.now(UTC)
-
-            return [
-                chunk
-                for text, embedding in zip(
-                    chunk_text,
-                    [d.embedding for d in response.data],  # Embedding data from OpenAI response.
-                    strict=True,
-                )
-                if (
-                    chunk := Chunk.update_data(
-                        text=text,
-                        content_object=message,
-                        embedding=embedding,
-                        save=False,
-                    )
-                )
-            ]
-        except openai.OpenAIError as e:
-            self.stdout.write(
-                self.style.ERROR(f"OpenAI API error for message {message.slack_message_id}: {e}")
-            )
-            return []
+        return create_chunks_and_embeddings(
+            all_chunk_texts=chunk_text,
+            content_object=message,
+            openai_client=self.openai_client,
+        )
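Note: with the request pacing now held in module-level state in apps/ai/common/utils.py, all three commands share one throttle per process. A self-contained sketch of that pacing pattern (the interval value is an assumption, not the repo's constant):

    import time
    from datetime import UTC, datetime, timedelta

    MIN_REQUEST_INTERVAL_SECONDS = 1.0  # assumed value for illustration

    # Seeded in the past so the first call never sleeps.
    _last_request_time = datetime.now(UTC) - timedelta(seconds=3600)


    def paced(fn):
        """Run fn, sleeping first so calls stay MIN_REQUEST_INTERVAL_SECONDS apart."""
        global _last_request_time
        elapsed = datetime.now(UTC) - _last_request_time
        if elapsed < timedelta(seconds=MIN_REQUEST_INTERVAL_SECONDS):
            time.sleep(MIN_REQUEST_INTERVAL_SECONDS - elapsed.total_seconds())
        result = fn()
        _last_request_time = datetime.now(UTC)
        return result


    for _ in range(3):
        paced(lambda: print(f"request at {datetime.now(UTC).isoformat()}"))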