Skip to content

Commit

Permalink
feat: add functions to detect cycles in Graph (#3327)
Browse files Browse the repository at this point in the history
* feat: Add functions to detect cycles in directed graphs.

* test: Add new test cases for cycle detection in graph utils.

* test: temporarily disable test

---------

Co-authored-by: italojohnny <italojohnnydosanjos@gmail.com>
  • Loading branch information
ogabrielluiz and italojohnny authored Aug 14, 2024
1 parent 0e2f277 commit 77b0711
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 1 deletion.
126 changes: 125 additions & 1 deletion src/backend/base/langflow/graph/graph/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import copy
from collections import deque
from collections import defaultdict, deque
from typing import Dict, List

PRIORITY_LIST_OF_INPUTS = ["webhook", "chat"]
Expand Down Expand Up @@ -282,3 +282,127 @@ def sort_up_to_vertex(graph: Dict[str, Dict[str, List[str]]], vertex_id: str, is
excluded.add(succ_id)

return list(visited)


def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool:
"""
Determines whether a directed graph represented by a list of vertices and edges contains a cycle.
Args:
vertex_ids (list[str]): A list of vertex IDs.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
bool: True if the graph contains a cycle, False otherwise.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

for neighbor in graph[v]:
if neighbor not in visited:
if dfs(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True

rec_stack.remove(v)
return False

visited: set[str] = set()
rec_stack: set[str] = set()

for vertex in vertex_ids:
if vertex not in visited:
if dfs(vertex, visited, rec_stack):
return True

return False


def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str, str]:
"""
Find the edge that causes a cycle in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
tuple[str, str]: A tuple representing the edge that causes a cycle, or None if no cycle is found.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

for neighbor in graph[v]:
if neighbor not in visited:
result = dfs(neighbor, visited, rec_stack)
if result:
return result
elif neighbor in rec_stack:
return (v, neighbor) # This edge causes the cycle

rec_stack.remove(v)
return None

visited: set[str] = set()
rec_stack: set[str] = set()

return dfs(entry_point, visited, rec_stack)


def find_all_cycle_edges(entry_point: str, edges: list[tuple[str, str]]) -> list[tuple[str, str]]:
"""
Find all edges that cause cycles in a directed graph starting from a given entry point.
Args:
entry_point (str): The vertex ID from which to start the search.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.
Returns:
list[tuple[str, str]]: A list of tuples representing edges that cause cycles.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

cycle_edges = []

for neighbor in graph[v]:
if neighbor not in visited:
cycle_edges += dfs(neighbor, visited, rec_stack)
elif neighbor in rec_stack:
cycle_edges.append((v, neighbor)) # This edge causes a cycle

rec_stack.remove(v)
return cycle_edges

visited: set[str] = set()
rec_stack: set[str] = set()

return dfs(entry_point, visited, rec_stack)


def should_continue(yielded_counts: dict[str, int], max_iterations: int | None) -> bool:
if max_iterations is None:
return True
return max(yielded_counts.values(), default=0) <= max_iterations
183 changes: 183 additions & 0 deletions src/backend/tests/unit/graph/graph/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from langflow.graph.graph import utils


@pytest.fixture
def client():
pass


@pytest.fixture
def graph():
return {
Expand Down Expand Up @@ -120,3 +125,181 @@ def test_sort_up_to_vertex_invalid_vertex(graph):

with pytest.raises(ValueError):
utils.sort_up_to_vertex(graph, vertex_id)


def test_has_cycle():
edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "E"), ("E", "B")]
vertices = ["A", "B", "C", "D", "E"]
assert utils.has_cycle(vertices, edges) is True


class TestFindCycleEdge:
# Detects a cycle in a simple directed graph
def test_detects_cycle_in_simple_graph(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Returns None when no cycle is present
def test_returns_none_when_no_cycle(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Correctly identifies the first cycle encountered
def test_identifies_first_cycle(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "D"), ("D", "E"), ("E", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Handles graphs with multiple edges between the same nodes
def test_multiple_edges_between_same_nodes(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Processes graphs with multiple disconnected components
def test_disconnected_components(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("D", "E"), ("E", "F"), ("F", "D")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Handles an empty list of edges
def test_empty_edges_list(self):
entry_point = "A"
edges = []
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Manages a graph with a single node and no edges
def test_single_node_no_edges(self):
entry_point = "A"
edges = []
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Detects cycles in graphs with self-loops
def test_self_loop_cycle(self):
entry_point = "A"
edges = [("A", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("A", "A")

# Handles graphs with multiple cycles
def test_multiple_cycles(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Processes graphs with nodes having no outgoing edges
def test_nodes_with_no_outgoing_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Handles large graphs efficiently
def test_large_graph_efficiency(self):
entry_point = "0"
edges = [(str(i), str(i + 1)) for i in range(1000)] + [("999", "0")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("999", "0")

# Manages graphs with duplicate edges
def test_duplicate_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")


class TestFindAllCycleEdges:
# Detects cycles in a simple directed graph
def test_detects_cycles_in_simple_graph(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("C", "A")]

# Identifies multiple cycles in a complex graph
def test_identifies_multiple_cycles(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert set(result) == {("C", "A"), ("D", "B")}

# Returns an empty list when no cycles are present
def test_no_cycles_present(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with a single node and no edges
def test_single_node_no_edges(self):
entry_point = "A"
edges = []
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Processes graphs with disconnected components
def test_disconnected_components(self):
entry_point = "A"
edges = [("A", "B"), ("C", "D")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with self-loops
def test_self_loops(self):
entry_point = "A"
edges = [("A", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("A", "A")]

# Manages graphs with multiple edges between the same nodes
def test_multiple_edges_between_same_nodes(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("C", "A")]

# Processes graphs with nodes having no outgoing edges
def test_nodes_with_no_outgoing_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles large graphs efficiently
def test_large_graphs_efficiency(self):
entry_point = "A"
edges = [(chr(65 + i), chr(65 + (i + 1) % 26)) for i in range(1000)]
result = utils.find_all_cycle_edges(entry_point, edges)
assert isinstance(result, list)

# Manages graphs with nodes having no incoming edges
def test_nodes_with_no_incoming_edges(self):
entry_point = "A"
edges = [("B", "C"), ("C", "D")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with mixed data types in edges
def test_mixed_data_types_in_edges(self):
entry_point = 1
edges = [(1, 2), (2, 3), (3, 1)]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [(3, 1)]

# Processes graphs with duplicate edges
def test_duplicate_edges(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert set(result) == {("C", "A")}
1 change: 1 addition & 0 deletions src/backend/tests/unit/services/variable/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def session():
yield session


@pytest.mark.skip(reason="Temporarily disabled")
def test_initialize_user_variables__donkey(service, session):
user_id = uuid4()
name = "OPENAI_API_KEY"
Expand Down

0 comments on commit 77b0711

Please sign in to comment.