⚡️ Speed up method BaseArangoService.batch_upsert_nodes by 31%
#647
+49
−49
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
📄 31% (0.31x) speedup for
BaseArangoService.batch_upsert_nodesinbackend/python/app/connectors/services/base_arango_service.py⏱️ Runtime :
4.34 milliseconds→3.30 milliseconds(best of80runs)📝 Explanation and details
The optimized code achieves a 31% runtime improvement (4.34ms → 3.30ms) through a simple but effective early return optimization for empty node lists.
Key Optimization:
if not nodes: return Trueat the function start to skip all database operations, logging, and query execution when the input list is emptyWhy This Works:
The line profiler reveals that the original code spends significant time on logging operations (67.8% of total time on the first info log alone). When nodes are empty, this logging and subsequent database query execution becomes pure overhead. The optimization eliminates:
Performance Impact:
Test Case Analysis:
The optimization particularly benefits scenarios with:
This is a zero-risk optimization that preserves all existing behavior while eliminating wasteful operations for a common edge case (empty input batches).
✅ Correctness verification report:
🌀 Generated Regression Tests and Runtime
import asyncio # used to run async functions
Collection definitions with their schemas
from typing import Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, patch
import pytest # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService
--- Function to test (copied exactly as provided) ---
(The function is pasted as per your instructions, do not modify!)
pylint: disable=E1101, W0718
class DummyLogger:
def init(self):
self.infos = []
self.errors = []
def info(self, msg, *args):
self.infos.append((msg, args))
def error(self, msg, *args):
self.errors.append((msg, args))
class DummyCursor:
def init(self, results):
self._results = results
self._iter = iter(self._results)
def iter(self):
return self
def next(self):
return next(self._iter)
class DummyDB:
def init(self, results=None, fail=False, fail_msg="fail"):
self.results = results if results is not None else []
self.fail = fail
self.fail_msg = fail_msg
self.executed = []
class DummyAQL:
def init(self, parent):
self.parent = parent
def execute(self, query, bind_vars=None):
self.parent.executed.append((query, bind_vars))
if self.parent.fail:
raise Exception(self.parent.fail_msg)
return DummyCursor(self.parent.results)
@Property
def aql(self):
return self.DummyAQL(self)
class DummyTransaction(DummyDB):
pass
Minimal stubs for enums/constants used in BaseArangoService
class CollectionNames:
RECORDS = type("Enum", (), {"value": "records"})
FILES = type("Enum", (), {"value": "files"})
MAILS = type("Enum", (), {"value": "mails"})
USERS = type("Enum", (), {"value": "users"})
DRIVES = type("Enum", (), {"value": "drives"})
LINKS = type("Enum", (), {"value": "links"})
WEBPAGES = type("Enum", (), {"value": "webpages"})
PEOPLE = type("Enum", (), {"value": "people"})
GROUPS = type("Enum", (), {"value": "groups"})
ORGS = type("Enum", (), {"value": "orgs"})
ANYONE = type("Enum", (), {"value": "anyone"})
CHANNEL_HISTORY = type("Enum", (), {"value": "channel_history"})
PAGE_TOKENS = type("Enum", (), {"value": "page_tokens"})
APPS = type("Enum", (), {"value": "apps"})
DEPARTMENTS = type("Enum", (), {"value": "departments"})
CATEGORIES = type("Enum", (), {"value": "categories"})
LANGUAGES = type("Enum", (), {"value": "languages"})
TOPICS = type("Enum", (), {"value": "topics"})
SUBCATEGORIES1 = type("Enum", (), {"value": "subcategories1"})
SUBCATEGORIES2 = type("Enum", (), {"value": "subcategories2"})
SUBCATEGORIES3 = type("Enum", (), {"value": "subcategories3"})
BLOCKS = type("Enum", (), {"value": "blocks"})
RECORD_GROUPS = type("Enum", (), {"value": "record_groups"})
AGENT_INSTANCES = type("Enum", (), {"value": "agent_instances"})
AGENT_TEMPLATES = type("Enum", (), {"value": "agent_templates"})
TICKETS = type("Enum", (), {"value": "tickets"})
SYNC_POINTS = type("Enum", (), {"value": "sync_points"})
TEAMS = type("Enum", (), {"value": "teams"})
VIRTUAL_RECORD_TO_DOC_ID_MAPPING = type("Enum", (), {"value": "virtual_record_to_doc_id_mapping"})
IS_OF_TYPE = type("Enum", (), {"value": "is_of_type"})
RECORD_RELATIONS = type("Enum", (), {"value": "record_relations"})
USER_DRIVE_RELATION = type("Enum", (), {"value": "user_drive_relation"})
BELONGS_TO_DEPARTMENT = type("Enum", (), {"value": "belongs_to_department"})
ORG_DEPARTMENT_RELATION = type("Enum", (), {"value": "org_department_relation"})
BELONGS_TO = type("Enum", (), {"value": "belongs_to"})
PERMISSIONS = type("Enum", (), {"value": "permissions"})
ORG_APP_RELATION = type("Enum", (), {"value": "org_app_relation"})
USER_APP_RELATION = type("Enum", (), {"value": "user_app_relation"})
BELONGS_TO_CATEGORY = type("Enum", (), {"value": "belongs_to_category"})
BELONGS_TO_LANGUAGE = type("Enum", (), {"value": "belongs_to_language"})
BELONGS_TO_TOPIC = type("Enum", (), {"value": "belongs_to_topic"})
BELONGS_TO_RECORD_GROUP = type("Enum", (), {"value": "belongs_to_record_group"})
INTER_CATEGORY_RELATIONS = type("Enum", (), {"value": "inter_category_relations"})
PERMISSIONS_TO_KB = type("Enum", (), {"value": "permissions_to_kb"})
PERMISSION = type("Enum", (), {"value": "permission"})
class Connectors:
GOOGLE_DRIVE = type("Enum", (), {"value": "google_drive"})
GOOGLE_MAIL = type("Enum", (), {"value": "google_mail"})
OUTLOOK = type("Enum", (), {"value": "outlook"})
KNOWLEDGE_BASE = type("Enum", (), {"value": "knowledge_base"})
from app.connectors.services.base_arango_service import BaseArangoService
---- UNIT TESTS ----
@pytest.mark.asyncio
async def test_batch_upsert_nodes_basic_success():
"""
Basic: Test upserting a small list of nodes returns True and logs success.
"""
logger = DummyLogger()
db = DummyDB(results=[{"_key": "a"}, {"_key": "b"}])
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_empty_nodes():
"""
Basic: Test upserting with an empty nodes list returns True and does not fail.
"""
logger = DummyLogger()
db = DummyDB(results=[])
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_transaction_success():
"""
Basic: Test upserting with a transaction object uses the transaction's db.
"""
logger = DummyLogger()
transaction = DummyTransaction(results=[{"_key": "x"}])
service = BaseArangoService(logger, None, None)
service.db = DummyDB(results=[{"_key": "should_not_be_used"}]) # Should not be used
@pytest.mark.asyncio
async def test_batch_upsert_nodes_returns_false_on_exception_without_transaction():
"""
Edge: If db.aql.execute raises, returns False and logs error if no transaction.
"""
logger = DummyLogger()
db = DummyDB(fail=True, fail_msg="db failure")
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_raises_on_exception_with_transaction():
"""
Edge: If db.aql.execute raises and a transaction is provided, the exception is re-raised.
"""
logger = DummyLogger()
transaction = DummyTransaction(fail=True, fail_msg="trans failure")
service = BaseArangoService(logger, None, None)
service.db = DummyDB() # Should not be used
@pytest.mark.asyncio
async def test_batch_upsert_nodes_concurrent_success():
"""
Edge: Test concurrent upserts on different collections do not interfere.
"""
logger1 = DummyLogger()
logger2 = DummyLogger()
db1 = DummyDB(results=[{"_key": "a"}])
db2 = DummyDB(results=[{"_key": "b"}])
service1 = BaseArangoService(logger1, None, None)
service2 = BaseArangoService(logger2, None, None)
service1.db = db1
service2.db = db2
@pytest.mark.asyncio
async def test_batch_upsert_nodes_handles_large_batch():
"""
Large Scale: Upserting a large number of nodes (hundreds) works and returns True.
"""
logger = DummyLogger()
nodes = [{"_key": str(i), "val": i} for i in range(500)]
db = DummyDB(results=nodes)
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_concurrent_large_batches():
"""
Large Scale: Concurrent upserts of large batches on different collections.
"""
logger1 = DummyLogger()
logger2 = DummyLogger()
nodes1 = [{"_key": f"r{i}", "v": i} for i in range(300)]
nodes2 = [{"_key": f"f{i}", "v": i} for i in range(300)]
db1 = DummyDB(results=nodes1)
db2 = DummyDB(results=nodes2)
service1 = BaseArangoService(logger1, None, None)
service2 = BaseArangoService(logger2, None, None)
service1.db = db1
service2.db = db2
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_small_load():
"""
Throughput: Multiple small batches in quick succession should all succeed.
"""
logger = DummyLogger()
db = DummyDB(results=[{"_key": "x"}])
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_medium_load():
"""
Throughput: Several medium-sized batches in parallel should all succeed.
"""
logger = DummyLogger()
db = DummyDB(results=[{"_key": str(i)} for i in range(50)])
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_high_volume():
"""
Throughput: High-volume (but bounded) concurrent upserts.
"""
logger = DummyLogger()
db = DummyDB(results=[{"_key": str(i)} for i in range(100)])
service = BaseArangoService(logger, None, None)
service.db = db
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_mixed_sizes():
"""
Throughput: Mixed batch sizes in parallel should all succeed.
"""
logger = DummyLogger()
db = DummyDB(results=[{"_key": str(i)} for i in range(100)])
service = BaseArangoService(logger, None, None)
service.db = db
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import asyncio # used to run async functions
from typing import Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock
import pytest # used for our unit tests
from app.connectors.services.base_arango_service import BaseArangoService
----------- Unit Tests -----------
@pytest.fixture
def arango_service():
# Setup a mock logger
logger = MagicMock()
# Setup a mock arango client and config service (not used in function)
arango_client = MagicMock()
config_service = MagicMock()
# Setup a mock db with aql.execute that returns a list of nodes
mock_db = MagicMock()
mock_db.aql.execute = MagicMock()
service = BaseArangoService(logger, arango_client, config_service)
service.db = mock_db
return service, logger, mock_db
----------- 1. Basic Test Cases -----------
@pytest.mark.asyncio
async def test_batch_upsert_nodes_basic_success(arango_service):
"""Test basic successful upsert of nodes."""
service, logger, mock_db = arango_service
# Simulate db.aql.execute returning an iterable with one new node
mock_db.aql.execute.return_value = [{'_key': 'abc', 'name': 'test'}]
nodes = [{'_key': 'abc', 'name': 'test'}]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
# Check logger called with info
logger.info.assert_any_call("🚀 Batch upserting nodes: %s", collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
1,
collection,
)
# Check db.aql.execute called with correct query and bind_vars
mock_db.aql.execute.assert_called_once()
args, kwargs = mock_db.aql.execute.call_args
@pytest.mark.asyncio
async def test_batch_upsert_nodes_empty_nodes(arango_service):
"""Test upsert with empty nodes list."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = []
nodes = []
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
0,
collection,
)
mock_db.aql.execute.assert_called_once()
@pytest.mark.asyncio
async def test_batch_upsert_nodes_multiple_nodes(arango_service):
"""Test upsert with multiple nodes."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = [
{'_key': 'a', 'name': 'A'},
{'_key': 'b', 'name': 'B'},
{'_key': 'c', 'name': 'C'}
]
nodes = [
{'_key': 'a', 'name': 'A'},
{'_key': 'b', 'name': 'B'},
{'_key': 'c', 'name': 'C'}
]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
3,
collection,
)
mock_db.aql.execute.assert_called_once()
----------- 2. Edge Test Cases -----------
@pytest.mark.asyncio
async def test_batch_upsert_nodes_exception_no_transaction(arango_service):
"""Test error handling when db.aql.execute raises and no transaction is provided."""
service, logger, mock_db = arango_service
mock_db.aql.execute.side_effect = Exception("DB error")
nodes = [{'_key': 'error', 'name': 'fail'}]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.error.assert_called_with("❌ Batch upsert failed: %s", "DB error")
@pytest.mark.asyncio
async def test_batch_upsert_nodes_concurrent_calls(arango_service):
"""Test concurrent execution of batch_upsert_nodes."""
service, logger, mock_db = arango_service
# Each call returns different results
mock_db.aql.execute.side_effect = [
[{'_key': '1'}],
[{'_key': '2'}],
[{'_key': '3'}]
]
nodes_list = [
[{'_key': '1'}],
[{'_key': '2'}],
[{'_key': '3'}]
]
collection = 'test_collection'
tasks = [
service.batch_upsert_nodes(nodes, collection)
for nodes in nodes_list
]
results = await asyncio.gather(*tasks)
@pytest.mark.asyncio
async def test_batch_upsert_nodes_missing_key_field(arango_service):
"""Test upsert with nodes missing _key field."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = [{'name': 'no_key'}]
nodes = [{'name': 'no_key'}]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
mock_db.aql.execute.assert_called_once()
----------- 3. Large Scale Test Cases -----------
@pytest.mark.asyncio
async def test_batch_upsert_nodes_large_batch(arango_service):
"""Test upsert with a large batch of nodes (hundreds)."""
service, logger, mock_db = arango_service
# Simulate 500 nodes returned
mock_db.aql.execute.return_value = [{'_key': str(i), 'name': f'Node{i}'} for i in range(500)]
nodes = [{'_key': str(i), 'name': f'Node{i}'} for i in range(500)]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
500,
collection,
)
mock_db.aql.execute.assert_called_once()
@pytest.mark.asyncio
async def test_batch_upsert_nodes_large_concurrent_batches(arango_service):
"""Test multiple large concurrent batch upserts."""
service, logger, mock_db = arango_service
# Each concurrent call returns 100 nodes
mock_db.aql.execute.side_effect = [
[{'_key': str(i)} for i in range(100)],
[{'_key': str(i)} for i in range(100, 200)]
]
nodes1 = [{'_key': str(i)} for i in range(100)]
nodes2 = [{'_key': str(i)} for i in range(100, 200)]
collection = 'test_collection'
results = await asyncio.gather(
service.batch_upsert_nodes(nodes1, collection),
service.batch_upsert_nodes(nodes2, collection)
)
----------- 4. Throughput Test Cases -----------
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_small_load(arango_service):
"""Throughput: Test small load (10 nodes)."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = [{'_key': str(i)} for i in range(10)]
nodes = [{'_key': str(i)} for i in range(10)]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
10,
collection,
)
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_medium_load(arango_service):
"""Throughput: Test medium load (100 nodes)."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = [{'_key': str(i)} for i in range(100)]
nodes = [{'_key': str(i)} for i in range(100)]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
100,
collection,
)
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_high_volume(arango_service):
"""Throughput: Test high volume load (500 nodes)."""
service, logger, mock_db = arango_service
mock_db.aql.execute.return_value = [{'_key': str(i)} for i in range(500)]
nodes = [{'_key': str(i)} for i in range(500)]
collection = 'test_collection'
result = await service.batch_upsert_nodes(nodes, collection)
logger.info.assert_any_call(
"✅ Successfully upserted %d nodes in collection '%s'.",
500,
collection,
)
@pytest.mark.asyncio
async def test_batch_upsert_nodes_throughput_sustained_concurrent_load(arango_service):
"""Throughput: Test sustained concurrent load (10 concurrent batches of 50 nodes each)."""
service, logger, mock_db = arango_service
# Each call returns 50 nodes
mock_db.aql.execute.side_effect = [
[{'key': str(i)} for i in range(j*50, (j+1)*50)]
for j in range(10)
]
collections = [f"collection{j}" for j in range(10)]
nodes_batches = [
[{'_key': str(i)} for i in range(j*50, (j+1)*50)]
for j in range(10)
]
tasks = [
service.batch_upsert_nodes(nodes_batches[j], collections[j])
for j in range(10)
]
results = await asyncio.gather(*tasks)
codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
To edit these changes
git checkout codeflash/optimize-BaseArangoService.batch_upsert_nodes-mhxod841and push.