Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add functions to detect cycles in Graph #3327

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion src/backend/base/langflow/graph/graph/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import copy
from collections import deque
from collections import defaultdict, deque
from typing import Dict, List

PRIORITY_LIST_OF_INPUTS = ["webhook", "chat"]
Expand Down Expand Up @@ -282,3 +282,127 @@ def sort_up_to_vertex(graph: Dict[str, Dict[str, List[str]]], vertex_id: str, is
excluded.add(succ_id)

return list(visited)


def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool:
    """
    Determines whether a directed graph represented by a list of vertices and edges contains a cycle.

    The search is a depth-first traversal started from every vertex in ``vertex_ids``
    that has not been reached yet. It is implemented with an explicit stack rather
    than recursion so that long paths cannot exhaust the interpreter's recursion limit.

    Args:
        vertex_ids (list[str]): A list of vertex IDs used as traversal starting points.
        edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.

    Returns:
        bool: True if the graph contains a cycle, False otherwise.
    """
    # Build the graph as an adjacency list
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    visited: set[str] = set()
    # Vertices on the current DFS path; an edge into one of them closes a cycle.
    on_path: set[str] = set()

    for root in vertex_ids:
        if root in visited:
            continue

        visited.add(root)
        on_path.add(root)
        # Each frame is (vertex, iterator over its not-yet-examined neighbors).
        # ``.get`` is used so that lookups do not insert empty entries into the defaultdict.
        stack = [(root, iter(graph.get(root, ())))]

        while stack:
            vertex, neighbors = stack[-1]
            for neighbor in neighbors:
                if neighbor not in visited:
                    visited.add(neighbor)
                    on_path.add(neighbor)
                    stack.append((neighbor, iter(graph.get(neighbor, ()))))
                    # Descend first; the parent's iterator resumes where it stopped.
                    break
                if neighbor in on_path:
                    return True
            else:
                # All neighbors examined: the vertex leaves the current path.
                on_path.discard(vertex)
                stack.pop()

    return False


def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str, str]:
"""
Find the edge that causes a cycle in a directed graph starting from a given entry point.

Args:
entry_point (str): The vertex ID from which to start the search.
edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.

Returns:
tuple[str, str]: A tuple representing the edge that causes a cycle, or None if no cycle is found.
"""
# Build the graph as an adjacency list
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)

# Utility function to perform DFS
def dfs(v, visited, rec_stack):
visited.add(v)
rec_stack.add(v)

for neighbor in graph[v]:
if neighbor not in visited:
result = dfs(neighbor, visited, rec_stack)
if result:
return result
elif neighbor in rec_stack:
return (v, neighbor) # This edge causes the cycle

rec_stack.remove(v)
return None

visited: set[str] = set()
rec_stack: set[str] = set()

return dfs(entry_point, visited, rec_stack)


def find_all_cycle_edges(entry_point: str, edges: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """
    Find all edges that cause cycles in a directed graph starting from a given entry point.

    Only vertices reachable from ``entry_point`` are explored. Every edge that points
    back to a vertex on the current depth-first path is reported, in the order the
    traversal encounters it. An edge listed more than once in ``edges`` is reported
    once per occurrence. An explicit stack is used instead of recursion so that long
    paths cannot exhaust the interpreter's recursion limit.

    Args:
        entry_point (str): The vertex ID from which to start the search.
        edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices.

    Returns:
        list[tuple[str, str]]: A list of tuples representing edges that cause cycles.
    """
    # Build the graph as an adjacency list
    graph = defaultdict(list)
    for u, v in edges:
        graph[u].append(v)

    cycle_edges: list[tuple[str, str]] = []
    visited: set[str] = {entry_point}
    # Vertices on the current DFS path; an edge into one of them closes a cycle.
    on_path: set[str] = {entry_point}
    # Each frame is (vertex, iterator over its not-yet-examined neighbors).
    stack = [(entry_point, iter(graph.get(entry_point, ())))]

    while stack:
        vertex, neighbors = stack[-1]
        for neighbor in neighbors:
            if neighbor not in visited:
                visited.add(neighbor)
                on_path.add(neighbor)
                stack.append((neighbor, iter(graph.get(neighbor, ()))))
                # Descend first; the parent's iterator resumes where it stopped.
                break
            if neighbor in on_path:
                cycle_edges.append((vertex, neighbor))  # This edge causes a cycle
        else:
            # All neighbors examined: the vertex leaves the current path.
            on_path.discard(vertex)
            stack.pop()

    return cycle_edges


def should_continue(yielded_counts: dict[str, int], max_iterations: int | None) -> bool:
if max_iterations is None:
return True
return max(yielded_counts.values(), default=0) <= max_iterations
183 changes: 183 additions & 0 deletions src/backend/tests/unit/graph/graph/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from langflow.graph.graph import utils


@pytest.fixture
def client():
    """Provide a ``client`` fixture whose value is ``None``.

    NOTE(review): presumably this shadows a shared ``client`` fixture so these
    pure-function tests skip its setup -- confirm against the project's conftest.
    """
    return None


@pytest.fixture
def graph():
return {
Expand Down Expand Up @@ -120,3 +125,181 @@ def test_sort_up_to_vertex_invalid_vertex(graph):

with pytest.raises(ValueError):
utils.sort_up_to_vertex(graph, vertex_id)


def test_has_cycle():
    """The path A -> B -> C -> D -> E plus the edge E -> B must be reported as cyclic."""
    vertices = list("ABCDE")
    edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "E"), ("E", "B")]
    assert utils.has_cycle(vertices, edges) is True


class TestFindCycleEdge:
    """Checks for ``utils.find_cycle_edge``, which returns one cycle-closing edge or None."""

    def test_detects_cycle_in_simple_graph(self):
        """A three-vertex ring reports the edge that leads back to the entry point."""
        ring = [("A", "B"), ("B", "C"), ("C", "A")]
        assert utils.find_cycle_edge("A", ring) == ("C", "A")

    def test_returns_none_when_no_cycle(self):
        """A plain chain has no cycle, so the result is None."""
        chain = [("A", "B"), ("B", "C")]
        assert utils.find_cycle_edge("A", chain) is None

    def test_identifies_first_cycle(self):
        """With two cycles through the entry point, the first one encountered is reported."""
        two_rings = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "D"), ("D", "E"), ("E", "A")]
        assert utils.find_cycle_edge("A", two_rings) == ("C", "A")

    def test_multiple_edges_between_same_nodes(self):
        """A repeated forward edge does not change which edge is reported."""
        edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
        assert utils.find_cycle_edge("A", edges) == ("C", "A")

    def test_disconnected_components(self):
        """A cycle in a component that cannot be reached from the entry point is not reported."""
        edges = [("A", "B"), ("B", "C"), ("D", "E"), ("E", "F"), ("F", "D")]
        assert utils.find_cycle_edge("A", edges) is None

    def test_empty_edges_list(self):
        """An empty edge list yields None."""
        assert utils.find_cycle_edge("A", []) is None

    def test_single_node_no_edges(self):
        """A lone entry point with no edges yields None."""
        assert utils.find_cycle_edge("A", []) is None

    def test_self_loop_cycle(self):
        """An edge from a vertex to itself is reported as the cycle edge."""
        assert utils.find_cycle_edge("A", [("A", "A")]) == ("A", "A")

    def test_multiple_cycles(self):
        """When several cycles exist, only one edge is returned."""
        edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
        assert utils.find_cycle_edge("A", edges) == ("C", "A")

    def test_nodes_with_no_outgoing_edges(self):
        """A chain ending in a vertex without outgoing edges yields None."""
        chain = [("A", "B"), ("B", "C")]
        assert utils.find_cycle_edge("A", chain) is None

    def test_large_graph_efficiency(self):
        """A 1000-edge chain with an extra edge from "999" back to "0" reports that edge."""
        edges = [(str(i), str(i + 1)) for i in range(1000)]
        edges.append(("999", "0"))
        assert utils.find_cycle_edge("0", edges) == ("999", "0")

    def test_duplicate_edges(self):
        """A repeated cycle-closing edge is still reported as a single tuple."""
        edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
        assert utils.find_cycle_edge("A", edges) == ("C", "A")


class TestFindAllCycleEdges:
    """Checks for ``utils.find_all_cycle_edges``, which lists every cycle-closing edge."""

    def test_detects_cycles_in_simple_graph(self):
        """A three-vertex ring reports the single edge that leads back to the entry point."""
        ring = [("A", "B"), ("B", "C"), ("C", "A")]
        assert utils.find_all_cycle_edges("A", ring) == [("C", "A")]

    def test_identifies_multiple_cycles(self):
        """Two cycles sharing vertex B each contribute their closing edge."""
        edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
        # Compared as a set: this test does not pin the order of the reported edges.
        assert set(utils.find_all_cycle_edges("A", edges)) == {("C", "A"), ("D", "B")}

    def test_no_cycles_present(self):
        """A plain chain produces an empty list."""
        chain = [("A", "B"), ("B", "C")]
        assert utils.find_all_cycle_edges("A", chain) == []

    def test_single_node_no_edges(self):
        """A lone entry point with no edges produces an empty list."""
        assert utils.find_all_cycle_edges("A", []) == []

    def test_disconnected_components(self):
        """Acyclic components, reachable or not, produce an empty list."""
        edges = [("A", "B"), ("C", "D")]
        assert utils.find_all_cycle_edges("A", edges) == []

    def test_self_loops(self):
        """An edge from a vertex to itself is reported."""
        assert utils.find_all_cycle_edges("A", [("A", "A")]) == [("A", "A")]

    def test_multiple_edges_between_same_nodes(self):
        """A repeated forward edge does not add entries to the result."""
        edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
        assert utils.find_all_cycle_edges("A", edges) == [("C", "A")]

    def test_nodes_with_no_outgoing_edges(self):
        """A chain ending in a vertex without outgoing edges produces an empty list."""
        chain = [("A", "B"), ("B", "C")]
        assert utils.find_all_cycle_edges("A", chain) == []

    def test_large_graphs_efficiency(self):
        """A 1000-edge input is processed and yields exactly the one reachable back edge."""
        # Sources are chr(65), chr(66), ... (all distinct, not wrapped), while targets
        # wrap around the 26 letters A-Z. From "A" the only path is A -> B -> ... -> Z,
        # and Z's single outgoing edge returns to "A".
        edges = [(chr(65 + i), chr(65 + (i + 1) % 26)) for i in range(1000)]
        result = utils.find_all_cycle_edges("A", edges)
        # Pin the exact answer rather than only the return type, so a wrong result fails.
        assert result == [("Z", "A")]

    def test_nodes_with_no_incoming_edges(self):
        """An entry point that appears in no edge produces an empty list."""
        edges = [("B", "C"), ("C", "D")]
        assert utils.find_all_cycle_edges("A", edges) == []

    def test_mixed_data_types_in_edges(self):
        """Vertex IDs that are integers rather than strings are handled."""
        edges = [(1, 2), (2, 3), (3, 1)]
        assert utils.find_all_cycle_edges(1, edges) == [(3, 1)]

    def test_duplicate_edges(self):
        """Repeated edges may repeat in the result, so the comparison ignores multiplicity."""
        edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
        assert set(utils.find_all_cycle_edges("A", edges)) == {("C", "A")}
1 change: 1 addition & 0 deletions src/backend/tests/unit/services/variable/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def session():
yield session


@pytest.mark.skip(reason="Temporarily disabled")
def test_initialize_user_variables__donkey(service, session):
user_id = uuid4()
name = "OPENAI_API_KEY"
Expand Down
Loading