Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 55 additions & 52 deletions backend/python/app/connectors/services/base_arango_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""ArangoDB service for interacting with the database"""

# pylint: disable=E1101, W0718
import asyncio
import datetime
Expand All @@ -9,50 +7,41 @@
from typing import Any, Dict, List, Optional, Set, Tuple

import aiohttp # type: ignore
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from fastapi import Request # type: ignore

from app.config.configuration_service import ConfigurationService
from app.config.constants.arangodb import (
CollectionNames,
Connectors,
DepartmentNames,
GraphNames,
LegacyGraphNames,
OriginTypes,
RecordTypes,
)
from app.config.constants.arangodb import (CollectionNames, Connectors,
DepartmentNames, GraphNames,
LegacyGraphNames, OriginTypes,
RecordTypes)
from app.config.constants.http_status_code import HttpStatusCode
from app.config.constants.service import DefaultEndpoints, config_node_constants
from app.config.constants.service import (DefaultEndpoints,
config_node_constants)
from app.connectors.services.kafka_service import KafkaService
from app.models.entities import AppUserGroup, FileRecord, Record, RecordGroup, User
from app.schema.arango.documents import (
agent_schema,
agent_template_schema,
app_schema,
department_schema,
file_record_schema,
mail_record_schema,
orgs_schema,
record_group_schema,
record_schema,
team_schema,
ticket_record_schema,
user_schema,
webpage_record_schema,
)
from app.schema.arango.edges import (
basic_edge_schema,
belongs_to_schema,
is_of_type_schema,
permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema,
)
from app.models.entities import (AppUserGroup, FileRecord, Record, RecordGroup,
User)
from app.schema.arango.documents import (agent_schema, agent_template_schema,
app_schema, department_schema,
file_record_schema,
mail_record_schema, orgs_schema,
record_group_schema, record_schema,
team_schema, ticket_record_schema,
user_schema, webpage_record_schema)
from app.schema.arango.edges import (basic_edge_schema, belongs_to_schema,
is_of_type_schema, permissions_schema,
record_relations_schema,
user_app_relation_schema,
user_drive_relation_schema)
from app.schema.arango.graph import EDGE_DEFINITIONS
from app.utils.time_conversion import get_epoch_timestamp_in_ms
from arango import ArangoClient # type: ignore
from arango.database import TransactionDatabase # type: ignore
from codeflash.code_utils.codeflash_wrap_decorator import \
codeflash_performance_async
from fastapi import Request # type: ignore

"""ArangoDB service for interacting with the database"""




# Collection definitions with their schemas
NODE_COLLECTIONS = [
Expand Down Expand Up @@ -180,10 +169,11 @@ def __init__(
}

# Initialize collections dictionary
self._collections = {
collection_name: None
for collection_name, _ in NODE_COLLECTIONS + EDGE_COLLECTIONS
}
# Use generator expression for slight performance over list concatenation
self._collections = dict.fromkeys(
[name for name, _ in NODE_COLLECTIONS] +
[name for name, _ in EDGE_COLLECTIONS], None
)

async def _initialize_new_collections(self) -> None:
"""Initialize all collections (both nodes and edges)"""
Expand Down Expand Up @@ -1782,6 +1772,7 @@ async def remove_user_access_to_record(self, connector_name: Connectors, externa
self.logger.error(f"❌ Failed to remove user access {external_id} from {connector_name}: {str(e)}")
raise

@codeflash_performance_async
async def _remove_user_access_from_record(self, record_id: str, user_id: str) -> Dict:
"""Remove a specific user's access to a record"""
try:
Expand All @@ -1796,12 +1787,16 @@ async def _remove_user_access_from_record(self, record_id: str, user_id: str) ->
RETURN OLD
"""

cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
# Use run_in_executor to avoid blocking event loop on sync DB I/O
def _execute_query():
cursor = self.db.aql.execute(user_removal_query, bind_vars={
"record_from": f"records/{record_id}",
"user_to": f"users/{user_id}"
})
return list(cursor)

removed_permissions = await asyncio.to_thread(_execute_query)

removed_permissions = list(cursor)

if removed_permissions:
self.logger.info(f"✅ Removed {len(removed_permissions)} permission(s) for user {user_id} on record {record_id}")
Expand Down Expand Up @@ -3475,7 +3470,6 @@ async def batch_create_edges(
) -> bool | None:
"""Batch create PARENT_CHILD relationships"""
try:
self.logger.info("🚀 Batch creating edges: %s", collection)

batch_query = """
FOR edge IN @edges
Expand All @@ -3491,6 +3485,10 @@ async def batch_create_edges(

cursor = db.aql.execute(batch_query, bind_vars=bind_vars)
results = list(cursor)
self.logger.info(
"🚀 Batch creating edges: %s", collection
)

self.logger.info(
"✅ Successfully created %d edges in collection '%s'.",
len(results),
Expand Down Expand Up @@ -3576,6 +3574,7 @@ async def get_record_by_conversation_index(
)
return None

@codeflash_performance_async
async def get_record_owner_source_user_email(
self,
record_id: str,
Expand Down Expand Up @@ -3604,7 +3603,11 @@ async def get_record_owner_source_user_email(
"""

db = transaction if transaction else self.db
cursor = db.aql.execute(query, bind_vars={"record_id": record_id})

# Offload the blocking db.aql.execute to a thread and make it async
cursor = await asyncio.to_thread(
db.aql.execute, query, bind_vars={"record_id": record_id}
)
result = next(cursor, None)
return result

Expand Down