WIP: Adding config to nx-parallel #68

Closed

Commits (24)
- `a95ceb4` initial commit (Schefflera-Arboricola, Jun 5, 2024)
- `390078f` bug_fix (Schefflera-Arboricola, Jun 5, 2024)
- `27205d1` Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Jun 24, 2024)
- `ead0927` added default_config to get_info (Schefflera-Arboricola, Jun 24, 2024)
- `273d53d` added get_curr_configs (Schefflera-Arboricola, Jun 24, 2024)
- `a7ca076` updated configs (Schefflera-Arboricola, Jun 24, 2024)
- `4982fc0` rm 3 configs (Schefflera-Arboricola, Jun 24, 2024)
- `d4aaedb` Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Jul 22, 2024)
- `4465712` using joblib.parallel_config (Schefflera-Arboricola, Jul 24, 2024)
- `9ce30d7` indentation (Schefflera-Arboricola, Jul 24, 2024)
- `d7ba93d` updated cpu_count (Schefflera-Arboricola, Jul 26, 2024)
- `79e6881` integrating nx config (Schefflera-Arboricola, Jul 27, 2024)
- `ced3898` rm get_default_configs (Schefflera-Arboricola, Jul 27, 2024)
- `61dcbb7` renamed get_n_jobs (Schefflera-Arboricola, Jul 27, 2024)
- `788dbf8` n_job=-1s (Schefflera-Arboricola, Jul 27, 2024)
- `9085093` changing n_jobs=None (Schefflera-Arboricola, Jul 27, 2024)
- `9870bad` added backend=loky for testing (Schefflera-Arboricola, Jul 27, 2024)
- `f08780c` Merge branch 'networkx:main' into config_1 (Schefflera-Arboricola, Aug 1, 2024)
- `75604c8` rm inconsistency - using config(singular) (Schefflera-Arboricola, Aug 2, 2024)
- `3979f2e` removed __all__ from config.py (Schefflera-Arboricola, Aug 2, 2024)
- `9c8b57b` changing the default backend to loky (Schefflera-Arboricola, Aug 2, 2024)
- `de99170` enhanced config documentation (Schefflera-Arboricola, Aug 2, 2024)
- `d354351` rm fetching list of configs (Schefflera-Arboricola, Aug 2, 2024)
- `e4bf668` made n_jobs=-1 as default (Schefflera-Arboricola, Aug 13, 2024)

74 changes: 74 additions & 0 deletions Config.md
@@ -0,0 +1,74 @@
# [WIP] Using Configs

## Current state

All nx-parallel functions run a `joblib.Parallel` call internally, and you can change its parameters through the `joblib.parallel_config` context manager. We recommend using `joblib.parallel_config` consistently throughout your code.

Example usage:

```
>>> import networkx as nx
>>> from joblib import parallel_config
>>> from joblib.parallel import get_active_backend
>>> G = nx.complete_graph(4)
>>> get_active_backend()
(<joblib._parallel_backends.LokyBackend object at 0x10348a3c0>, None)
>>> with parallel_config(n_jobs=-1):
... get_active_backend()
... nx.square_clustering(G, backend="parallel")
... with parallel_config(n_jobs=3):
... get_active_backend()
... nx.square_clustering(G, backend="parallel")
... get_active_backend()
...
(<joblib._parallel_backends.LokyBackend object at 0x104a24b30>, -1)
{0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0}
(<joblib._parallel_backends.LokyBackend object at 0x102a99a60>, 3)
{0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0}
(<joblib._parallel_backends.LokyBackend object at 0x10348a3c0>, -1)
>>> get_active_backend()
(<joblib._parallel_backends.LokyBackend object at 0x102a37920>, None)
```

## WIP

- Sync `nx.config.backends.parallel` with the default configs of `joblib.parallel_config`, so that something like the following becomes possible. Here, `nx_parallel_config` is a context manager that behaves almost like `joblib.parallel_config`, except that the default configs of the outermost context manager are the values the user set in the `nx.config.backends.parallel` dataclass. A sketch of one possible implementation follows the example below.

```py
nx.config.backends.parallel.n_jobs = 3
nx.config.backends.parallel.backend = "loky"
nx.config.backends.parallel.verbose = 15

nx.square_clustering(
    G, backend="parallel"
)  # n_jobs --> 3, backend --> loky, verbose --> 15

with nx_parallel_config(n_jobs=4):
    get_active_backend()
    nx.square_clustering(
        G, backend="parallel"
    )  # n_jobs --> 4, backend --> loky, verbose --> 15
    with nx_parallel_config(verbose=0):
        get_active_backend()
        nx.square_clustering(
            G, backend="parallel"
        )  # n_jobs --> 4, backend --> loky, verbose --> 0
        with nx_parallel_config(n_jobs=5):
            get_active_backend()
            nx.square_clustering(
                G, backend="parallel"
            )  # n_jobs --> 5, backend --> loky, verbose --> 0
        get_active_backend()
    get_active_backend()
get_active_backend()
```
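
As a rough illustration of these semantics, here is a minimal sketch of such a context manager, assuming only the behavior described above (`nx_parallel_config` and its depth tracking are hypothetical, not code from this PR; `backend_params` handling is omitted for brevity):

```py
import threading
from contextlib import contextmanager

import networkx as nx
from joblib import parallel_config

_depth = threading.local()  # tracks nesting so only the outermost call seeds defaults


@contextmanager
def nx_parallel_config(**overrides):
    depth = getattr(_depth, "n", 0)
    params = {}
    if depth == 0:
        # Outermost context: seed defaults from the global nx-parallel config,
        # skipping unset (None) values so joblib's own defaults still apply.
        params = {
            k: v for k, v in dict(nx.config.backends.parallel).items() if v is not None
        }
    params.update(overrides)  # per-context overrides always win
    _depth.n = depth + 1
    try:
        with parallel_config(**params):
            yield
    finally:
        _depth.n = depth
```

This matches the annotated behavior above: only the outermost context seeds joblib with the global config, while nested contexts pass just their overrides and inherit everything else from the enclosing context.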

## Notes

- We don't recommend changing the global configuration from inside a context manager, though nothing stops you from doing so. Note that with the current design, a change to the global configuration made inside a context manager is an ordinary attribute assignment, so it persists after the context manager exits; see the example just below.
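
A small illustration of this caveat, assuming the current (unsynced) state where `nx.config.backends.parallel` is a plain dataclass that joblib's context managers never touch:

```py
import networkx as nx
from joblib import parallel_config

nx.config.backends.parallel.verbose = 15  # global setting

with parallel_config(n_jobs=3):
    # joblib restores its own settings (n_jobs) on exit, but this is an
    # ordinary attribute assignment on a global dataclass ...
    nx.config.backends.parallel.verbose = 0

print(nx.config.backends.parallel.verbose)  # 0 -- ... so it survives the block
```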

## Resources

- [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
- [`joblib.parallel_config`](https://joblib.readthedocs.io/en/latest/generated/joblib.parallel_config.html)
- Using a distributed backend - [docs](https://joblib.readthedocs.io/en/latest/auto_examples/parallel/distributed_backend_simple.html#sphx-glr-auto-examples-parallel-distributed-backend-simple-py)
2 changes: 2 additions & 0 deletions README.md
@@ -104,6 +104,8 @@ nxp.betweenness_centrality(H)
# output : {0: 0.0, 1: 0.6666666666666666, 2: 0.6666666666666666, 3: 0.0}
```

+For more on how to play with configs, refer to [Config.md](./Config.md)!
+
### Notes

1. Some functions in networkx have the same name but different implementations. To avoid name conflicts at dispatch time, networkx differentiates them by specifying the `name` parameter in the `_dispatchable` decorator of such algorithms. Hence, `method 3` and `method 4` are not recommended, but you can use them if you know the correct `name`. For example:

23 changes: 20 additions & 3 deletions _nx_parallel/__init__.py
@@ -9,6 +9,18 @@ def get_info():
        "package": "nx_parallel",
        "url": "https://github.com/networkx/nx-parallel",
        "short_summary": "Parallel backend for NetworkX algorithms",
+        "default_config": {
+            "backend": None,
+            "n_jobs": None,
+            "verbose": 0,
+            "temp_folder": None,
+            "max_nbytes": "1M",
+            "mmap_mode": "r",
+            "prefer": None,
+            "require": None,
+            "inner_max_num_threads": None,
+            "backend_params": None,
+        },
        "functions": {
            "number_of_isolates": {
                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L8",
@@ -150,18 +162,23 @@ def get_info():
                    'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` into `n` chunks, where `n` is the number of CPU cores."
                },
            },
+            "get_curr_configs": {
+                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/config.py#L43",
+                "additional_docs": "Returns the current configuration settings for nx_parallel.",
+                "additional_parameters": None,
+            },
            "chunks": {
-                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L8",
+                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L9",
                "additional_docs": "Divides an iterable into chunks of size n",
                "additional_parameters": None,
            },
            "cpu_count": {
-                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L18",
+                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L19",
                "additional_docs": "Returns the number of logical CPUs or cores",
                "additional_parameters": None,
            },
            "create_iterables": {
-                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L26",
+                "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/utils/chunk.py#L35",
                "additional_docs": "Creates an iterable of function inputs for parallel computation based on the provided iterator type.",
                "additional_parameters": {
                    "G : NetworkX graph": "iterator : str Type of iterator. Valid values are 'node', 'edge', 'isolate'",
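
As a quick check, the new `default_config` entry can be inspected straight from the backend's `get_info()` (a sketch, assuming this branch is installed):

```py
import _nx_parallel

info = _nx_parallel.get_info()
print(info["default_config"])
# {'backend': None, 'n_jobs': None, 'verbose': 0, 'temp_folder': None,
#  'max_nbytes': '1M', 'mmap_mode': 'r', 'prefer': None, 'require': None,
#  'inner_max_num_threads': None, 'backend_params': None}
```
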
14 changes: 11 additions & 3 deletions _nx_parallel/update_get_info.py
@@ -1,5 +1,6 @@
import os
import ast
+from nx_parallel.utils.config import _configs

__all__ = [
    "get_funcs_info",
@@ -58,7 +59,9 @@ def extract_docstrings_from_file(file_path):
                and node.targets[0].id == "__all__"
            ):
                all_list = [
-                    expr.s for expr in node.value.elts if isinstance(expr, ast.Str)
+                    expr.value
+                    for expr in node.value.elts
+                    if isinstance(expr, ast.Constant)
                ]
            elif isinstance(node, ast.FunctionDef):
                if all_list and node.name in all_list:
@@ -134,7 +137,8 @@ def get_url(file_path, function_name):

# Creating a temp__init__.py file

-string = '''# This file was automatically generated by update_get_info.py
+string = (
+    '''# This file was automatically generated by update_get_info.py


def get_info():
@@ -145,7 +149,11 @@ def get_info():
        "package": "nx_parallel",
        "url": "https://github.com/networkx/nx-parallel",
        "short_summary": "Parallel backend for NetworkX algorithms",
-        "functions": '''
+        "default_config": '''
+    + str(_configs.get_config_dict())
+    + """,
+        "functions": """
+)

with open("_nx_parallel/temp__init__.py", "w") as f:
    f.write(string + str(get_funcs_info()) + "}\n")
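
Context for the change above: `ast.Str` is deprecated and was removed in Python 3.12, so the extraction now matches `ast.Constant` instead. A small sanity check of the new logic:

```py
import ast

tree = ast.parse('__all__ = ["chunks", "cpu_count"]')
assign = tree.body[0]
# Same pattern as update_get_info.py: pull string constants out of __all__.
names = [e.value for e in assign.value.elts if isinstance(e, ast.Constant)]
print(names)  # ['chunks', 'cpu_count']
```
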
7 changes: 4 additions & 3 deletions nx_parallel/algorithms/cluster.py
@@ -47,15 +47,16 @@ def _compute_clustering_chunk(node_iter_chunk):
    else:
        node_iter = list(G.nbunch_iter(nodes))

-    total_cores = nxp.cpu_count()
+    n_jobs = nxp.cpu_count()
+    n_jobs = 1 if n_jobs is None else n_jobs  # changing the default

    if get_chunks == "chunks":
-        num_in_chunk = max(len(node_iter) // total_cores, 1)
+        num_in_chunk = max(len(node_iter) // n_jobs, 1)
        node_iter_chunks = nxp.chunks(node_iter, num_in_chunk)
    else:
        node_iter_chunks = get_chunks(node_iter)

-    result = Parallel(n_jobs=total_cores)(
+    result = Parallel()(
        delayed(_compute_clustering_chunk)(node_iter_chunk)
        for node_iter_chunk in node_iter_chunks
    )
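
The key change above is that `Parallel()` is now constructed without an explicit `n_jobs`, so it picks up whatever joblib configuration is active at call time. For example (a sketch, assuming this branch is installed):

```py
import networkx as nx
from joblib import parallel_config

G = nx.complete_graph(10)
with parallel_config(n_jobs=2, verbose=10):
    # The Parallel() call inside square_clustering runs with n_jobs=2 here.
    nx.square_clustering(G, backend="parallel")
```
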
1 change: 1 addition & 0 deletions nx_parallel/utils/__init__.py
@@ -1 +1,2 @@
from .chunk import *
+from .config import *
11 changes: 10 additions & 1 deletion nx_parallel/utils/chunk.py
@@ -20,7 +20,16 @@ def cpu_count():
    # Check if we are running under pytest
    if "PYTEST_CURRENT_TEST" in os.environ:
        return 2
-    return os.cpu_count()
+    else:
+        from joblib.parallel import get_active_backend
+
+        n_jobs = get_active_backend()[1]
+        n_cpus = os.cpu_count()
+        if n_jobs < 0:
+            return n_cpus + n_jobs + 1
+        if n_jobs == 0:
+            raise ValueError("n_jobs == 0 in Parallel has no meaning")
+        return int(n_jobs)


def create_iterables(G, iterator, n_cores, list_of_iterator=None):
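
For intuition, joblib counts negative `n_jobs` back from the total number of CPUs: on an 8-core machine, `n_jobs=-1` yields 8 + (-1) + 1 = 8 workers and `n_jobs=-2` yields 7. A short sketch of the resulting behavior (assuming this branch is installed):

```py
import nx_parallel as nxp
from joblib import parallel_config

with parallel_config(n_jobs=-1):
    print(nxp.cpu_count())  # on an 8-core machine: 8 + (-1) + 1 = 8

with parallel_config(n_jobs=3):
    print(nxp.cpu_count())  # 3
```
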
180 changes: 180 additions & 0 deletions nx_parallel/utils/config.py
@@ -0,0 +1,180 @@
from networkx.utils.configs import Config
from typing import Union
from dataclasses import asdict
import networkx as nx

__all__ = [
"NxpConfig",
"_configs",
"get_curr_configs",
]


class NxpConfig(Config):
    backend: str = None
    n_jobs: int = None
    verbose: int = 0
    temp_folder: str = None
    max_nbytes: Union[int, str] = "1M"
    mmap_mode: str = "r"
    prefer: str = None
    require: str = None
    inner_max_num_threads: int = None
    backend_params: dict = None

    def get_config_dict(self, config=None):
        """Return the default configuration as a dictionary."""
        config_dict = asdict(self)
        if config is None:
            return config_dict
        elif isinstance(config, list):
            new_config = {k: config_dict[k] for k in config if k in config_dict}
            return new_config
        elif config in config_dict:
            return config_dict[config]
        else:
            raise KeyError(f"Invalid config: {config}")


_configs = NxpConfig()


def get_curr_configs(config=None):
    """Returns the current configuration settings for nx_parallel."""
    config_dict = dict(nx.config.backends.parallel)
    if config is None:
        return config_dict
    elif isinstance(config, list):
        new_config = {k: config_dict[k] for k in config if k in config_dict}
        return new_config
    elif config in config_dict:
        return config_dict[config]
    else:
        raise KeyError(f"Invalid config: {config}")


# WIP

"""
from joblib._utils import _Sentinel
from joblib.parallel import _backend, BACKENDS, EXTERNAL_BACKENDS, MAYBE_AVAILABLE_BACKENDS, DEFAULT_BACKEND
import warnings
from joblib import parallel_config

global_config = nx.config.backends.parallel

default_parallel_config = {
"backend": _Sentinel(default_value=global_config.backend),
"n_jobs": _Sentinel(default_value=global_config.n_jobs),
"verbose": _Sentinel(default_value=global_config.verbose),
"temp_folder": _Sentinel(default_value=global_config.temp_folder),
"max_nbytes": _Sentinel(default_value=global_config.max_nbytes),
"mmap_mode": _Sentinel(default_value=global_config.mmap_mode),
"prefer": _Sentinel(default_value=global_config.prefer),
"require": _Sentinel(default_value=global_config.require),
}

class nx_parallel_config(parallel_config):
def __init__(
self,
backend=default_parallel_config["backend"],
*,
n_jobs=default_parallel_config["n_jobs"],
verbose=default_parallel_config["verbose"],
temp_folder=default_parallel_config["temp_folder"],
max_nbytes=default_parallel_config["max_nbytes"],
mmap_mode=default_parallel_config["mmap_mode"],
prefer=default_parallel_config["prefer"],
require=default_parallel_config["require"],
inner_max_num_threads=None,
**backend_params
):
# Save the parallel info and set the active parallel config
self.old_parallel_config = getattr(
_backend, "config", default_parallel_config
)

backend = self._check_backend(
backend, inner_max_num_threads, **backend_params
)

new_config = {
"n_jobs": n_jobs,
"verbose": verbose,
"temp_folder": temp_folder,
"max_nbytes": max_nbytes,
"mmap_mode": mmap_mode,
"prefer": prefer,
"require": require,
"backend": backend
}
self.parallel_config = self.old_parallel_config.copy()
self.parallel_config.update({
k: v for k, v in new_config.items()
if not isinstance(v, _Sentinel)
})

setattr(_backend, "config", self.parallel_config)

def _check_backend(self, backend, inner_max_num_threads, **backend_params):
if backend is default_parallel_config['backend']:
if inner_max_num_threads is not None or len(backend_params) > 0:
raise ValueError(
"inner_max_num_threads and other constructor "
"parameters backend_params are only supported "
"when backend is not None."
)
return backend

if isinstance(backend, str):
# Handle non-registered or missing backends
if backend not in BACKENDS:
if backend in EXTERNAL_BACKENDS:
register = EXTERNAL_BACKENDS[backend]
register()
elif backend in MAYBE_AVAILABLE_BACKENDS:
warnings.warn(
f"joblib backend '{backend}' is not available on "
f"your system, falling back to {DEFAULT_BACKEND}.",
UserWarning,
stacklevel=2
)
BACKENDS[backend] = BACKENDS[DEFAULT_BACKEND]
else:
raise ValueError(
f"Invalid backend: {backend}, expected one of "
f"{sorted(BACKENDS.keys())}"
)

backend = BACKENDS[backend](**backend_params)

if inner_max_num_threads is not None:
msg = (
f"{backend.__class__.__name__} does not accept setting the "
"inner_max_num_threads argument."
)
assert backend.supports_inner_max_num_threads, msg
backend.inner_max_num_threads = inner_max_num_threads

# If the nesting_level of the backend is not set previously, use the
# nesting level from the previous active_backend to set it
if backend.nesting_level is None:
parent_backend = self.old_parallel_config['backend']
if parent_backend is default_parallel_config['backend']:
nesting_level = 0
else:
nesting_level = parent_backend.nesting_level
backend.nesting_level = nesting_level

return backend
"""

"""
import joblib
from joblib.parallel import _Sentinel

# Register the custom parallel backend
joblib.parallel.register_parallel_backend('NxpJoblibBackend', lambda **kwargs: joblib.parallel.ParallelBackendSequential(**default_parallel_config))

default_parallel_config["backend"] = _Sentinel(default_value=NxpJoblibBackend),
"""