@codeflash-ai codeflash-ai bot commented Nov 13, 2025

📄 7,095% (70.95x) speedup for BaseArangoService.delete_google_drive_record in backend/python/app/connectors/services/base_arango_service.py

⏱️ Runtime : 685 milliseconds → 9.52 milliseconds (best of 100 runs)

📝 Explanation and details

The optimized code achieves a 70x runtime speedup and 30% throughput improvement through several key optimizations:

1. Parallelized Independent Database Operations
The most significant optimization is in _execute_drive_record_deletion, where previously sequential database operations are now run concurrently using asyncio.gather(). The file record fetch, edge deletion, and permission deletion are independent operations that can execute simultaneously, dramatically reducing I/O wait time. Additionally, file record and main record deletions are parallelized when both operations are needed.
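The difference between awaiting operations one after another and awaiting them together can be sketched as follows. This is a minimal illustration, not the PR's code: `fake_db_call` is a hypothetical stand-in for one awaitable database round trip, and the sketch assumes the three operations really are independent and that the driver and transaction tolerate overlapping calls.

```python
import asyncio
import time


async def fake_db_call(name: str, delay: float = 0.05) -> str:
    """Hypothetical stand-in for one awaitable database round trip."""
    await asyncio.sleep(delay)
    return name


async def delete_sequential() -> list:
    # Each await completes before the next one starts: roughly 3 * delay.
    return [
        await fake_db_call("fetch_file_record"),
        await fake_db_call("delete_edges"),
        await fake_db_call("delete_permissions"),
    ]


async def delete_concurrent() -> list:
    # The three waits overlap: roughly 1 * delay. Results keep argument order.
    results = await asyncio.gather(
        fake_db_call("fetch_file_record"),
        fake_db_call("delete_edges"),
        fake_db_call("delete_permissions"),
    )
    return list(results)


def timed(coro_fn):
    """Run a coroutine function and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


if __name__ == "__main__":
    print(timed(delete_sequential))
    print(timed(delete_concurrent))
```

Note that `asyncio.gather()` only overlaps work that actually yields to the event loop; a blocking driver call wrapped in a coroutine would still run serially unless it is moved to a thread or executor.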

2. Query Caching Optimization
In get_user_by_user_id, the f-string query construction is replaced with a static query template using bind variables for the collection name. This enables ArangoDB's query engine to cache and reuse query plans more effectively, reducing query parsing and optimization overhead on repeated calls.
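A sketch of what a static query with a collection bind parameter might look like. The exact query text in the PR is not shown here, so the query shape and the helper name `build_user_query` are assumptions; the `@@collection` placeholder with an `"@collection"` key in the bind variables is standard AQL syntax for collection bind parameters.

```python
# Static query text: identical for every call, so it can be reused as-is.
USER_BY_ID_QUERY = """
FOR user IN @@collection
    FILTER user.userId == @user_id
    RETURN user
"""


def build_user_query(user_id: str, collection: str = "users"):
    """Hypothetical helper: return the static query plus per-call bind variables."""
    # Collection bind parameters use "@@name" in the query text and an
    # "@name" key in the bind variables; value parameters use "@name" / "name".
    bind_vars = {"@collection": collection, "user_id": user_id}
    return USER_BY_ID_QUERY, bind_vars


if __name__ == "__main__":
    query, bind_vars = build_user_query("user_1")
    print(query.strip())
    print(bind_vars)
```

Because only `bind_vars` changes between calls, the query string no longer varies with the collection name or user ID, which is the property the explanation above relies on.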

3. Early User Validation
The user lookup is moved before logging operations, allowing faster exit when users don't exist (preventing unnecessary logging and permission checks in failure cases).

4. Reduced Logging Overhead
Non-critical debug logging in hot paths is minimized, particularly in _check_drive_permissions, reducing string formatting overhead during high-frequency operations.
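One way to see where the string formatting cost comes from, using only the standard `logging` module. The PR removes or trims log calls; this sketch instead shows the general mechanism, namely that an f-string is formatted before the logger decides whether to drop the message. The `Expensive` class and the logger name are illustrative only.

```python
import logging


class Expensive:
    """Counts how often it is converted to a string."""

    def __init__(self):
        self.str_calls = 0

    def __str__(self):
        self.str_calls += 1
        return "expensive"


logger = logging.getLogger("drive_permissions_demo")
logger.setLevel(logging.INFO)  # DEBUG messages are dropped

eager = Expensive()
# The f-string is built before debug() runs, even though the message is dropped.
logger.debug(f"permissions: {eager}")

lazy = Expensive()
# With %-style arguments, formatting is deferred and skipped for dropped messages.
logger.debug("permissions: %s", lazy)

if __name__ == "__main__":
    print(eager.str_calls, lazy.str_calls)
```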

5. Streamlined Transaction Handling
Lambda wrapper removal in transaction commit/abort calls eliminates unnecessary function call overhead.
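A sketch of the kind of change this describes, under the assumption that the synchronous `commit_transaction` / `abort_transaction` methods are handed to a thread or executor. How the service actually dispatches them is not shown in this comment, so `asyncio.to_thread` and `FakeTransaction` are stand-ins.

```python
import asyncio


class FakeTransaction:
    """Hypothetical transaction with a synchronous commit method."""

    def __init__(self):
        self.committed = False

    def commit_transaction(self):
        self.committed = True


async def commit_with_lambda(txn):
    # Extra call layer: the lambda exists only to call the bound method.
    await asyncio.to_thread(lambda: txn.commit_transaction())


async def commit_direct(txn):
    # Pass the bound method itself; behavior is the same with one call fewer.
    await asyncio.to_thread(txn.commit_transaction)


if __name__ == "__main__":
    a, b = FakeTransaction(), FakeTransaction()
    asyncio.run(commit_with_lambda(a))
    asyncio.run(commit_direct(b))
    print(a.committed, b.committed)
```

The saving per call is small compared with a database round trip, so this change is best read as a cleanup rather than a main source of the speedup.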

Performance Impact by Test Type:

  • High-volume concurrent tests (50-100 deletions) see the greatest benefit from parallelization
  • Basic success cases benefit from query caching and reduced logging
  • Permission failure cases benefit from early user validation

The optimization is particularly effective for workloads involving bulk record deletions or high-frequency deletion operations, where the parallelized database operations and improved query caching compound to deliver substantial performance gains.

Correctness verification report:

| Test | Status |
| --- | --- |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 247 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 76.9% |

🌀 Generated Regression Tests and Runtime

import asyncio # used to run async functions
from unittest.mock import AsyncMock, MagicMock, patch

import pytest # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService

# Function to test (copied verbatim as per instructions)

# ... [Function code omitted here for brevity, see above for full code] ...

# ---------------------
# UNIT TESTS START HERE
# ---------------------

@pytest.fixture
def base_arango_service():
    """
    Fixture to create a BaseArangoService instance with mocked dependencies.
    """
    logger = MagicMock()
    arango_client = MagicMock()
    config_service = MagicMock()
    kafka_service = MagicMock()
    service = BaseArangoService(
        logger=logger,
        arango_client=arango_client,
        config_service=config_service,
        kafka_service=kafka_service
    )
    # Mock .db on the service
    service.db = MagicMock()
    return service

@pytest.mark.asyncio
async def test_delete_google_drive_record_success(base_arango_service):
    """
    Basic test: Should delete a record successfully if user and permissions are valid.
    """
    # Arrange
    record_id = "rec_1"
    user_id = "user_1"
    record = {"_key": record_id, "some": "data"}
    user = {"_key": "user_key_1", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": record_id,
        "connector": "google_drive",
        "user_role": user_role
    }

    # Patch methods
    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    base_arango_service._execute_drive_record_deletion = AsyncMock(return_value=deletion_result)

    # Act
    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)
    base_arango_service.get_user_by_user_id.assert_awaited_once_with(user_id)
    base_arango_service._check_drive_permissions.assert_awaited_once_with(record_id, user["_key"])
    base_arango_service._execute_drive_record_deletion.assert_awaited_once_with(record_id, record, user_role)

@pytest.mark.asyncio
async def test_delete_google_drive_record_user_not_found(base_arango_service):
    """
    Edge: Should return 404 if user is not found.
    """
    record_id = "rec_2"
    user_id = "user_missing"
    record = {"_key": record_id}

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=None)

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)
    base_arango_service.get_user_by_user_id.assert_awaited_once_with(user_id)

@pytest.mark.asyncio
async def test_delete_google_drive_record_insufficient_permissions(base_arango_service):
    """
    Edge: Should return 403 if user does not have sufficient Drive permissions.
    """
    record_id = "rec_3"
    user_id = "user_3"
    record = {"_key": record_id}
    user = {"_key": "user_key_3", "userId": user_id}

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value="READER")  # not allowed

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)
    base_arango_service.get_user_by_user_id.assert_awaited_once_with(user_id)
    base_arango_service._check_drive_permissions.assert_awaited_once_with(record_id, user["_key"])

@pytest.mark.asyncio
async def test_delete_google_drive_record_permission_none(base_arango_service):
    """
    Edge: Should return 403 if user has no Drive permissions at all.
    """
    record_id = "rec_4"
    user_id = "user_4"
    record = {"_key": record_id}
    user = {"_key": "user_key_4", "userId": user_id}

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=None)

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)

@pytest.mark.asyncio
async def test_delete_google_drive_record_execute_drive_record_deletion_error(base_arango_service):
    """
    Edge: Should catch and handle exceptions from _execute_drive_record_deletion.
    """
    record_id = "rec_5"
    user_id = "user_5"
    record = {"_key": record_id}
    user = {"_key": "user_key_5", "userId": user_id}

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value="OWNER")
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=Exception("Simulated deletion error"))

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)

@pytest.mark.asyncio
async def test_delete_google_drive_record_concurrent_calls(base_arango_service):
    """
    Edge: Test concurrent execution of delete_google_drive_record.
    """
    record_ids = [f"rec_{i}" for i in range(3)]
    user_id = "user_concurrent"
    record = {"_key": "some", "data": "here"}
    user = {"_key": "user_key_concurrent", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": None,
        "connector": "google_drive",
        "user_role": user_role
    }

    # Patch methods for all calls
    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    async def _execute(record_id, record, user_role):
        # Simulate a unique result for each record
        return {**deletion_result, "record_id": record_id}
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=_execute)

    # Act: run concurrently
    coros = [
        base_arango_service.delete_google_drive_record(rid, user_id, record)
        for rid in record_ids
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_delete_google_drive_record_internal_exception(base_arango_service):
    """
    Edge: Should catch and handle unexpected exceptions in the main method.
    """
    record_id = "rec_6"
    user_id = "user_6"
    record = {"_key": record_id}

    # Patch get_user_by_user_id to raise an exception
    base_arango_service.get_user_by_user_id = AsyncMock(side_effect=Exception("DB error!"))

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)

@pytest.mark.asyncio
async def test_delete_google_drive_record_basic_async_behavior(base_arango_service):
    """
    Basic: Test that the function is a coroutine and requires await.
    """
    record_id = "rec_7"
    user_id = "user_7"
    record = {"_key": record_id}
    user = {"_key": "user_key_7", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": record_id,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    base_arango_service._execute_drive_record_deletion = AsyncMock(return_value=deletion_result)

    # Should return a coroutine if not awaited
    codeflash_output = base_arango_service.delete_google_drive_record(record_id, user_id, record); coro = codeflash_output
    result = await coro

@pytest.mark.asyncio
async def test_delete_google_drive_record_handles_unexpected_return_from_check_permissions(base_arango_service):
    """
    Edge: If _check_drive_permissions returns a value not in allowed_roles, should return 403.
    """
    record_id = "rec_8"
    user_id = "user_8"
    record = {"_key": record_id}
    user = {"_key": "user_key_8", "userId": user_id}

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value="READER")  # Not in allowed_roles

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)

@pytest.mark.asyncio
async def test_delete_google_drive_record_handles_dict_record_input(base_arango_service):
    """
    Basic: Should accept any dict for 'record' parameter.
    """
    record_id = "rec_9"
    user_id = "user_9"
    record = {"foo": "bar"}
    user = {"_key": "user_key_9", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": record_id,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    base_arango_service._execute_drive_record_deletion = AsyncMock(return_value=deletion_result)

    result = await base_arango_service.delete_google_drive_record(record_id, user_id, record)

@pytest.mark.asyncio
async def test_delete_google_drive_record_large_scale_concurrent_load(base_arango_service):
    """
    Large Scale: Test with 50 concurrent deletions.
    """
    num_calls = 50
    record_ids = [f"rec_{i}" for i in range(num_calls)]
    user_id = "user_bulk"
    record = {"bulk": True}
    user = {"_key": "user_key_bulk", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": None,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    async def _execute(record_id, record, user_role):
        return {**deletion_result, "record_id": record_id}
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=_execute)

    coros = [
        base_arango_service.delete_google_drive_record(rid, user_id, record)
        for rid in record_ids
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_small_load(base_arango_service):
    """
    Throughput: Test performance with a small batch of 5 deletions.
    """
    record_ids = [f"rec_throughput_{i}" for i in range(5)]
    user_id = "user_throughput"
    record = {"throughput": True}
    user = {"_key": "user_key_throughput", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": None,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    async def _execute(record_id, record, user_role):
        return {**deletion_result, "record_id": record_id}
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=_execute)

    coros = [
        base_arango_service.delete_google_drive_record(rid, user_id, record)
        for rid in record_ids
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_medium_load(base_arango_service):
    """
    Throughput: Test performance with a medium batch of 30 deletions.
    """
    record_ids = [f"rec_throughput_{i}" for i in range(30)]
    user_id = "user_throughput_medium"
    record = {"throughput": "medium"}
    user = {"_key": "user_key_throughput_medium", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": None,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    async def _execute(record_id, record, user_role):
        return {**deletion_result, "record_id": record_id}
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=_execute)

    coros = [
        base_arango_service.delete_google_drive_record(rid, user_id, record)
        for rid in record_ids
    ]
    results = await asyncio.gather(*coros)

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_high_volume(base_arango_service):
    """
    Throughput: Test performance with a high volume (100) of concurrent deletions.
    """
    record_ids = [f"rec_throughput_{i}" for i in range(100)]
    user_id = "user_throughput_high"
    record = {"throughput": "high"}
    user = {"_key": "user_key_throughput_high", "userId": user_id}
    user_role = "OWNER"
    deletion_result = {
        "success": True,
        "record_id": None,
        "connector": "google_drive",
        "user_role": user_role
    }

    base_arango_service.get_user_by_user_id = AsyncMock(return_value=user)
    base_arango_service._check_drive_permissions = AsyncMock(return_value=user_role)
    async def _execute(record_id, record, user_role):
        return {**deletion_result, "record_id": record_id}
    base_arango_service._execute_drive_record_deletion = AsyncMock(side_effect=_execute)

    coros = [
        base_arango_service.delete_google_drive_record(rid, user_id, record)
        for rid in record_ids
    ]
    results = await asyncio.gather(*coros)

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

#------------------------------------------------
import asyncio # used to run async functions
from typing import Dict, Optional

import pytest # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService

# ---- BEGIN: Minimal Stubs and Mocks for Dependencies ----

class DummyLogger:
    def __init__(self):
        self.infos = []
        self.errors = []
        self.warnings = []
        self.debugs = []
    def info(self, msg, *args): self.infos.append(msg % args if args else msg)
    def error(self, msg, *args): self.errors.append(msg % args if args else msg)
    def warning(self, msg, *args): self.warnings.append(msg % args if args else msg)
    def debug(self, msg, *args): self.debugs.append(msg % args if args else msg)

class DummyCursor:
    def __init__(self, items): self.items = items
    def __iter__(self): return iter(self.items)
    def __next__(self): return next(iter(self.items))

class DummyAQL:
    def __init__(self, db):
        self.db = db
        self.last_query = None
        self.last_bind_vars = None
        self.execute_calls = []

    def execute(self, query, bind_vars=None):
        self.last_query = query
        self.last_bind_vars = bind_vars
        self.execute_calls.append((query, bind_vars))
        # Simulate different queries based on context
        if "FOR user IN users" in query:
            # get_user_by_user_id
            user_id = bind_vars.get("user_id")
            user = self.db._users.get(user_id)
            if user:
                return DummyCursor([user])
            return DummyCursor([])
        elif "FOR doc IN" in query:
            # get_document
            collection = bind_vars.get("@collection")
            document_key = bind_vars.get("document_key")
            docs = self.db._collections.get(collection, {})
            doc = docs.get(document_key)
            if doc:
                return DummyCursor([doc])
            return DummyCursor([])
        elif "LET user_from = CONCAT('users/', @user_key)" in query:
            # _check_drive_permissions
            record_id = bind_vars.get("record_id")
            user_key = bind_vars.get("user_key")
            # Simulate permission lookup logic
            permission = self.db._permissions.get((record_id, user_key))
            if permission:
                return DummyCursor([{"permission": permission, "source": "DIRECT"}])
            return DummyCursor([{"permission": None, "source": "NONE"}])
        elif "REMOVE" in query:
            # Simulate deletion queries for edges, files, records, anyone
            # Always return empty list (simulate no error)
            return DummyCursor([])
        return DummyCursor([])

class DummyTransaction:
    def __init__(self):
        self.aql = DummyAQL(self)
        self.committed = False
        self.aborted = False
        self.commit_transaction_calls = 0
        self.abort_transaction_calls = 0
    def commit_transaction(self):
        self.committed = True
        self.commit_transaction_calls += 1
    def abort_transaction(self):
        self.aborted = True
        self.abort_transaction_calls += 1

class DummyDB:
    def __init__(self, users=None, collections=None, permissions=None):
        self.aql = DummyAQL(self)
        self.begin_transaction_calls = []
        self._users = users if users is not None else {}
        self._collections = collections if collections is not None else {}
        self._permissions = permissions if permissions is not None else {}
        self._transactions = []
    def begin_transaction(self, write):
        t = DummyTransaction()
        self._transactions.append(t)
        self.begin_transaction_calls.append(write)
        return t

class DummyConfigService:
    pass

class DummyKafkaService:
    pass

# ---- END: Minimal Stubs and Mocks for Dependencies ----

# ---- BEGIN: CONSTANTS ----

class CollectionNames:
    RECORDS = type("Enum", (), {"value": "records"})
    DRIVES = type("Enum", (), {"value": "drives"})
    FILES = type("Enum", (), {"value": "files"})
    LINKS = type("Enum", (), {"value": "links"})
    MAILS = type("Enum", (), {"value": "mails"})
    WEBPAGES = type("Enum", (), {"value": "webpages"})
    PEOPLE = type("Enum", (), {"value": "people"})
    USERS = type("Enum", (), {"value": "users"})
    GROUPS = type("Enum", (), {"value": "groups"})
    ORGS = type("Enum", (), {"value": "orgs"})
    ANYONE = type("Enum", (), {"value": "anyone"})
    CHANNEL_HISTORY = type("Enum", (), {"value": "channel_history"})
    PAGE_TOKENS = type("Enum", (), {"value": "page_tokens"})
    APPS = type("Enum", (), {"value": "apps"})
    DEPARTMENTS = type("Enum", (), {"value": "departments"})
    CATEGORIES = type("Enum", (), {"value": "categories"})
    LANGUAGES = type("Enum", (), {"value": "languages"})
    TOPICS = type("Enum", (), {"value": "topics"})
    SUBCATEGORIES1 = type("Enum", (), {"value": "subcategories1"})
    SUBCATEGORIES2 = type("Enum", (), {"value": "subcategories2"})
    SUBCATEGORIES3 = type("Enum", (), {"value": "subcategories3"})
    BLOCKS = type("Enum", (), {"value": "blocks"})
    RECORD_GROUPS = type("Enum", (), {"value": "record_groups"})
    AGENT_INSTANCES = type("Enum", (), {"value": "agent_instances"})
    AGENT_TEMPLATES = type("Enum", (), {"value": "agent_templates"})
    TICKETS = type("Enum", (), {"value": "tickets"})
    SYNC_POINTS = type("Enum", (), {"value": "sync_points"})
    TEAMS = type("Enum", (), {"value": "teams"})
    VIRTUAL_RECORD_TO_DOC_ID_MAPPING = type("Enum", (), {"value": "virtual_record_to_doc_id_mapping"})
    IS_OF_TYPE = type("Enum", (), {"value": "is_of_type"})
    RECORD_RELATIONS = type("Enum", (), {"value": "record_relations"})
    USER_DRIVE_RELATION = type("Enum", (), {"value": "user_drive_relation"})
    BELONGS_TO_DEPARTMENT = type("Enum", (), {"value": "belongs_to_department"})
    ORG_DEPARTMENT_RELATION = type("Enum", (), {"value": "org_department_relation"})
    BELONGS_TO = type("Enum", (), {"value": "belongs_to"})
    PERMISSIONS = type("Enum", (), {"value": "permissions"})
    ORG_APP_RELATION = type("Enum", (), {"value": "org_app_relation"})
    USER_APP_RELATION = type("Enum", (), {"value": "user_app_relation"})
    BELONGS_TO_CATEGORY = type("Enum", (), {"value": "belongs_to_category"})
    BELONGS_TO_LANGUAGE = type("Enum", (), {"value": "belongs_to_language"})
    BELONGS_TO_TOPIC = type("Enum", (), {"value": "belongs_to_topic"})
    BELONGS_TO_RECORD_GROUP = type("Enum", (), {"value": "belongs_to_record_group"})
    INTER_CATEGORY_RELATIONS = type("Enum", (), {"value": "inter_category_relations"})
    PERMISSIONS_TO_KB = type("Enum", (), {"value": "permissions_to_kb"})
    PERMISSION = type("Enum", (), {"value": "permission"})

class Connectors:
    GOOGLE_DRIVE = type("Enum", (), {"value": "google_drive"})
    GOOGLE_MAIL = type("Enum", (), {"value": "google_mail"})
    OUTLOOK = type("Enum", (), {"value": "outlook"})
    KNOWLEDGE_BASE = type("Enum", (), {"value": "knowledge_base"})

# ---- END: CONSTANTS ----

# ---- BEGIN: FUNCTION TO TEST (EXACT COPY) ----

# (Paste the full BaseArangoService class implementation here, as provided above.)

# ---- END: FUNCTION TO TEST ----

# ---- BEGIN: PATCHING FOR TESTS ----

# Patch BaseArangoService to use our dummy DB, logger, etc.

@pytest.fixture
def base_arango_service_factory():
    def _factory(users=None, collections=None, permissions=None):
        logger = DummyLogger()
        db = DummyDB(users=users, collections=collections, permissions=permissions)
        config = DummyConfigService()
        kafka = DummyKafkaService()
        service = BaseArangoService(logger, None, config, kafka)
        service.db = db
        # Patch _publish_record_event and _create_deleted_record_event_payload to be async no-ops
        async def dummy_publish_record_event(event_type, payload): return None
        async def dummy_create_deleted_record_event_payload(record, file_record): return {"dummy": "payload"}
        service._publish_record_event = dummy_publish_record_event
        service._create_deleted_record_event_payload = dummy_create_deleted_record_event_payload
        return service, logger, db
    return _factory

# ---- END: PATCHING FOR TESTS ----

# ---- BEGIN: BASIC TEST CASES ----

@pytest.mark.asyncio
async def test_delete_google_drive_record_success_owner(base_arango_service_factory):
    """Test successful deletion with OWNER role"""
    # Setup: user exists, has OWNER permission, record exists
    users = {"u1": {"_key": "ukey1", "userId": "u1"}}
    collections = {
        CollectionNames.FILES.value: {"r1": {"_key": "r1", "driveId": "d1"}},
        CollectionNames.RECORDS.value: {"r1": {"_key": "r1"}},
    }
    permissions = {("r1", "ukey1"): "OWNER"}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    result = await service.delete_google_drive_record("r1", "u1", {"_key": "r1"})

@pytest.mark.asyncio
async def test_delete_google_drive_record_success_writer(base_arango_service_factory):
    """Test successful deletion with WRITER role"""
    users = {"u2": {"_key": "ukey2", "userId": "u2"}}
    collections = {
        CollectionNames.FILES.value: {"r2": {"_key": "r2", "driveId": "d2"}},
        CollectionNames.RECORDS.value: {"r2": {"_key": "r2"}},
    }
    permissions = {("r2", "ukey2"): "WRITER"}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    result = await service.delete_google_drive_record("r2", "u2", {"_key": "r2"})

@pytest.mark.asyncio
async def test_delete_google_drive_record_success_fileorganizer(base_arango_service_factory):
    """Test successful deletion with FILEORGANIZER role"""
    users = {"u3": {"_key": "ukey3", "userId": "u3"}}
    collections = {
        CollectionNames.FILES.value: {"r3": {"_key": "r3", "driveId": "d3"}},
        CollectionNames.RECORDS.value: {"r3": {"_key": "r3"}},
    }
    permissions = {("r3", "ukey3"): "FILEORGANIZER"}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    result = await service.delete_google_drive_record("r3", "u3", {"_key": "r3"})

@pytest.mark.asyncio
async def test_delete_google_drive_record_user_not_found(base_arango_service_factory):
    """Test deletion fails when user does not exist"""
    users = {}  # No users
    service, logger, db = base_arango_service_factory(users)
    result = await service.delete_google_drive_record("r4", "u4", {"_key": "r4"})

@pytest.mark.asyncio
async def test_delete_google_drive_record_insufficient_permissions(base_arango_service_factory):
    """Test deletion fails when user has insufficient permissions"""
    users = {"u5": {"_key": "ukey5", "userId": "u5"}}
    # User has READER permission, which is not allowed
    permissions = {("r5", "ukey5"): "READER"}
    service, logger, db = base_arango_service_factory(users, {}, permissions)
    result = await service.delete_google_drive_record("r5", "u5", {"_key": "r5"})

# ---- END: BASIC TEST CASES ----

# ---- BEGIN: EDGE TEST CASES ----

@pytest.mark.asyncio
async def test_delete_google_drive_record_no_permissions_found(base_arango_service_factory):
    """Test deletion fails when user has no permissions at all"""
    users = {"u6": {"_key": "ukey6", "userId": "u6"}}
    permissions = {}  # No permissions
    service, logger, db = base_arango_service_factory(users, {}, permissions)
    result = await service.delete_google_drive_record("r6", "u6", {"_key": "r6"})

@pytest.mark.asyncio
async def test_delete_google_drive_record_concurrent_deletions(base_arango_service_factory):
    """Test concurrent deletion calls for different records/users"""
    users = {
        "u8": {"_key": "ukey8", "userId": "u8"},
        "u9": {"_key": "ukey9", "userId": "u9"},
    }
    collections = {
        CollectionNames.FILES.value: {
            "r8": {"_key": "r8", "driveId": "d8"},
            "r9": {"_key": "r9", "driveId": "d9"},
        },
        CollectionNames.RECORDS.value: {
            "r8": {"_key": "r8"},
            "r9": {"_key": "r9"},
        },
    }
    permissions = {
        ("r8", "ukey8"): "OWNER",
        ("r9", "ukey9"): "WRITER",
    }
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    # Run two deletions concurrently
    results = await asyncio.gather(
        service.delete_google_drive_record("r8", "u8", {"_key": "r8"}),
        service.delete_google_drive_record("r9", "u9", {"_key": "r9"}),
    )

@pytest.mark.asyncio
async def test_delete_google_drive_record_edge_case_missing_file_record(base_arango_service_factory):
    """Test deletion when file record is missing but main record exists"""
    users = {"u10": {"_key": "ukey10", "userId": "u10"}}
    collections = {
        CollectionNames.FILES.value: {},  # No file record
        CollectionNames.RECORDS.value: {"r10": {"_key": "r10"}},
    }
    permissions = {("r10", "ukey10"): "OWNER"}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    result = await service.delete_google_drive_record("r10", "u10", {"_key": "r10"})

# ---- END: EDGE TEST CASES ----

# ---- BEGIN: LARGE SCALE TEST CASES ----

@pytest.mark.asyncio
async def test_delete_google_drive_record_many_concurrent_deletions(base_arango_service_factory):
    """Test scalability with 20 concurrent deletions"""
    users = {f"u{i}": {"_key": f"ukey{i}", "userId": f"u{i}"} for i in range(20)}
    collections = {
        CollectionNames.FILES.value: {f"r{i}": {"_key": f"r{i}", "driveId": f"d{i}"} for i in range(20)},
        CollectionNames.RECORDS.value: {f"r{i}": {"_key": f"r{i}"} for i in range(20)},
    }
    permissions = {(f"r{i}", f"ukey{i}"): "OWNER" for i in range(20)}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    tasks = [
        service.delete_google_drive_record(f"r{i}", f"u{i}", {"_key": f"r{i}"})
        for i in range(20)
    ]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        pass

@pytest.mark.asyncio
async def test_delete_google_drive_record_large_scale_permissions(base_arango_service_factory):
    """Test with mixed permissions for 50 records/users"""
    users = {f"u{i}": {"_key": f"ukey{i}", "userId": f"u{i}"} for i in range(50)}
    collections = {
        CollectionNames.FILES.value: {f"r{i}": {"_key": f"r{i}", "driveId": f"d{i}"} for i in range(50)},
        CollectionNames.RECORDS.value: {f"r{i}": {"_key": f"r{i}"} for i in range(50)},
    }
    permissions = {}
    # Alternate OWNER, WRITER, FILEORGANIZER, READER, None
    roles = ["OWNER", "WRITER", "FILEORGANIZER", "READER", None]
    for i in range(50):
        role = roles[i % len(roles)]
        if role:
            permissions[(f"r{i}", f"ukey{i}")] = role
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    tasks = [
        service.delete_google_drive_record(f"r{i}", f"u{i}", {"_key": f"r{i}"})
        for i in range(50)
    ]
    results = await asyncio.gather(*tasks)
    for i, result in enumerate(results):
        role = roles[i % len(roles)]
        if role in ["OWNER", "WRITER", "FILEORGANIZER"]:
            pass
        else:
            pass

# ---- END: LARGE SCALE TEST CASES ----

# ---- BEGIN: THROUGHPUT TEST CASES ----

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_small_load(base_arango_service_factory):
    """Throughput test: small load (5 deletions)"""
    users = {f"u{i}": {"_key": f"ukey{i}", "userId": f"u{i}"} for i in range(5)}
    collections = {
        CollectionNames.FILES.value: {f"r{i}": {"_key": f"r{i}", "driveId": f"d{i}"} for i in range(5)},
        CollectionNames.RECORDS.value: {f"r{i}": {"_key": f"r{i}"} for i in range(5)},
    }
    permissions = {(f"r{i}", f"ukey{i}"): "OWNER" for i in range(5)}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    tasks = [service.delete_google_drive_record(f"r{i}", f"u{i}", {"_key": f"r{i}"}) for i in range(5)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_medium_load(base_arango_service_factory):
    """Throughput test: medium load (50 deletions)"""
    users = {f"u{i}": {"_key": f"ukey{i}", "userId": f"u{i}"} for i in range(50)}
    collections = {
        CollectionNames.FILES.value: {f"r{i}": {"_key": f"r{i}", "driveId": f"d{i}"} for i in range(50)},
        CollectionNames.RECORDS.value: {f"r{i}": {"_key": f"r{i}"} for i in range(50)},
    }
    permissions = {(f"r{i}", f"ukey{i}"): "OWNER" for i in range(50)}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    tasks = [service.delete_google_drive_record(f"r{i}", f"u{i}", {"_key": f"r{i}"}) for i in range(50)]
    results = await asyncio.gather(*tasks)

@pytest.mark.asyncio
async def test_delete_google_drive_record_throughput_high_volume(base_arango_service_factory):
    """Throughput test: high volume (100 deletions)"""
    users = {f"u{i}": {"_key": f"ukey{i}", "userId": f"u{i}"} for i in range(100)}
    collections = {
        CollectionNames.FILES.value: {f"r{i}": {"_key": f"r{i}", "driveId": f"d{i}"} for i in range(100)},
        CollectionNames.RECORDS.value: {f"r{i}": {"_key": f"r{i}"} for i in range(100)},
    }
    permissions = {(f"r{i}", f"ukey{i}"): "OWNER" for i in range(100)}
    service, logger, db = base_arango_service_factory(users, collections, permissions)
    tasks = [service.delete_google_drive_record(f"r{i}", f"u{i}", {"_key": f"r{i}"}) for i in range(100)]
    results = await asyncio.gather(*tasks)

codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes, run `git checkout codeflash/optimize-BaseArangoService.delete_google_drive_record-mhxljd4l` and push.

@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 13, 2025 15:41
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Nov 13, 2025