WIP: Adding config to nx-parallel #68

Closed
Commits (24)
a95ceb4
initial commit
Schefflera-Arboricola Jun 5, 2024
390078f
bug_fix
Schefflera-Arboricola Jun 5, 2024
27205d1
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Jun 24, 2024
ead0927
added default_config to get_info
Schefflera-Arboricola Jun 24, 2024
273d53d
added get_curr_configs
Schefflera-Arboricola Jun 24, 2024
a7ca076
updated configs
Schefflera-Arboricola Jun 24, 2024
4982fc0
rm 3 configs
Schefflera-Arboricola Jun 24, 2024
d4aaedb
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Jul 22, 2024
4465712
using joblib.parallel_config
Schefflera-Arboricola Jul 24, 2024
9ce30d7
indentation
Schefflera-Arboricola Jul 24, 2024
d7ba93d
updated cpu_count
Schefflera-Arboricola Jul 26, 2024
79e6881
integrating nx config
Schefflera-Arboricola Jul 27, 2024
ced3898
rm get_default_configs
Schefflera-Arboricola Jul 27, 2024
61dcbb7
renamed get_n_jobs
Schefflera-Arboricola Jul 27, 2024
788dbf8
n_job=-1s
Schefflera-Arboricola Jul 27, 2024
9085093
changing n_jobs=None
Schefflera-Arboricola Jul 27, 2024
9870bad
added backend=loky for testing
Schefflera-Arboricola Jul 27, 2024
f08780c
Merge branch 'networkx:main' into config_1
Schefflera-Arboricola Aug 1, 2024
75604c8
rm inconsistency - using config(singular)
Schefflera-Arboricola Aug 2, 2024
3979f2e
removed __all__ from config.py
Schefflera-Arboricola Aug 2, 2024
9c8b57b
changing the default backend to loky
Schefflera-Arboricola Aug 2, 2024
de99170
enhanced config documentation
Schefflera-Arboricola Aug 2, 2024
d354351
rm fetching list of configs
Schefflera-Arboricola Aug 2, 2024
e4bf668
made n_jobs=-1 as default
Schefflera-Arboricola Aug 13, 2024
48 changes: 48 additions & 0 deletions Config.md
@@ -0,0 +1,48 @@
# Using Configs

nx-parallel algorithms run their `joblib.Parallel` calls inside a `joblib.parallel_config` context manager. You can change the parameters of this internal `joblib.parallel_config` context manager through NetworkX's configuration, like this:

```
>>> import networkx as nx
>>> nx.config # default configs
NetworkXConfig(backend_priority=[], backends=Config(parallel=ParallelConfig(backend=None, n_jobs=None, verbose=0, temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None, inner_max_num_threads=None, backend_params={})), cache_converted_graphs=True)
>>>
>>> nxp_config = nx.config.backends.parallel
>>> nxp_config.backend = "loky" # global config
>>>
>>> G = nx.complete_graph(20)
>>> # using context manager
>>> with nxp_config(n_jobs=4): # backend -> loky , n_jobs -> 4 , verbose -> 0
... nx.square_clustering(G, backend="parallel")
... nxp_config
... print()
... with nxp_config(n_jobs=3, verbose=15): # backend -> loky , n_jobs -> 3 , verbose -> 15
... nx.square_clustering(G, backend="parallel")
... nxp_config
... print()
... with nxp_config(verbose=0): # backend -> loky , n_jobs -> 3 , verbose -> 0
... nx.square_clustering(G, backend="parallel")
... nxp_config
...
{0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 17: 1.0, 18: 1.0, 19: 1.0}
ParallelConfig(backend='loky', n_jobs=4, verbose=0, temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None, inner_max_num_threads=None, backend_params={})

[Parallel(n_jobs=3)]: Using backend LokyBackend with 3 concurrent workers.
[Parallel(n_jobs=3)]: Done 1 tasks | elapsed: 0.1s
[Parallel(n_jobs=3)]: Batch computation too fast (0.09458684921264648s.) Setting batch_size=2.
[Parallel(n_jobs=3)]: Done 2 out of 4 | elapsed: 0.1s remaining: 0.1s
[Parallel(n_jobs=3)]: Done 4 out of 4 | elapsed: 0.1s finished
{0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 17: 1.0, 18: 1.0, 19: 1.0}
ParallelConfig(backend='loky', n_jobs=3, verbose=15, temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None, inner_max_num_threads=None, backend_params={})

{0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 5: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 17: 1.0, 18: 1.0, 19: 1.0}
ParallelConfig(backend='loky', n_jobs=3, verbose=0, temp_folder=None, max_nbytes='1M', mmap_mode='r', prefer=None, require=None, inner_max_num_threads=None, backend_params={})
>>>
```

## Resources

- [NetworkX's Config docs](https://networkx.org/documentation/latest/reference/backends.html#module-networkx.utils.configs)
- [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
- [`joblib.parallel_config`](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html)
- Using distributed backends - [docs](https://joblib.readthedocs.io/en/latest/auto_examples/parallel/distributed_backend_simple.html#sphx-glr-auto-examples-parallel-distributed-backend-simple-py)
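The nested-override behavior shown in the example above can be sketched with a minimal stand-in, assuming nothing beyond the standard library (`StandInConfig` is a hypothetical simplification for illustration, not the real `ParallelConfig`):

```python
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class StandInConfig:
    # Hypothetical stand-in for ParallelConfig; only three fields are modeled.
    backend: str = None
    n_jobs: int = None
    verbose: int = 0

    @contextmanager
    def __call__(self, **overrides):
        # Temporarily apply overrides, then restore the previous values on
        # exit, mirroring how a config context manager behaves.
        old = {k: getattr(self, k) for k in overrides}
        for k, v in overrides.items():
            setattr(self, k, v)
        try:
            yield self
        finally:
            for k, v in old.items():
                setattr(self, k, v)


config = StandInConfig(backend="loky")  # global setting
with config(n_jobs=4):
    assert (config.backend, config.n_jobs) == ("loky", 4)
    with config(n_jobs=3, verbose=15):
        assert (config.n_jobs, config.verbose) == (3, 15)
    # Inner overrides are rolled back when the inner block exits.
    assert (config.n_jobs, config.verbose) == (4, 0)
# Outer overrides are rolled back too; the global backend persists.
assert config.n_jobs is None and config.backend == "loky"
```

Each nested `with` only shadows the parameters it names, which is why `backend` stays `"loky"` throughout the session above.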
7 changes: 7 additions & 0 deletions README.md
@@ -86,6 +86,11 @@ Note that for all functions inside `nx_code.py` that do not have an nx-parallel
import networkx as nx
import nx_parallel as nxp

nxp_config = nx.config.backends.parallel
nxp_config.backend = "loky"
nxp_config.n_jobs = 4
nxp_config.verbose = 15

G = nx.path_graph(4)
H = nxp.ParallelGraph(G)

@@ -104,6 +109,8 @@ nxp.betweenness_centrality(H)
# output : {0: 0.0, 1: 0.6666666666666666, 2: 0.6666666666666666, 3: 0.0}
```

For more on how to use configs, refer to [Config.md](./Config.md)!

### Notes

1. Some functions in NetworkX share a name but have different implementations. To avoid name conflicts at dispatch time, NetworkX differentiates them by setting the `name` parameter in the `_dispatchable` decorator of such algorithms. For this reason, `method 3` and `method 4` are not recommended, but you can use them if you know the correct `name`. For example:
16 changes: 12 additions & 4 deletions _nx_parallel/__init__.py
@@ -1,5 +1,7 @@
# This file was automatically generated by update_get_info.py

from .config import _configs


def get_info():
"""Return a dictionary with information about the package."""
@@ -9,6 +11,7 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"default_config": _configs,
"functions": {
"number_of_isolates": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8",
@@ -151,22 +154,27 @@ def get_info():
},
},
"chunks": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L15",
"additional_docs": "Divides an iterable into chunks of size n",
"additional_parameters": None,
},
"cpu_count": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18",
"additional_docs": "Returns the number of logical CPUs or cores",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L25",
"additional_docs": "Returns the positive value of `n_jobs` using `joblib.parallel.get_active_backend()`.",
"additional_parameters": None,
},
"create_iterables": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26",
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L41",
"additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.",
"additional_parameters": {
"G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'",
"iterable : Iterable": "An iterable of function inputs.",
},
},
"get_configs": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L79",
"additional_docs": "Returns the current configuration settings for nx_parallel.",
"additional_parameters": None,
},
},
}
25 changes: 25 additions & 0 deletions _nx_parallel/config.py
@@ -0,0 +1,25 @@
from networkx.utils.configs import Config
from typing import Union
from dataclasses import dataclass, field

__all__ = [
"ParallelConfig",
"_configs",
]


@dataclass
class ParallelConfig(Config):
backend: str = None
n_jobs: int = None
verbose: int = 0
temp_folder: str = None
max_nbytes: Union[int, str] = "1M"
mmap_mode: str = "r"
prefer: str = None
require: str = None
inner_max_num_threads: int = None
backend_params: dict = field(default_factory=dict)


_configs = ParallelConfig()
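One detail in the dataclass above: `backend_params` must use `field(default_factory=dict)` because dataclasses reject mutable defaults at class-creation time. A minimal standard-library sketch of that, and of `asdict` (which the config-flattening code in `chunk.py` relies on) — `DemoConfig` is an illustrative stand-in, not the real class:

```python
from dataclasses import asdict, dataclass, field
from typing import Union


@dataclass
class DemoConfig:
    # Mirrors the shape of ParallelConfig; writing `backend_params: dict = {}`
    # instead would raise ValueError, so a default_factory is required.
    backend: str = None
    n_jobs: int = None
    max_nbytes: Union[int, str] = "1M"
    backend_params: dict = field(default_factory=dict)


a = DemoConfig()
b = DemoConfig()
a.backend_params["require"] = "sharedmem"
# Each instance gets its own dict from the factory; b is unaffected.
assert b.backend_params == {}
# asdict() converts the config into a plain dict, ready to be passed as
# keyword arguments to joblib.parallel_config(**...).
assert asdict(a)["max_nbytes"] == "1M"
```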
6 changes: 5 additions & 1 deletion _nx_parallel/update_get_info.py
@@ -58,7 +58,9 @@ def extract_docstrings_from_file(file_path):
and node.targets[0].id == "__all__"
):
all_list = [
expr.s for expr in node.value.elts if isinstance(expr, ast.Str)
expr.value
for expr in node.value.elts
if isinstance(expr, ast.Constant)
]
elif isinstance(node, ast.FunctionDef):
if all_list and node.name in all_list:
@@ -136,6 +138,7 @@ def get_url(file_path, function_name):

string = '''# This file was automatically generated by update_get_info.py

from .config import _configs

def get_info():
"""Return a dictionary with information about the package."""
@@ -145,6 +148,7 @@ def get_info():
"package": "nx_parallel",
"url": "https://github.com/networkx/nx-parallel",
"short_summary": "Parallel backend for NetworkX algorithms",
"default_config": _configs,
"functions": '''

with open("_nx_parallel/temp__init__.py", "w") as f:
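The switch from `ast.Str` to `ast.Constant` in this hunk tracks the deprecation of `ast.Str` (deprecated since Python 3.8, removed in 3.12). A self-contained sketch of the same `__all__`-extraction pattern, with a hypothetical source string standing in for a real module file:

```python
import ast

# Stand-in for the contents of a module file being scanned.
source = '__all__ = ["chunks", "cpu_count", "create_iterables"]'

all_list = []
for node in ast.walk(ast.parse(source)):
    if (
        isinstance(node, ast.Assign)
        and isinstance(node.targets[0], ast.Name)
        and node.targets[0].id == "__all__"
    ):
        # ast.Constant replaces the deprecated ast.Str; .value holds the
        # string itself (ast.Str used the .s attribute).
        all_list = [
            expr.value
            for expr in node.value.elts
            if isinstance(expr, ast.Constant)
        ]

assert all_list == ["chunks", "cpu_count", "create_iterables"]
```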
16 changes: 9 additions & 7 deletions nx_parallel/algorithms/cluster.py
@@ -1,5 +1,5 @@
from itertools import combinations, chain
from joblib import Parallel, delayed
from joblib import Parallel, delayed, parallel_config
import nx_parallel as nxp

__all__ = [
@@ -47,18 +47,20 @@ def _compute_clustering_chunk(node_iter_chunk):
else:
node_iter = list(G.nbunch_iter(nodes))

total_cores = nxp.cpu_count()
configs = nxp.get_configs()
n_jobs = configs["n_jobs"]

if get_chunks == "chunks":
num_in_chunk = max(len(node_iter) // total_cores, 1)
num_in_chunk = max(len(node_iter) // n_jobs, 1)
node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
else:
node_iter_chunks = get_chunks(node_iter)

result = Parallel(n_jobs=total_cores)(
delayed(_compute_clustering_chunk)(node_iter_chunk)
for node_iter_chunk in node_iter_chunks
)
with parallel_config(**configs):
result = Parallel()(
delayed(_compute_clustering_chunk)(node_iter_chunk)
for node_iter_chunk in node_iter_chunks
)
clustering = dict(chain.from_iterable(result))

if nodes in G:
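The chunk-then-merge pattern in this hunk can be illustrated without joblib; the sketch below substitutes `concurrent.futures.ThreadPoolExecutor` for `joblib.Parallel` and a trivial per-chunk function for `_compute_clustering_chunk`, so every name here is a stand-in rather than nx-parallel's actual code:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain, islice


def chunks(iterable, n):
    # Same idea as nxp.chunks: yield successive lists of up to n items.
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch


def _compute_chunk(node_chunk):
    # Stand-in for _compute_clustering_chunk: (node, result) pairs.
    return [(node, node * node) for node in node_chunk]


nodes = list(range(10))
n_jobs = 4
num_in_chunk = max(len(nodes) // n_jobs, 1)

with ThreadPoolExecutor(max_workers=n_jobs) as ex:
    results = list(ex.map(_compute_chunk, chunks(nodes, num_in_chunk)))

# Merge the per-chunk pair lists into one dict, as cluster.py does with
# chain.from_iterable on the Parallel results.
merged = dict(chain.from_iterable(results))
assert merged[3] == 9 and len(merged) == 10
```

The design point of the diff itself is that `Parallel()` is now called with no arguments inside `parallel_config(**configs)`, so all tuning flows from the config rather than being hard-coded at each call site.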
48 changes: 43 additions & 5 deletions nx_parallel/utils/chunk.py
@@ -1,8 +1,14 @@
import itertools
import os
import networkx as nx
from dataclasses import asdict

__all__ = ["chunks", "cpu_count", "create_iterables"]
__all__ = [
"chunks",
"cpu_count",
"create_iterables",
"get_configs",
]


def chunks(iterable, n):
@@ -15,12 +21,20 @@ def chunks(iterable, n):
yield x


def cpu_count():
"""Returns the number of logical CPUs or cores"""
# Check if we are running under pytest
def cpu_count(n_jobs=None):
"""Returns the positive value of `n_jobs` using
`joblib.parallel.get_active_backend()`."""
if "PYTEST_CURRENT_TEST" in os.environ:
return 2
return os.cpu_count()
else:
n_cpus = os.cpu_count()
if n_jobs is None:
return 1
if n_jobs < 0:
return n_cpus + n_jobs + 1
if n_jobs == 0:
raise ValueError("n_jobs == 0 in Parallel has no meaning")
return int(n_jobs)


def create_iterables(G, iterator, n_cores, list_of_iterator=None):
@@ -51,3 +65,27 @@ def create_iterables(G, iterator, n_cores, list_of_iterator=None):
return chunks(list_of_iterator, num_in_chunk)
else:
raise ValueError("Invalid iterator type.")


def _get_configs(configs):
config_dict = asdict(configs)
config_dict.update(config_dict["backend_params"])
del config_dict["backend_params"]
config_dict["n_jobs"] = cpu_count(config_dict["n_jobs"])
if "PYTEST_CURRENT_TEST" in os.environ:
config_dict["backend"] = "loky"
return config_dict


def get_configs(config=None):
"""Returns the current configuration settings for nx_parallel."""
config_dict = _get_configs(nx.config.backends.parallel)
if config is None:
return config_dict
elif isinstance(config, list):
new_config = {k: config_dict[k] for k in config if k in config_dict}
return new_config
elif config in config_dict:
return config_dict[config]
else:
raise KeyError(f"Invalid config: {config}")
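The lookup rules of `get_configs` (return everything for `None`, a subset for a list, a single value for a key, else `KeyError`) can be sketched with plain dicts, decoupled from networkx; the helpers below are illustrative stand-ins:

```python
def flatten_config(config_dict):
    # Mirror _get_configs: hoist backend_params entries to the top level.
    config_dict = dict(config_dict)
    config_dict.update(config_dict.pop("backend_params"))
    return config_dict


def select_configs(config_dict, config=None):
    # Mirror get_configs' lookup rules.
    if config is None:
        return config_dict
    if isinstance(config, list):
        return {k: config_dict[k] for k in config if k in config_dict}
    if config in config_dict:
        return config_dict[config]
    raise KeyError(f"Invalid config: {config}")


cfg = flatten_config(
    {"backend": "loky", "n_jobs": 4, "backend_params": {"require": "sharedmem"}}
)
assert cfg["require"] == "sharedmem"      # hoisted out of backend_params
assert select_configs(cfg, "n_jobs") == 4
assert select_configs(cfg, ["backend", "n_jobs"]) == {"backend": "loky", "n_jobs": 4}
```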