Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: nebula graph add time label #1383

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 89 additions & 20 deletions camel/storages/graph_storages/nebula_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from camel.storages.graph_storages.base import BaseGraphStorage
from camel.storages.graph_storages.graph_element import (
Expand Down Expand Up @@ -203,46 +203,62 @@ def add_graph_elements(
def ensure_edge_type_exists(
self,
edge_type: str,
time_label: Optional[str] = None,
) -> None:
r"""Ensures that a specified edge type exists in the NebulaGraph
database. If the edge type already exists, this method does nothing.

Args:
edge_type (str): The name of the edge type to be created.
time_label (str, optional): A specific timestamp to set as the
default value for the time label property. If not
provided, no timestamp will be added. (default: :obj:`None`)

Raises:
Exception: If the edge type creation fails after multiple retry
attempts, an exception is raised with the error message.
"""
create_edge_stmt = f'CREATE EDGE IF NOT EXISTS {edge_type}()'
create_edge_stmt = f"CREATE EDGE IF NOT EXISTS {edge_type} ()"
if time_label is not None:
time_label = self._validate_time_label(time_label)
create_edge_stmt = f"""CREATE EDGE IF NOT EXISTS {edge_type}
(time_label DATETIME DEFAULT {time_label})"""

for attempt in range(MAX_RETRIES):
res = self.query(create_edge_stmt)
if res.is_succeeded():
return # Tag creation succeeded, exit the method
return # Edge type creation succeeded

if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
else:
# Final attempt failed, raise an exception
raise Exception(
f"Failed to create tag `{edge_type}` after "
f"Failed to create edge type `{edge_type}` after "
f"{MAX_RETRIES} attempts: {res.error_msg()}"
)

def ensure_tag_exists(self, tag_name: str) -> None:
def ensure_tag_exists(
self, tag_name: str, time_label: Optional[str] = None
) -> None:
r"""Ensures a tag is created in the NebulaGraph database. If the tag
already exists, it does nothing.

Args:
tag_name (str): The name of the tag to be created.
time_label (str, optional): A specific timestamp to set as the
default value for the time label property. If not provided,
no timestamp will be added. (default: :obj:`None`)

Raises:
Exception: If the tag creation fails after retries, an exception
is raised with the error message.
"""

create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()'
create_tag_stmt = f"CREATE TAG IF NOT EXISTS {tag_name} ()"
if time_label is not None:
time_label = self._validate_time_label(time_label)
create_tag_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name}
(time_label DATETIME DEFAULT {time_label})"""

for attempt in range(MAX_RETRIES):
res = self.query(create_tag_stmt)
Expand All @@ -262,27 +278,39 @@ def add_node(
self,
node_id: str,
tag_name: str,
time_label: Optional[str] = None,
) -> None:
r"""Add a node with the specified tag and properties.

Args:
node_id (str): The ID of the node.
tag_name (str): The tag name of the node.
time_label (str, optional): A specific timestamp to set for
the node's time label property. If not provided, no timestamp
will be added. (default: :obj:`None`)
"""
node_id = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', node_id)
tag_name = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', tag_name)

self.ensure_tag_exists(tag_name)
self.ensure_tag_exists(tag_name, time_label)

# Insert node without properties
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()'
)
# Insert node with or without time_label property
if time_label is not None:
time_label = self._validate_time_label(time_label)
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}(time_label) VALUES '
f'"{node_id}":("{time_label}")'
)
else:
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES '
f'"{node_id}":()'
)

for attempt in range(MAX_RETRIES):
res = self.query(insert_stmt)
if res.is_succeeded():
return # Tag creation succeeded, exit the method
return # Node creation succeeded, exit the method

if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
Expand Down Expand Up @@ -348,7 +376,7 @@ def refresh_schema(self) -> None:
@property
def get_structured_schema(self) -> Dict[str, Any]:
r"""Generates a structured schema consisting of node and relationship
properties, relationships, and metadata.
properties, relationships, and metadata, including timestamps.

Returns:
Dict[str, Any]: A dictionary representing the structured schema.
Expand Down Expand Up @@ -419,6 +447,7 @@ def add_triplet(
subj: str,
obj: str,
rel: str,
time_label: Optional[str] = None,
) -> None:
r"""Adds a relationship (triplet) between two entities in the Nebula
Graph database.
Expand All @@ -427,28 +456,44 @@ def add_triplet(
subj (str): The identifier for the subject entity.
obj (str): The identifier for the object entity.
rel (str): The relationship between the subject and object.
time_label (str, optional): A specific timestamp to set for the
time label property of the relationship. If not provided,
no timestamp will be added. (default: :obj:`None`)

Raises:
ValueError: If the time_label format is invalid.
Exception: If creating the relationship fails.
"""
subj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', subj)
obj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', obj)
rel = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', rel)

self.ensure_tag_exists(subj)
self.ensure_tag_exists(obj)
self.ensure_edge_type_exists(rel)
self.ensure_edge_type_exists(rel, time_label)
self.add_node(node_id=subj, tag_name=subj)
self.add_node(node_id=obj, tag_name=obj)

# Avoid latenicy
# Avoid latency
time.sleep(1)

insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();'
)
# Create edge with or without time_label property
if time_label is not None:
time_label = self._validate_time_label(time_label)
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES '
f'"{subj}"->"{obj}":("{time_label}")'
)
else:
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES '
f'"{subj}"->"{obj}":()'
)

res = self.query(insert_stmt)
if not res.is_succeeded():
raise Exception(
f'create relationship `]{subj}` -> `{obj}`'
f'create relationship `{subj}` -> `{obj}`'
+ f'failed: {res.error_msg()}'
)

Expand Down Expand Up @@ -568,3 +613,27 @@ def get_relationship_properties(
)

return rel_schema_props, rel_structure_props

def _validate_time_label(self, time_label: str) -> str:
    r"""Validates that a time label is a well-formed timestamp string.

    The label must be exactly 'YYYY-MM-DDThh:mm:ss', written with ASCII
    digits and with nothing before or after it, and every field must be
    within its calendar/clock range (e.g. month 01-12, hour 00-23).

    Args:
        time_label (str): The time label string to validate.
            Should be in format 'YYYY-MM-DDThh:mm:ss'.

    Returns:
        str: The validated time label, unchanged.

    Raises:
        ValueError: If the time label is not a string, does not match
            the expected format, or contains an out-of-range field. The
            message always starts with "Invalid time label format".
    """
    # Reject non-strings explicitly instead of relying on a blanket
    # `except` to translate the `TypeError` raised by the regex engine.
    if not isinstance(time_label, str):
        raise ValueError(
            "Invalid time label format: expected a string in format "
            f"'YYYY-MM-DDThh:mm:ss', got {type(time_label).__name__}"
        )

    # `fullmatch` with explicit ASCII classes: `match` + `$` would accept
    # a trailing newline, and `\d` would accept non-ASCII digits.
    pattern = (
        r'([0-9]{4})-([0-9]{2})-([0-9]{2})'
        r'T([0-9]{2}):([0-9]{2}):([0-9]{2})'
    )
    parsed = re.fullmatch(pattern, time_label)
    if parsed is None:
        raise ValueError(
            "Invalid time label format: Time label must be in format "
            "'YYYY-MM-DDThh:mm:ss'"
        )

    year, month, day, hour, minute, second = map(int, parsed.groups())
    is_leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
    days_in_month = (
        31, 29 if is_leap else 28, 31, 30, 31, 30,
        31, 31, 30, 31, 30, 31,
    )
    # The month check comes first so the tuple index below is always valid.
    in_range = (
        1 <= month <= 12
        and 1 <= day <= days_in_month[month - 1]
        and hour <= 23
        and minute <= 59
        and second <= 59
    )
    if not in_range:
        raise ValueError(
            f"Invalid time label format: '{time_label}' is not a valid "
            "date and time"
        )
    return time_label
95 changes: 76 additions & 19 deletions test/storages/graph_storages/test_nebula_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import unittest
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch

from unstructured.documents.elements import Element

Expand Down Expand Up @@ -106,7 +106,7 @@ def test_add_node(self):

self.graph.add_node(node_id, tag_name)

self.graph.ensure_tag_exists.assert_called_with(tag_name)
self.graph.ensure_tag_exists.assert_has_calls([call(tag_name, None)])
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()'
)
Expand All @@ -121,7 +121,7 @@ def test_ensure_tag_exists_success(self):

self.graph.ensure_tag_exists(tag_name)

create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()'
create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name} ()'
self.graph.query.assert_called_with(create_tag_stmt)

@patch('time.sleep', return_value=None)
Expand Down Expand Up @@ -153,13 +153,12 @@ def test_add_triplet(self):

self.graph.add_triplet(subj, obj, rel)

self.graph.ensure_tag_exists.assert_any_call(subj)
self.graph.ensure_tag_exists.assert_any_call(obj)
self.graph.ensure_edge_type_exists.assert_called_with(rel)
self.graph.ensure_tag_exists.assert_has_calls([call(subj), call(obj)])
self.graph.ensure_edge_type_exists.assert_has_calls([call(rel, None)])
self.graph.add_node.assert_any_call(node_id=subj, tag_name=subj)
self.graph.add_node.assert_any_call(node_id=obj, tag_name=obj)
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();'
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":()'
)
self.graph.query.assert_called_with(insert_stmt)

Expand Down Expand Up @@ -401,24 +400,24 @@ def test_get_schema(self):
def test_get_structured_schema(self):
self.graph.get_node_properties = Mock(
return_value=(
['Node.prop'],
[{'labels': 'Node', 'properties': ['prop']}],
['Person.name', 'Person.age'],
[{'labels': 'Person', 'properties': ['name', 'age']}],
)
)
self.graph.get_relationship_properties = Mock(
return_value=(
['Rel.prop'],
[{'type': 'Rel', 'properties': ['prop']}],
['KNOWS.since'],
[{'type': 'KNOWS', 'properties': ['since']}],
)
)
self.graph.get_relationship_types = Mock(return_value=['RELATES_TO'])
self.graph.get_indexes = Mock(return_value=['index1'])
self.graph.get_relationship_types = Mock(return_value=['KNOWS'])
self.graph.get_indexes = Mock(return_value=[])
structured_schema = self.graph.get_structured_schema
expected_schema = {
"node_props": {'Node': ['prop']},
"rel_props": {'Rel': ['prop']},
"relationships": ['RELATES_TO'],
"metadata": {"index": ['index1']},
"node_props": {"Person": ["name", "age"]},
"rel_props": {"KNOWS": ["since"]},
"relationships": ["KNOWS"],
"metadata": {"index": []},
}
self.assertEqual(structured_schema, expected_schema)

Expand Down Expand Up @@ -465,6 +464,64 @@ def test_add_graph_elements(self):
'node1', 'node2', 'RELATES_TO'
)

def test_validate_time_label_valid(self):
    r"""A well-formed timestamp is returned unchanged by the validator."""
    timestamp = "2024-12-31T21:45:22"
    self.assertEqual(self.graph._validate_time_label(timestamp), timestamp)

def test_validate_time_label_none(self):
    r"""Passing ``None`` instead of a string must raise ``ValueError``."""
    self.assertRaises(ValueError, self.graph._validate_time_label, None)

def test_add_triplet_with_time_label(self):
    r"""``add_triplet`` forwards the time label to the edge-type helper
    and issues an edge insert that carries the ``time_label`` property.
    """
    subj, obj, rel = 'node1', 'node2', 'RELATESTO'
    time_label = '2024-12-31T21:45:22'

    # Replace the schema/node helpers with mocks so their calls can be
    # inspected without executing them.
    for helper in (
        'ensure_tag_exists',
        'ensure_edge_type_exists',
        'add_node',
    ):
        setattr(self.graph, helper, Mock())
    ok_result = Mock()
    ok_result.is_succeeded.return_value = True
    self.graph.query = Mock(return_value=ok_result)

    self.graph.add_triplet(subj, obj, rel, time_label)

    self.graph.ensure_tag_exists.assert_has_calls(
        [call('node1'), call('node2')]
    )
    self.graph.ensure_edge_type_exists.assert_called_with(rel, time_label)
    for node in (subj, obj):
        self.graph.add_node.assert_any_call(node_id=node, tag_name=node)

    # The last query issued must be the edge insert with the timestamp.
    self.graph.query.assert_called_with(
        f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES '
        f'"{subj}"->"{obj}":("{time_label}")'
    )

def test_add_triplet_with_invalid_time_label(self):
    r"""A malformed time label makes ``add_triplet`` raise ``ValueError``
    whose message mentions the invalid time label format.
    """
    malformed = '2024/12/31 21:45:22'  # slashes and a space, no 'T'

    with self.assertRaises(ValueError) as caught:
        self.graph.add_triplet('node1', 'node2', 'RELATESTO', malformed)

    self.assertIn("Invalid time label format", str(caught.exception))

def test_ensure_tag_exists_with_time_label(self):
tag_name = 'Tag1'
time_label = '2024-12-31T21:45:22'

mock_result = Mock()
mock_result.is_succeeded.return_value = True
self.graph.query = Mock(return_value=mock_result)

self.graph.ensure_tag_exists(tag_name, time_label)

if __name__ == '__main__':
unittest.main()
expected_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name}
(time_label DATETIME DEFAULT {time_label})"""
self.graph.query.assert_called_with(expected_stmt)
Loading