Skip to content

Commit

Permalink
test: Add new test cases for cycle detection in graph utils.
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielluiz committed Aug 14, 2024
1 parent 2ff3641 commit cf47a84
Showing 1 changed file with 183 additions and 0 deletions.
183 changes: 183 additions & 0 deletions src/backend/tests/unit/graph/graph/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from langflow.graph.graph import utils


@pytest.fixture
def client():
pass


@pytest.fixture
def graph():
return {
Expand Down Expand Up @@ -120,3 +125,181 @@ def test_sort_up_to_vertex_invalid_vertex(graph):

with pytest.raises(ValueError):
utils.sort_up_to_vertex(graph, vertex_id)


def test_has_cycle():
edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "E"), ("E", "B")]
vertices = ["A", "B", "C", "D", "E"]
assert utils.has_cycle(vertices, edges) is True


class TestFindCycleEdge:
# Detects a cycle in a simple directed graph
def test_detects_cycle_in_simple_graph(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Returns None when no cycle is present
def test_returns_none_when_no_cycle(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Correctly identifies the first cycle encountered
def test_identifies_first_cycle(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "D"), ("D", "E"), ("E", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Handles graphs with multiple edges between the same nodes
def test_multiple_edges_between_same_nodes(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Processes graphs with multiple disconnected components
def test_disconnected_components(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("D", "E"), ("E", "F"), ("F", "D")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Handles an empty list of edges
def test_empty_edges_list(self):
entry_point = "A"
edges = []
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Manages a graph with a single node and no edges
def test_single_node_no_edges(self):
entry_point = "A"
edges = []
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Detects cycles in graphs with self-loops
def test_self_loop_cycle(self):
entry_point = "A"
edges = [("A", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("A", "A")

# Handles graphs with multiple cycles
def test_multiple_cycles(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")

# Processes graphs with nodes having no outgoing edges
def test_nodes_with_no_outgoing_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_cycle_edge(entry_point, edges)
assert result is None

# Handles large graphs efficiently
def test_large_graph_efficiency(self):
entry_point = "0"
edges = [(str(i), str(i + 1)) for i in range(1000)] + [("999", "0")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("999", "0")

# Manages graphs with duplicate edges
def test_duplicate_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
result = utils.find_cycle_edge(entry_point, edges)
assert result == ("C", "A")


class TestFindAllCycleEdges:
# Detects cycles in a simple directed graph
def test_detects_cycles_in_simple_graph(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("C", "A")]

# Identifies multiple cycles in a complex graph
def test_identifies_multiple_cycles(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert set(result) == {("C", "A"), ("D", "B")}

# Returns an empty list when no cycles are present
def test_no_cycles_present(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with a single node and no edges
def test_single_node_no_edges(self):
entry_point = "A"
edges = []
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Processes graphs with disconnected components
def test_disconnected_components(self):
entry_point = "A"
edges = [("A", "B"), ("C", "D")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with self-loops
def test_self_loops(self):
entry_point = "A"
edges = [("A", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("A", "A")]

# Manages graphs with multiple edges between the same nodes
def test_multiple_edges_between_same_nodes(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [("C", "A")]

# Processes graphs with nodes having no outgoing edges
def test_nodes_with_no_outgoing_edges(self):
entry_point = "A"
edges = [("A", "B"), ("B", "C")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles large graphs efficiently
def test_large_graphs_efficiency(self):
entry_point = "A"
edges = [(chr(65 + i), chr(65 + (i + 1) % 26)) for i in range(1000)]
result = utils.find_all_cycle_edges(entry_point, edges)
assert isinstance(result, list)

# Manages graphs with nodes having no incoming edges
def test_nodes_with_no_incoming_edges(self):
entry_point = "A"
edges = [("B", "C"), ("C", "D")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == []

# Handles graphs with mixed data types in edges
def test_mixed_data_types_in_edges(self):
entry_point = 1
edges = [(1, 2), (2, 3), (3, 1)]
result = utils.find_all_cycle_edges(entry_point, edges)
assert result == [(3, 1)]

# Processes graphs with duplicate edges
def test_duplicate_edges(self):
entry_point = "A"
edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")]
result = utils.find_all_cycle_edges(entry_point, edges)
assert set(result) == {("C", "A")}

0 comments on commit cf47a84

Please sign in to comment.