Skip to content

Feature/parallelize random walks #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
python-version: ["3.8", "3.9"]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,6 @@ healthchecksdb
MigrationBackup/

# Virtual environment
venv*
venv/

# End of https://www.gitignore.io/api/osx,python,pycharm,windows,visualstudio,visualstudiocode
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
default_language_version:
python: python3.10
python: python3.8

default_stages: [commit, push]

Expand Down
2 changes: 1 addition & 1 deletion cookiecutter-config-file.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ default_context:
project_description: "dynnode2vec is a package to embed dynamic graphs"
organization: "pedugnat"
license: "MIT"
minimal_python_version: 3.10
minimal_python_version: 3.8
github_name: "pedugnat"
email: " "
version: "0.1.0"
Expand Down
33 changes: 22 additions & 11 deletions dynnode2vec/biased_random_walk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
Define a BiasedRandomWalk class to perform biased random walks over graphs.
"""
# pylint: disable=invalid-name
from typing import Any, Iterable
from typing import Any, Dict, Iterable, List, Union

import bisect
import random
from functools import partial
from multiprocessing import Pool

import networkx as nx
import numpy as np

RandomWalks = list[list[Any]]
RandomWalks = List[List[Any]]


class BiasedRandomWalk:
Expand All @@ -29,8 +30,8 @@ def __init__(self, graph: nx.Graph) -> None:
graph, ordering="default", label_attribute="true_label"
)

self.mapping: dict[int, Any] = nx.get_node_attributes(self.graph, "true_label")
self.reverse_mapping: dict[Any, int] = {
self.mapping: Dict[int, Any] = nx.get_node_attributes(self.graph, "true_label")
self.reverse_mapping: Dict[Any, int] = {
true_label: int_id for int_id, true_label in self.mapping.items()
}

Expand All @@ -41,7 +42,7 @@ def map_int_ids_to_true_ids(self, walks: RandomWalks) -> None:
for i, walk in enumerate(walks):
walks[i] = [self.mapping[int_id] for int_id in walk]

def convert_true_ids_to_int_ids(self, nodes: Iterable[Any]) -> list[int]:
def convert_true_ids_to_int_ids(self, nodes: Iterable[Any]) -> List[int]:
"""
Convert list of node labels to list of int ids.
"""
Expand Down Expand Up @@ -72,7 +73,7 @@ def _generate_walk(
iq: float,
weighted: bool,
rn: random.Random,
) -> list[int]:
) -> List[int]:
# pylint: disable=too-many-arguments, too-many-locals
"""
Generate a number of random walks starting from a given node.
Expand Down Expand Up @@ -123,7 +124,7 @@ def _generate_walk_simple(
iq: float,
weighted: bool,
rn: random.Random,
) -> list[int]:
) -> List[int]:
# pylint: disable=too-many-arguments
"""
Fast implementation for the scenario where:
Expand All @@ -145,19 +146,21 @@ def _generate_walk_simple(

def run(
self,
nodes: list[Any],
nodes: List[Any],
*,
n_walks: int = 10,
walk_length: int = 10,
p: float = 1.0,
q: float = 1.0,
weighted: bool = False,
seed: int | None = None,
seed: Union[int, None] = None,
n_processes: int = 1,
) -> RandomWalks:
"""
Perform a number of random walks for all the nodes of the graph. The
behavior of the random walk is mainly conditioned by two parameters p and q.
"""
# pylint: disable=too-many-locals
rn = random.Random(seed)

nodes = self.convert_true_ids_to_int_ids(nodes)
Expand All @@ -180,12 +183,20 @@ def run(
)

walks = []
connected_nodes = []

for node in nodes:
if self.graph.degree[node] == 0:
# the node has no neighbors, so the walk ends instantly
walks.extend([[node] for _ in range(n_walks)])
walks.append([node])
else:
walks.extend([generate_walk(node) for _ in range(n_walks)])
connected_nodes.append(node)

if n_processes > 1:
with Pool(n_processes) as pool:
walks.extend(pool.map(generate_walk, connected_nodes * n_walks))
else:
walks.extend([generate_walk(node) for node in connected_nodes * n_walks])

# map back the integer ids (used for speed) to the original node ids
self.map_int_ids_to_true_ids(walks)
Expand Down
20 changes: 10 additions & 10 deletions dynnode2vec/dynnode2vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Define a DynNode2Vec class to run dynnode2vec algorithm over dynamic graphs.
"""
# pylint: disable=invalid-name
from typing import Any, Iterable
from typing import Any, Iterable, List, Optional, Set, Tuple

from collections import namedtuple
from itertools import chain, starmap
Expand Down Expand Up @@ -37,7 +37,7 @@ def __init__(
n_walks_per_node: int = 10,
embedding_size: int = 128,
window: int = 10,
seed: int | None = 0,
seed: Optional[int] = 0,
parallel_processes: int = 4,
plain_node2vec: bool = False,
):
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(
isinstance(window, int) and embedding_size > 0
), "window should be a strictly positive integer"
assert (
isinstance(seed, int | None)
seed is None or isinstance(seed, int)
) and embedding_size > 0, "seed should be either None or int"
assert (
isinstance(parallel_processes, int) and 0 < parallel_processes < 128
Expand All @@ -90,8 +90,8 @@ def __init__(
self.gensim_workers = max(self.parallel_processes - 1, 12)

def _initialize_embeddings(
self, graphs: list[nx.Graph]
) -> tuple[Word2Vec, list[Embedding]]:
self, graphs: List[nx.Graph]
) -> Tuple[Word2Vec, List[Embedding]]:
"""
Compute normal node2vec embedding at timestep 0.
"""
Expand Down Expand Up @@ -120,7 +120,7 @@ def _initialize_embeddings(
return model, [embedding]

@staticmethod
def get_delta_nodes(current_graph: nx.Graph, previous_graph: nx.Graph) -> set[Any]:
def get_delta_nodes(current_graph: nx.Graph, previous_graph: nx.Graph) -> Set[Any]:
"""
Find nodes in the current graph which have been modified, i.e. they have been added,
or at least one of their edges has been updated.
Expand All @@ -144,7 +144,7 @@ def get_delta_nodes(current_graph: nx.Graph, previous_graph: nx.Graph) -> set[An
# Delta nodes are new nodes (V_add) and current nodes whose edges have changed.
# Since we only care about nodes that have at least one edge, we can
# assume that V_add ⊆ {v_i ∈ V_t | ∃e_i = (v_i, v_j) ∈ (E_add ∪ E_del)}
delta_nodes: set[Any] = current_graph.nodes & nodes_with_modified_edges
delta_nodes: Set[Any] = current_graph.nodes & nodes_with_modified_edges

return delta_nodes

Expand Down Expand Up @@ -174,7 +174,7 @@ def generate_updated_walks(

return updated_walks

def _simulate_walks(self, graphs: list[nx.Graph]) -> Iterable[RandomWalks]:
def _simulate_walks(self, graphs: List[nx.Graph]) -> Iterable[RandomWalks]:
"""
Parallelize the generation of walks on the time steps graphs.
"""
Expand All @@ -186,7 +186,7 @@ def _simulate_walks(self, graphs: list[nx.Graph]) -> Iterable[RandomWalks]:

def _update_embeddings(
self,
embeddings: list[Embedding],
embeddings: List[Embedding],
time_walks: Iterable[RandomWalks],
model: Word2Vec,
) -> None:
Expand Down Expand Up @@ -226,7 +226,7 @@ def _update_embeddings(

embeddings.append(embedding)

def compute_embeddings(self, graphs: list[nx.Graph]) -> list[Embedding]:
def compute_embeddings(self, graphs: List[nx.Graph]) -> List[Embedding]:
"""
Compute dynamic embeddings on a list of graphs.
"""
Expand Down
6 changes: 4 additions & 2 deletions dynnode2vec/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""
Utility file to define miscellaneous functions.
"""
from typing import List

import random

import networkx as nx


def sample_nodes(graph: nx.Graph, k: int) -> list[int]:
def sample_nodes(graph: nx.Graph, k: int) -> List[int]:
"""
Samples nodes randomly from a graph.
"""
Expand All @@ -15,7 +17,7 @@ def sample_nodes(graph: nx.Graph, k: int) -> list[int]:

def create_dynamic_graph(
n_base_nodes: int = 100, n_steps: int = 10, base_density: float = 0.01
) -> list[nx.Graph]:
) -> List[nx.Graph]:
"""
Creates a list of graphs representing the evolution of a dynamic graph,
i.e. graphs that each depend on the previous graph.
Expand Down
78 changes: 40 additions & 38 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading