diff --git a/backend/.env.example b/backend/.env.example index 91a7d540..b5005d8a 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -8,6 +8,8 @@ SUPABASE_SERVICE_ROLE_KEY= DISCORD_BOT_TOKEN= # ENABLE_DISCORD_BOT=true +GITHUB_TOKEN= + # EMBEDDING_MODEL=BAAI/bge-small-en-v1.5 # EMBEDDING_MAX_BATCH_SIZE=32 # EMBEDDING_DEVICE=cpu diff --git a/backend/app/api/v1/auth.py b/backend/app/api/v1/auth.py index 1e29acf8..2a20e1c7 100644 --- a/backend/app/api/v1/auth.py +++ b/backend/app/api/v1/auth.py @@ -2,8 +2,10 @@ from fastapi.responses import HTMLResponse from app.db.supabase.supabase_client import get_supabase_client from app.db.supabase.users_service import find_user_by_session_and_verify, get_verification_session_info +from app.db.weaviate.user_profiling import profile_user_from_github from typing import Optional import logging +import asyncio logger = logging.getLogger(__name__) router = APIRouter() @@ -67,7 +69,15 @@ async def auth_callback(request: Request, code: Optional[str] = Query(None), ses logger.error("User verification failed - no pending verification found") return _error_response("No pending verification found or verification has expired. Please try the !verify_github command again.") - logger.info(f"Successfully verified user: {verified_user.id}") + logger.info(f"Successfully verified user: {verified_user.id}!") + + logger.info(f"Indexing user: {verified_user.id} into Weaviate...") + try: + asyncio.create_task(profile_user_from_github(str(verified_user.id), github_username)) + logger.info(f"User profiling started in background for: {verified_user.id}") + except Exception as e: + logger.error(f"Error starting user profiling: {verified_user.id}: {str(e)}") + return _success_response(github_username) except Exception as e: diff --git a/backend/app/db/weaviate/user_profiling.py b/backend/app/db/weaviate/user_profiling.py new file mode 100644 index 00000000..8d77255d --- /dev/null +++ b/backend/app/db/weaviate/user_profiling.py @@ -0,0 +1,310 @@ +import logging +import asyncio +import aiohttp +from typing import List, Optional, Dict +from datetime import datetime +from collections import Counter +from app.model.weaviate.models import WeaviateUserProfile, WeaviateRepository, WeaviatePullRequest +from app.db.weaviate.weaviate_operations import store_user_profile +from app.core.config import settings + +logger = logging.getLogger(__name__) + + +class GitHubUserProfiler: + """ + Class to handle GitHub user profiling and Weaviate storage. + Uses organization's GitHub token to fetch public user data via GitHub REST API. + """ + + def __init__(self): + if not settings.github_token: + raise ValueError("GitHub token not configured in environment variables") + + self.headers = { + "Authorization": f"token {settings.github_token}", + "Accept": "application/vnd.github.v3+json", + "User-Agent": "DevRel-AI-Bot/1.0" + } + self.base_url = "https://api.github.com" + self.session = None + + async def __aenter__(self): + """Create async HTTP session""" + timeout = aiohttp.ClientTimeout(total=60, connect=10, sock_read=30) + connector = aiohttp.TCPConnector( + limit=50, # Total connection pool size + limit_per_host=10, # Per-host connection limit + ttl_dns_cache=300, # DNS cache TTL + use_dns_cache=True, + ) + + self.session = aiohttp.ClientSession( + headers=self.headers, + timeout=timeout, + connector=connector + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Close async HTTP session""" + if self.session: + await self.session.close() + + async def _make_request(self, url: str, params: Dict = None) -> Optional[Dict]: + """Make a GET request to GitHub API""" + try: + async with self.session.get(url, params=params) as response: + if response.status == 200: + return await response.json() + elif response.status == 404: + logger.warning(f"GitHub API 404: {url}") + return None + elif response.status == 403: + logger.error(f"GitHub API rate limit exceeded: {url}") + return None + else: + logger.error(f"GitHub API error {response.status}: {url}") + return None + except asyncio.TimeoutError: + logger.error(f"Timeout accessing GitHub API: {url}") + return None + except Exception as e: + logger.error(f"Error making request to {url}: {str(e)}") + return None + + async def get_user_data(self, github_username: str) -> Optional[Dict]: + """Fetch user data""" + url = f"{self.base_url}/users/{github_username}" + user_data = await self._make_request(url) + + if user_data: + logger.info(f"Successfully fetched user data for {github_username}") + else: + logger.error(f"Failed to fetch user data for {github_username}") + + return user_data + + async def get_user_repositories(self, github_username: str, max_repos: int = 50) -> List[Dict]: + """Fetch user repositories""" + try: + params = { + "type": "owner", + "sort": "updated", + "direction": "desc", + "per_page": max_repos + } + + url = f"{self.base_url}/users/{github_username}/repos" + repos = await self._make_request(url, params) + + if repos and isinstance(repos, list): + logger.info(f"Successfully fetched {len(repos)} repositories for {github_username}") + return repos + else: + logger.info(f"No repositories found for {github_username}") + return [] + + except Exception as e: + logger.error(f"Error fetching repositories for {github_username}: {str(e)}") + return [] + + async def get_repository_languages(self, languages_url: str) -> List[str]: + """Fetch repository languages""" + try: + languages_data = await self._make_request(languages_url) + if languages_data and isinstance(languages_data, dict): + return list(languages_data.keys()) + return [] + except Exception as e: + logger.warning(f"Error fetching languages from {languages_url}: {str(e)}") + return [] + + async def get_user_pull_requests(self, github_username: str, max_prs: int = 100) -> List[WeaviatePullRequest]: + """Fetch pull requests""" + try: + params = { + "q": f"author:{github_username} is:pr", + "sort": "created", + "order": "desc", + "per_page": max_prs + } + + url = f"{self.base_url}/search/issues" + search_result = await self._make_request(url, params) + + if not search_result or "items" not in search_result: + logger.info(f"No pull requests found for {github_username}") + return [] + + items = search_result["items"] + pull_requests = [] + + for pr_data in items: + try: + repo_name = "unknown" + if pr_data.get("html_url"): + url_parts = pr_data["html_url"].split('/') + if len(url_parts) >= 5: + repo_name = f"{url_parts[3]}/{url_parts[4]}" + + merged_at = None + if pr_data.get("pull_request") and pr_data["pull_request"].get("merged_at"): + merged_at = pr_data["pull_request"]["merged_at"] + + pr_obj = WeaviatePullRequest( + title=pr_data["title"], + body=pr_data.get("body", "")[:500] if pr_data.get("body") else "", + state=pr_data["state"], + repository=repo_name, + created_at=pr_data.get("created_at"), + closed_at=pr_data.get("closed_at"), + merged_at=merged_at, + labels=[label["name"] for label in pr_data.get("labels", [])], + url=pr_data["html_url"] + ) + pull_requests.append(pr_obj) + + except Exception as e: + logger.warning(f"Error processing pull request: {str(e)}") + continue + + logger.info(f"Successfully fetched {len(pull_requests)} pull requests for {github_username}") + return pull_requests + + except Exception as e: + logger.error(f"Error fetching pull requests for {github_username}: {str(e)}") + return [] + + async def _process_repository(self, repo_data: Dict) -> Optional[WeaviateRepository]: + """Process a single repository""" + try: + languages = [] + if repo_data.get("languages_url"): + languages = await self.get_repository_languages(repo_data["languages_url"]) + + return WeaviateRepository( + name=repo_data["name"], + description=repo_data.get("description"), + url=repo_data["html_url"], + languages=languages, + stars=repo_data.get("stargazers_count", 0), + forks=repo_data.get("forks_count", 0) + ) + except Exception as e: + logger.warning(f"Error processing repository {repo_data.get('name', 'unknown')}: {str(e)}") + return None + + def analyze_language_frequency(self, repositories: List[WeaviateRepository]) -> List[str]: + """ + Analyze language frequency across repositories to identify top 5 languages. + """ + language_counter = Counter() + for repo in repositories: + language_counter.update(repo.languages) + + top_languages = language_counter.most_common(5) + logger.info(f"Top 5 languages by frequency: {top_languages}") + return [lang for lang, _ in top_languages] + + async def build_user_profile(self, user_id: str, github_username: str) -> Optional[WeaviateUserProfile]: + """ + Build a complete user profile for Weaviate indexing + """ + logger.info(f"Building user profile for GitHub user: {github_username}") + + # Run user data, repositories, and pull requests fetch concurrently + user_task = self.get_user_data(github_username) + repos_task = self.get_user_repositories(github_username) + prs_task = self.get_user_pull_requests(github_username) + + try: + user_data, repos_data, pull_requests = await asyncio.gather( + user_task, repos_task, prs_task, return_exceptions=True + ) + except Exception as e: + logger.error(f"Error in concurrent data fetching: {str(e)}") + return None + + if isinstance(user_data, Exception) or not user_data: + logger.error(f"Could not fetch user data for {github_username}") + return None + + if isinstance(repos_data, Exception): + logger.warning(f"Error fetching repositories: {repos_data}") + repos_data = [] + + if isinstance(pull_requests, Exception): + logger.warning(f"Error fetching pull requests: {pull_requests}") + pull_requests = [] + + logger.info(f"Found {len(repos_data)} repositories and {len(pull_requests)} pull requests for {github_username}") + + repository_tasks = [self._process_repository(repo) for repo in repos_data] + + repositories = [] + if repository_tasks: + try: + repo_results = await asyncio.gather(*repository_tasks, return_exceptions=True) + repositories = [r for r in repo_results if r is not None and not isinstance(r, Exception)] + except Exception as e: + logger.warning(f"Error processing repositories: {str(e)}") + + all_languages = set() + all_topics = set() + total_stars = 0 + total_forks = 0 + + for repo_obj in repositories: + all_languages.update(repo_obj.languages) + total_stars += repo_obj.stars + total_forks += repo_obj.forks + + for repo_data in repos_data: + topics = repo_data.get("topics", []) + if topics: + all_topics.update(topics) + + top_languages = self.analyze_language_frequency(repositories) + + profile = WeaviateUserProfile( + user_id=user_id, + github_username=github_username, + display_name=user_data.get("name"), + bio=user_data.get("bio"), + location=user_data.get("location"), + repositories=repositories, + pull_requests=pull_requests, + languages=top_languages, + topics=list(all_topics), + followers_count=user_data.get("followers", 0), + following_count=user_data.get("following", 0), + total_stars_received=total_stars, + total_forks=total_forks, + profile_text_for_embedding="", # TODO: Invoke agent/llm to generate this + last_updated=datetime.now() + ) + + logger.info( + f"Successfully built profile for {github_username}: " + f"{len(repositories)} repos, {len(top_languages)} top languages, " + f"{len(pull_requests)} pull requests analyzed" + ) + return profile + + +async def profile_user_from_github(user_id: str, github_username: str) -> bool: + """Profile a user and store in Weaviate with proper resource management.""" + + async with GitHubUserProfiler() as profiler: + try: + profile = await profiler.build_user_profile(user_id, github_username) + if profile: + success = await store_user_profile(profile) + if success: + logger.info(f"Successfully stored profile for user {github_username}") + return success + return False + except Exception as e: + logger.error(f"Failed to profile user {github_username}: {str(e)}") + return False diff --git a/backend/app/db/weaviate/weaviate_client.py b/backend/app/db/weaviate/weaviate_client.py index 98bf241e..5ed11ede 100644 --- a/backend/app/db/weaviate/weaviate_client.py +++ b/backend/app/db/weaviate/weaviate_client.py @@ -1,8 +1,32 @@ import weaviate +from contextlib import asynccontextmanager +from typing import AsyncGenerator +import logging -# Connect to local Weaviate instance -client = weaviate.connect_to_local() +logger = logging.getLogger(__name__) + +_client = None def get_client(): - return client + """Get or create the global Weaviate client instance.""" + global _client + if _client is None: + _client = weaviate.use_async_with_local() + return _client + +@asynccontextmanager +async def get_weaviate_client() -> AsyncGenerator[weaviate.WeaviateClient, None]: + """Async context manager for Weaviate client.""" + client = get_client() + try: + await client.connect() + yield client + except Exception as e: + logger.error(f"Weaviate client error: {str(e)}") + raise + finally: + try: + await client.close() + except Exception as e: + logger.warning(f"Error closing Weaviate client: {str(e)}") diff --git a/backend/app/db/weaviate/weaviate_operations.py b/backend/app/db/weaviate/weaviate_operations.py new file mode 100644 index 00000000..a86adcf7 --- /dev/null +++ b/backend/app/db/weaviate/weaviate_operations.py @@ -0,0 +1,136 @@ +import logging +import json +from typing import Optional, Dict, Any +from datetime import datetime, timezone +from app.model.weaviate.models import WeaviateUserProfile +from app.db.weaviate.weaviate_client import get_weaviate_client +import weaviate.exceptions as weaviate_exceptions +import weaviate.classes as wvc + +logger = logging.getLogger(__name__) + +class WeaviateUserOperations: + """ + Class to handle Weaviate operations for user profiles. + """ + + def __init__(self, collection_name: str = "weaviate_user_profile"): + self.collection_name = collection_name + + async def find_user_by_id(self, user_id: str) -> Optional[str]: + """ + Find a user profile by user_id and return the UUID if found. + """ + try: + async with get_weaviate_client() as client: + collection = client.collections.get(self.collection_name) + + response = await collection.query.fetch_objects( + where=wvc.query.Filter.by_property("user_id").equal(user_id), + limit=1 + ) + + if response.objects: + found_uuid = str(response.objects[0].uuid) + logger.info(f"Found existing user profile with UUID: {found_uuid}") + return found_uuid + return None + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error finding user by ID: {str(e)}") + return None + except Exception as e: + logger.error(f"Unexpected error finding user by ID: {str(e)}") + return None + + async def create_user_profile(self, profile: WeaviateUserProfile) -> bool: + """ + Create a new user profile in Weaviate. + """ + try: + profile_dict = self._prepare_profile_data(profile) + + async with get_weaviate_client() as client: + collection = client.collections.get(self.collection_name) + + result = await collection.data.insert( + properties=profile_dict + ) + + logger.info(f"Created user profile for {profile.github_username} with UUID: {result}") + return True + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error creating user profile: {str(e)}") + return False + except Exception as e: + logger.error(f"Unexpected error creating user profile: {str(e)}") + return False + + async def update_user_profile(self, uuid: str, profile: WeaviateUserProfile) -> bool: + """ + Update an existing user profile in Weaviate. + """ + try: + profile_dict = self._prepare_profile_data(profile) + + async with get_weaviate_client() as client: + collection = client.collections.get(self.collection_name) + await collection.data.update( + uuid=uuid, + properties=profile_dict + ) + + logger.info(f"Updated user profile for {profile.github_username} with UUID: {uuid}") + return True + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error updating user profile: {str(e)}") + return False + except Exception as e: + logger.error(f"Unexpected error updating user profile: {str(e)}") + return False + + async def upsert_user_profile(self, profile: WeaviateUserProfile) -> bool: + """ + Create or update a user profile (upsert operation). + """ + try: + existing_uuid = await self.find_user_by_id(profile.user_id) + + if existing_uuid: + logger.info(f"Updating existing profile for user_id: {profile.user_id}") + return await self.update_user_profile(existing_uuid, profile) + else: + logger.info(f"Creating new profile for user_id: {profile.user_id}") + return await self.create_user_profile(profile) + + except Exception as e: + logger.error(f"Error in upsert operation: {str(e)}") + return False + + def _prepare_profile_data(self, profile: WeaviateUserProfile) -> Dict[str, Any]: + """ + Prepare profile data for Weaviate storage. + """ + profile_dict = profile.model_dump() + + profile_dict["repositories"] = json.dumps([repo.model_dump() for repo in profile.repositories]) + profile_dict["pull_requests"] = json.dumps([pr.model_dump() for pr in profile.pull_requests]) + + if isinstance(profile.last_updated, datetime): + if profile.last_updated.tzinfo is None: + profile.last_updated = profile.last_updated.replace(tzinfo=timezone.utc) + profile_dict["last_updated"] = profile.last_updated.isoformat() + else: + profile_dict["last_updated"] = datetime.now(timezone.utc).isoformat() + + return profile_dict + + +async def store_user_profile(profile: WeaviateUserProfile) -> bool: + """ + Convenience function to store or update a user profile. + """ + operations = WeaviateUserOperations() + return await operations.upsert_user_profile(profile) diff --git a/backend/app/model/weaviate/models.py b/backend/app/model/weaviate/models.py index b42eb892..612806bf 100644 --- a/backend/app/model/weaviate/models.py +++ b/backend/app/model/weaviate/models.py @@ -14,6 +14,20 @@ class WeaviateRepository(BaseModel): stars: int = Field(0, description="The number of stars the repository has.") forks: int = Field(0, description="The number of forks the repository has.") +class WeaviatePullRequest(BaseModel): + """ + Represents a single pull request created by the user. + Provides insights into contribution patterns and collaboration style. + """ + title: str = Field(..., description="The title of the pull request.") + body: Optional[str] = Field(None, description="The body/description of the pull request (truncated to 500 chars).") + state: str = Field(..., description="The state of the PR: 'open', 'closed', etc.") + repository: str = Field(..., description="The full name of the repository (e.g., 'owner/repo').") + created_at: Optional[str] = Field(None, description="ISO timestamp when the PR was created.") + closed_at: Optional[str] = Field(None, description="ISO timestamp when the PR was closed (if applicable).") + merged_at: Optional[str] = Field(None, description="ISO timestamp when the PR was merged (if applicable).") + labels: List[str] = Field(default_factory=list, description="Labels associated with the pull request.") + url: str = Field(..., description="The URL of the pull request.") class WeaviateUserProfile(BaseModel): """ @@ -29,8 +43,11 @@ class WeaviateUserProfile(BaseModel): repositories: List[WeaviateRepository] = Field( default_factory=list, description="List of repositories the user's repositories.") + pull_requests: List[WeaviatePullRequest] = Field( + default_factory=list, description="List of pull requests the user has created.") + languages: List[str] = Field(default_factory=list, - description="A unique, aggregated list of all programming languages from the user's repositories.") + description="A unique, aggregated list of top 5 languages the user is most comfortable with based on usage frequency.") topics: List[str] = Field(default_factory=list, description="A unique, aggregated list of all topics from the user's repositories.") @@ -76,6 +93,30 @@ class Config: "forks": 150 } ], + "pull_requests": [ + { + "title": "Add async support for database connections", + "body": "This PR adds comprehensive async support for database connections, improving performance by 40%...", + "state": "closed", + "repository": "microsoft/vscode", + "created_at": "2024-01-15T10:30:00Z", + "closed_at": "2024-01-20T14:20:00Z", + "merged_at": "2024-01-20T14:20:00Z", + "labels": ["enhancement", "database", "performance"], + "url": "https://github.com/microsoft/vscode/pull/12345", + }, + { + "title": "Fix memory leak in WebAssembly module", + "body": "Fixes a critical memory leak that was causing crashes in production environments...", + "state": "open", + "repository": "facebook/react", + "created_at": "2024-02-01T09:15:00Z", + "closed_at": None, + "merged_at": None, + "labels": ["bug", "wasm", "critical"], + "url": "https://github.com/facebook/react/pull/67890", + } + ], "languages": ["Rust", "JavaScript", "TypeScript", "TOML"], "topics": ["rust", "webdev", "performance", "framework", "data-visualization", "d3", "charts"], "followers_count": 1800, diff --git a/backend/app/scripts/weaviate/create_schemas.py b/backend/app/scripts/weaviate/create_schemas.py index 8cc10fa2..6d6fedcd 100644 --- a/backend/app/scripts/weaviate/create_schemas.py +++ b/backend/app/scripts/weaviate/create_schemas.py @@ -1,14 +1,15 @@ +import asyncio from app.db.weaviate.weaviate_client import get_client import weaviate.classes.config as wc -def create_schema(client, name, properties): - client.collections.create( +async def create_schema(client, name, properties): + await client.collections.create( name=name, properties=properties, ) print(f"Created: {name}") -def create_user_profile_schema(client): +async def create_user_profile_schema(client): """ Create schema for WeaviateUserProfile model. Main vectorization will be on profile_text_for_embedding field. @@ -20,6 +21,7 @@ def create_user_profile_schema(client): wc.Property(name="bio", data_type=wc.DataType.TEXT), wc.Property(name="location", data_type=wc.DataType.TEXT), wc.Property(name="repositories", data_type=wc.DataType.TEXT), # JSON string + wc.Property(name="pull_requests", data_type=wc.DataType.TEXT), # JSON string wc.Property(name="languages", data_type=wc.DataType.TEXT_ARRAY), wc.Property(name="topics", data_type=wc.DataType.TEXT_ARRAY), wc.Property(name="followers_count", data_type=wc.DataType.INT), @@ -29,13 +31,27 @@ def create_user_profile_schema(client): wc.Property(name="profile_text_for_embedding", data_type=wc.DataType.TEXT), wc.Property(name="last_updated", data_type=wc.DataType.DATE), ] - create_schema(client, "weaviate_user_profile", properties) + await create_schema(client, "weaviate_user_profile", properties) -def create_all_schemas(): +async def create_all_schemas(): """ - Create only the user profile schema as per the updated model structure. + Create only the user profile schema as per the model structure. """ client = get_client() - create_user_profile_schema(client) - client.close() - print("✅ User profile schema created successfully.") + try: + await client.connect() + await create_user_profile_schema(client) + print("✅ User profile schema created successfully.") + except Exception as e: + print(f"❌ Error creating schema: {str(e)}") + raise + finally: + await client.close() + +def main(): + """Entry point for running the schema creation.""" + asyncio.run(create_all_schemas()) + + +if __name__ == "__main__": + main() diff --git a/backend/app/scripts/weaviate/populate_db.py b/backend/app/scripts/weaviate/populate_db.py index 72737dc0..0137570c 100644 --- a/backend/app/scripts/weaviate/populate_db.py +++ b/backend/app/scripts/weaviate/populate_db.py @@ -1,8 +1,9 @@ import json +import asyncio from datetime import datetime -from app.db.weaviate.weaviate_client import get_client +from app.db.weaviate.weaviate_client import get_weaviate_client -def populate_weaviate_user_profile(client): +async def populate_weaviate_user_profile(client): """ Populate WeaviateUserProfile collection with sample data matching the model structure. """ @@ -13,7 +14,8 @@ def populate_weaviate_user_profile(client): "user_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef", "github_username": "jane-dev", "display_name": "Jane Developer", - "bio": "Creator of innovative open-source tools. Full-stack developer with a passion for Rust and WebAssembly.", + "bio": ("Creator of innovative open-source tools. Full-stack developer " + "with a passion for Rust and WebAssembly."), "location": "Berlin, Germany", "repositories": json.dumps([ { @@ -33,14 +35,36 @@ def populate_weaviate_user_profile(client): "forks": 150 } ]), + "pull_requests": json.dumps([ + { + "title": "Add async support for database connections", + "body": ("This PR adds comprehensive async support for database " + "connections, improving performance by 40%..."), + "state": "closed", + "repository": "microsoft/vscode", + "created_at": "2024-01-15T10:30:00Z", + "closed_at": "2024-01-20T14:20:00Z", + "merged_at": "2024-01-20T14:20:00Z", + "labels": ["enhancement", "database", "performance"], + "url": "https://github.com/microsoft/vscode/pull/12345" + } + ]), "languages": ["Rust", "JavaScript", "TypeScript", "TOML"], - "topics": ["rust", "webdev", "performance", "framework", "data-visualization", "d3", "charts"], + "topics": ["rust", "webdev", "performance", "framework", + "data-visualization", "d3", "charts"], "followers_count": 1800, "following_count": 250, "total_stars_received": 3700, "total_forks": 550, - "profile_text_for_embedding": "Jane Developer, Creator of innovative open-source tools. Full-stack developer with a passion for Rust and WebAssembly. Repositories: rust-web-framework, A high-performance web framework for Rust. data-viz-lib, A declarative data visualization library for JavaScript. Languages: Rust, JavaScript, TypeScript. Topics: rust, webdev, performance, data-visualization.", - "last_updated": current_time + "profile_text_for_embedding": ( + "Jane Developer, Creator of innovative open-source tools. " + "Full-stack developer with a passion for Rust and WebAssembly. " + "Repositories: rust-web-framework, A high-performance web framework for Rust. " + "data-viz-lib, A declarative data visualization library for JavaScript. " + "Languages: Rust, JavaScript, TypeScript. " + "Topics: rust, webdev, performance, data-visualization." + ), + "last_updated": current_time.isoformat() }, { "user_id": "b2c3d4e5-f6g7-8901-2345-678901bcdefg", @@ -66,14 +90,35 @@ def populate_weaviate_user_profile(client): "forks": 320 } ]), + "pull_requests": json.dumps([ + { + "title": "Implement advanced ML algorithms", + "body": ("Adding support for advanced machine learning algorithms " + "including neural networks..."), + "state": "open", + "repository": "tensorflow/tensorflow", + "created_at": "2024-02-01T09:15:00Z", + "closed_at": None, + "merged_at": None, + "labels": ["enhancement", "ml", "algorithms"], + "url": "https://github.com/tensorflow/tensorflow/pull/67890" + } + ]), "languages": ["Python", "SQL", "Jupyter Notebook"], "topics": ["machine-learning", "ai", "data-science", "python", "big-data"], "followers_count": 2400, "following_count": 180, "total_stars_received": 5000, "total_forks": 900, - "profile_text_for_embedding": "Alex Chen, Python enthusiast and machine learning researcher. Building the future of AI. Repositories: ml-toolkit, A comprehensive machine learning toolkit for Python. data-pipeline, Scalable data processing pipeline for big data applications. Languages: Python, SQL. Topics: machine-learning, ai, data-science, python.", - "last_updated": current_time + "profile_text_for_embedding": ( + "Alex Chen, Python enthusiast and machine learning researcher. " + "Building the future of AI. " + "Repositories: ml-toolkit, A comprehensive machine learning toolkit for Python. " + "data-pipeline, Scalable data processing pipeline for big data applications. " + "Languages: Python, SQL. " + "Topics: machine-learning, ai, data-science, python." + ), + "last_updated": current_time.isoformat() }, { "user_id": "c3d4e5f6-g7h8-9012-3456-789012cdefgh", @@ -91,14 +136,33 @@ def populate_weaviate_user_profile(client): "forks": 280 } ]), + "pull_requests": json.dumps([ + { + "title": "Add support for custom resources", + "body": ("Implementing support for custom Kubernetes resources " + "in the operator..."), + "state": "merged", + "repository": "kubernetes/kubernetes", + "created_at": "2024-01-10T14:30:00Z", + "closed_at": "2024-01-15T16:45:00Z", + "merged_at": "2024-01-15T16:45:00Z", + "labels": ["enhancement", "k8s", "operator"], + "url": "https://github.com/kubernetes/kubernetes/pull/54321" + } + ]), "languages": ["Go", "Dockerfile"], "topics": ["kubernetes", "microservices", "cloud", "devops", "api"], "followers_count": 890, "following_count": 120, "total_stars_received": 1500, "total_forks": 280, - "profile_text_for_embedding": "Sam Rodriguez, Cloud infrastructure engineer specializing in Go and Kubernetes. Repositories: k8s-operator, Custom Kubernetes operator for managing microservices. Languages: Go, Dockerfile. Topics: kubernetes, microservices, cloud, devops.", - "last_updated": current_time + "profile_text_for_embedding": ( + "Sam Rodriguez, Cloud infrastructure engineer specializing in Go and Kubernetes. " + "Repositories: k8s-operator, Custom Kubernetes operator for managing microservices. " + "Languages: Go, Dockerfile. " + "Topics: kubernetes, microservices, cloud, devops." + ), + "last_updated": current_time.isoformat() }, { "user_id": "d4e5f6g7-h8i9-0123-4567-890123defghi", @@ -124,14 +188,34 @@ def populate_weaviate_user_profile(client): "forks": 180 } ]), + "pull_requests": json.dumps([ + { + "title": "Improve accessibility features", + "body": ("Adding comprehensive accessibility features to the " + "React component library..."), + "state": "open", + "repository": "facebook/react", + "created_at": "2024-02-05T11:20:00Z", + "closed_at": None, + "merged_at": None, + "labels": ["accessibility", "enhancement", "a11y"], + "url": "https://github.com/facebook/react/pull/98765" + } + ]), "languages": ["TypeScript", "JavaScript", "CSS", "HTML"], "topics": ["react", "frontend", "typescript", "css", "ui-ux", "accessibility"], "followers_count": 1320, "following_count": 200, "total_stars_received": 2950, "total_forks": 600, - "profile_text_for_embedding": "Emily Johnson, Frontend developer creating beautiful and accessible web experiences. Repositories: react-components, Reusable React component library with TypeScript. css-animations, Collection of smooth CSS animations and transitions. Languages: TypeScript, JavaScript, CSS. Topics: react, frontend, typescript, css, ui-ux.", - "last_updated": current_time + "profile_text_for_embedding": ( + "Emily Johnson, Frontend developer creating beautiful and accessible web experiences. " + "Repositories: react-components, Reusable React component library with TypeScript. " + "css-animations, Collection of smooth CSS animations and transitions. " + "Languages: TypeScript, JavaScript, CSS. " + "Topics: react, frontend, typescript, css, ui-ux." + ), + "last_updated": current_time.isoformat() }, { "user_id": "e5f6g7h8-i9j0-1234-5678-901234efghij", @@ -157,34 +241,66 @@ def populate_weaviate_user_profile(client): "forks": 180 } ]), + "pull_requests": json.dumps([ + { + "title": "Optimize memory allocation patterns", + "body": ("Implementing advanced memory allocation optimization techniques " + "for better performance..."), + "state": "merged", + "repository": "rust-lang/rust", + "created_at": "2024-01-25T08:45:00Z", + "closed_at": "2024-02-01T10:30:00Z", + "merged_at": "2024-02-01T10:30:00Z", + "labels": ["performance", "memory", "optimization"], + "url": "https://github.com/rust-lang/rust/pull/13579" + } + ]), "languages": ["Rust", "C++", "Assembly"], "topics": ["rust", "systems-programming", "performance", "memory-safety", "concurrency"], "followers_count": 980, "following_count": 85, "total_stars_received": 2950, "total_forks": 420, - "profile_text_for_embedding": "David Kim, Systems programmer passionate about performance and memory safety. Repositories: memory-allocator, Custom memory allocator written in Rust for high-performance applications. concurrent-data-structures, Lock-free data structures for concurrent programming in Rust. Languages: Rust, C++, Assembly. Topics: rust, systems-programming, performance, memory-safety.", - "last_updated": current_time + "profile_text_for_embedding": ( + "David Kim, Systems programmer passionate about performance and memory safety. " + "Repositories: memory-allocator, Custom memory allocator written in Rust for " + "high-performance applications. concurrent-data-structures, Lock-free data structures " + "for concurrent programming in Rust. Languages: Rust, C++, Assembly. " + "Topics: rust, systems-programming, performance, memory-safety." + ), + "last_updated": current_time.isoformat() } ] try: - with client.batch.dynamic() as batch: + collection = client.collections.get("weaviate_user_profile") + async with collection.batch.dynamic() as batch: for profile in user_profiles: batch.add_object( - collection="weaviate_user_profile", properties=profile ) print("✅ Populated weaviate_user_profile with sample user data.") except Exception as e: print(f"❌ Error populating weaviate_user_profile: {e}") + raise -def populate_all_collections(): +async def populate_all_collections(): """ Populate only the user profile collection as per the updated model structure. """ - client = get_client() - print("Populating Weaviate user profile collection with sample data...") - populate_weaviate_user_profile(client) - client.close() - print("✅ User profile collection populated successfully.") + try: + async with get_weaviate_client() as client: + print("Populating Weaviate user profile collection with sample data...") + await populate_weaviate_user_profile(client) + print("✅ User profile collection populated successfully.") + except Exception as e: + print(f"❌ Error during population: {e}") + raise + +def main(): + """Entry point for running the population script.""" + asyncio.run(populate_all_collections()) + + +if __name__ == "__main__": + main() diff --git a/backend/main.py b/backend/main.py index 190e99bd..77a69440 100644 --- a/backend/main.py +++ b/backend/main.py @@ -10,7 +10,7 @@ from app.core.config import settings from app.core.orchestration.agent_coordinator import AgentCoordinator from app.core.orchestration.queue_manager import AsyncQueueManager -# from app.db.weaviate.weaviate_client import get_client +from app.db.weaviate.weaviate_client import get_weaviate_client from bots.discord.discord_bot import DiscordBot from bots.discord.discord_cogs import DevRelCommands @@ -28,13 +28,7 @@ class DevRAIApplication: def __init__(self): """Initializes all services required by the application.""" - # try: - # self.weaviate_client = get_client() - # logger.info(f"Weaviate client initialized: {self.weaviate_client.is_ready()}") - # except Exception as e: - # logger.error(f"Fatal: Error initializing Weaviate client: {e}", exc_info=True) - # self.weaviate_client = None - # sys.exit(1) + self.weaviate_client = None self.queue_manager = AsyncQueueManager() self.agent_coordinator = AgentCoordinator(self.queue_manager) self.discord_bot = DiscordBot(self.queue_manager) @@ -44,6 +38,9 @@ async def start_background_tasks(self): """Starts the Discord bot and queue workers in the background.""" try: logger.info("Starting background tasks (Discord Bot & Queue Manager)...") + + await self.test_weaviate_connection() + await self.queue_manager.start(num_workers=3) asyncio.create_task( self.discord_bot.start(settings.discord_bot_token) @@ -53,6 +50,17 @@ async def start_background_tasks(self): logger.error(f"Error during background task startup: {e}", exc_info=True) await self.stop_background_tasks() + async def test_weaviate_connection(self): + """Test Weaviate connection during startup.""" + try: + async with get_weaviate_client() as client: + is_ready = await client.is_ready() + if is_ready: + logger.info("Weaviate connection successful and ready") + except Exception as e: + logger.error(f"Failed to connect to Weaviate: {e}") + raise + async def stop_background_tasks(self): """Stops all background tasks and connections gracefully.""" logger.info("Stopping background tasks and closing connections...") @@ -70,13 +78,6 @@ async def stop_background_tasks(self): except Exception as e: logger.error(f"Error stopping queue manager: {e}", exc_info=True) - try: - if hasattr(self, 'weaviate_client') and self.weaviate_client is not None: - self.weaviate_client.close() - logger.info("Weaviate client connection closed.") - except Exception as e: - logger.error(f"Error closing Weaviate client: {e}", exc_info=True) - logger.info("All background tasks and connections stopped.") @@ -102,13 +103,34 @@ async def favicon(): """Return empty favicon to prevent 404 logs""" return Response(status_code=204) +@api.get("/health") +async def health_check(): + """Health check endpoint to verify services are running""" + try: + async with get_weaviate_client() as client: + weaviate_ready = await client.is_ready() + + return { + "status": "healthy", + "services": { + "weaviate": "ready" if weaviate_ready else "not_ready", + "discord_bot": "running" if app_instance.discord_bot and not app_instance.discord_bot.is_closed() else "stopped" + } + } + except Exception as e: + logger.error(f"Health check failed: {e}") + return { + "status": "unhealthy", + "error": str(e) + } + api.include_router(auth_router, prefix="/v1/auth", tags=["Authentication"]) if __name__ == "__main__": required_vars = [ "DISCORD_BOT_TOKEN", "SUPABASE_URL", "SUPABASE_KEY", - "BACKEND_URL", "GEMINI_API_KEY", "TAVILY_API_KEY" + "BACKEND_URL", "GEMINI_API_KEY", "TAVILY_API_KEY", "GITHUB_TOKEN" ] missing_vars = [var for var in required_vars if not getattr(settings, var.lower(), None)] diff --git a/backend/requirements.txt b/backend/requirements.txt index 5d1fdbbd..ec719451 100644 Binary files a/backend/requirements.txt and b/backend/requirements.txt differ diff --git a/poetry.lock b/poetry.lock index 4e658b08..5c61fb92 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4389,18 +4389,18 @@ crypto-eth-addresses = ["eth-hash[pycryptodome] (>=0.7.0)"] [[package]] name = "weaviate-client" -version = "4.15.0" +version = "4.15.4" description = "A python native Weaviate client" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "weaviate_client-4.15.0-py3-none-any.whl", hash = "sha256:36bb83dacfc102cab0267aa211eb21eaca31cc80921b977451d814532c9a50e6"}, - {file = "weaviate_client-4.15.0.tar.gz", hash = "sha256:0e1c06e0bb08c8ab6987d91cf024b625f517b349965e6dee5439f675ff90cc64"}, + {file = "weaviate_client-4.15.4-py3-none-any.whl", hash = "sha256:870b73495b8e5849003690f7c36f2795b90765e62482f07ab71275dbd21b271d"}, + {file = "weaviate_client-4.15.4.tar.gz", hash = "sha256:005e7a6cee52ff4ac66f244a572b5ed98dd0e15bfda380979a88fddf692720df"}, ] [package.dependencies] -authlib = ">=1.2.1,<1.3.2" +authlib = ">=1.2.1,<2.0.0" deprecation = ">=2.1.0,<3.0.0" grpcio = ">=1.66.2,<2.0.0" grpcio-health-checking = ">=1.66.2,<2.0.0"