Evaluator unable to pickle, causing parallelization issues #386

Closed
qchempku2017 opened this issue Jul 10, 2023 · 2 comments
Labels: bug (Something isn't working)

qchempku2017 commented Jul 10, 2023

After PR #358, Evaluator classes are used to calculate correlation functions. However, these classes cannot be pickled because they lack a __reduce__ method. This makes it impossible to parallelize MC across separate processes using multiprocessing or Joblib, because these libraries must pickle everything sent to and returned from the workers.

Note that process-level parallelization remains desirable even now that parallel evaluation of correlations is implemented, because users frequently want to parallelize over temperatures, chemical potentials, etc.
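To illustrate the pickling requirement described above, here is a minimal sketch using pure-Python stand-ins. The names FakeEvaluator, Holder and can_pickle are hypothetical and not part of smol; FakeEvaluator only mimics the error a Cython class with a non-trivial __cinit__ raises. It shows that any object holding such an evaluator fails to serialize as well.

```python
import pickle


class FakeEvaluator:
    """Hypothetical stand-in for a Cython class without pickle support."""

    def __init__(self, num_corr_functions):
        self.num_corr_functions = num_corr_functions

    def __reduce__(self):
        # Mimic the error Cython raises for classes with a non-trivial __cinit__.
        raise TypeError("no default __reduce__ due to non-trivial __cinit__")


class Holder:
    """Any object that stores the evaluator becomes unpicklable as well."""

    def __init__(self, evaluator):
        self.evaluator = evaluator


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


evaluator_ok = can_pickle(FakeEvaluator(3))
holder_ok = can_pickle(Holder(FakeEvaluator(3)))
plain_ok = can_pickle({"temperature": 300})
```

Here evaluator_ok and holder_ok are both False while plain_ok is True, which is why a task that carries the evaluator anywhere in its arguments cannot be sent to a worker process.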

Current Behavior

Here is the error reported by pytest:

test setup failed
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\queues.py", line 159, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\loky\backend\reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:\Users\66422\PycharmProjects\WFacer\venv\lib\site-packages\joblib\externals\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "stringsource", line 2, in smol.utils.cluster.evaluator.ClusterSpaceEvaluator.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
"""

The above exception was the direct cause of the following exception:

initial_document = CeOutputsDocument(project_name='ace-work', cluster_subspace=Cluster Subspace Summary
Basis/Orthogonal/Orthonormal : in...tructures=None, enumerated_matrices=None, enumerated_features=None, undecorated_entries=None, computed_properties=None)

    @pytest.fixture
    def enum_output(initial_document):
>       return enumerate_structures(initial_document)

test_jobs.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\WFacer\jobs.py:239: in enumerate_structures
    new_structures, new_sc_matrices, new_features = _enumerate_structures(
..\WFacer\jobs.py:97: in _enumerate_structures
    new_structures, new_sc_matrices, new_features = generate_training_structures(
..\WFacer\enumeration.py:469: in generate_training_structures
    results = par(
..\venv\lib\site-packages\joblib\parallel.py:1944: in __call__
    return output if self.return_generator else list(output)
..\venv\lib\site-packages\joblib\parallel.py:1587: in _get_outputs
    yield from self._retrieve()
..\venv\lib\site-packages\joblib\parallel.py:1691: in _retrieve
    self._raise_error_fast()
..\venv\lib\site-packages\joblib\parallel.py:1726: in _raise_error_fast
    error_job.get_result(self.timeout)
..\venv\lib\site-packages\joblib\parallel.py:735: in get_result
    return self._return_or_raise()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
​
self = <joblib.parallel.BatchCompletionCallBack object at 0x00000120EE572220>def _return_or_raise(self):
        try:
            if self.status == TASK_ERROR:
>               raise self._result
E               _pickle.PicklingError: Could not pickle the task to send it to the workers.

..\venv\lib\site-packages\joblib\parallel.py:753: PicklingError

The Cython class ClusterSpaceEvaluator does not have a __reduce__ method.
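When the traceback is less explicit than this one, a small helper can narrow down which task argument is the culprit. The following is only a sketch: find_unpicklable is a hypothetical name, and it uses the standard-library pickle, which is stricter than the cloudpickle-based serializer that joblib's loky backend uses, so it may flag objects that joblib could in fact send.

```python
import pickle
import threading


def find_unpicklable(named_args):
    """Return (name, exception type name) for each argument that fails to pickle."""
    failures = []
    for name, value in named_args.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickling can raise several exception types
            failures.append((name, type(exc).__name__))
    return failures


# A thread lock is a well-known unpicklable object, used here as a stand-in
# for the evaluator buried inside a task argument.
failures = find_unpicklable(
    {"num_sample": 10, "counts": [4, 4], "ce": threading.Lock()}
)
failed_names = [name for name, _ in failures]
```

In this toy call only the "ce" entry is reported, mirroring how a single unpicklable member is enough to make the whole task fail.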

Possible Solution

Implement a __reduce__ method for all newly added Cython classes in smol.utils and test process-level parallelization again.
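As a rough sketch of what such a method could look like, the pure-Python stand-in below returns the class and its constructor arguments from __reduce__, so that unpickling rebuilds the object by calling the constructor again. PicklableEvaluator and its attributes orbit_data and num_corr_functions are hypothetical; the real Cython classes would need to return whatever arguments their own __cinit__ or __init__ expects.

```python
import pickle


class PicklableEvaluator:
    """Hypothetical stand-in; a real fix would live in the Cython class."""

    def __init__(self, orbit_data, num_corr_functions):
        self.orbit_data = tuple(orbit_data)
        self.num_corr_functions = num_corr_functions

    def __reduce__(self):
        # Return (callable, args): unpickling calls the class with these
        # arguments, so the constructor rebuilds all internal state.
        return (self.__class__, (self.orbit_data, self.num_corr_functions))


original = PicklableEvaluator([(0, 1), (2, 3)], num_corr_functions=5)
restored = pickle.loads(pickle.dumps(original))
```

The restored object is a separate instance with the same orbit_data and num_corr_functions. This pattern suits extension types well because all C-level state is recreated by the constructor rather than copied byte by byte.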

Steps to Reproduce

The problematic code:

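# Imports needed by this excerpt (cpu_count is assumed to come from joblib).
# WFacer helpers such as _sample_single_generator, get_num_structs_to_sample,
# select_initial_rows and select_added_rows are defined elsewhere in the package.
from warnings import warn

import numpy as np
from joblib import Parallel, cpu_count, delayed


def generate_training_structures(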
    ce,
    enumerated_matrices,
    enumerated_counts,
    previous_sampled_structures=None,
    previous_feature_matrix=None,
    keep_ground_states=True,
    num_structs=60,
    mc_generator_kwargs=None,
    n_parallel=None,
    duplicacy_criteria="correlations",
    **kwargs,
):
    """Generate training structures at the first iteration.

    Args:
        ce(ClusterExpansion):
            ClusterExpansion object initialized as null. If charge decorated,
            will contain an ewald contribution at 100%
        enumerated_matrices(list[3*3 ArrayLike[int]]):
            Previously enumerated supercell matrices. Must be the same super-cell
            size.
        enumerated_counts(list[1D ArrayLike]):
            Previously enumerated compositions in "counts" format. Must fit in
            the super-cell size.
            Note: Different super-cell sizes not supported!
        previous_sampled_structures(list[Structure]): optional
            Sample structures already calculated in past iterations.
            If given, that means you will add structures to an existing
            training set.
        previous_feature_matrix(list[list[float]]): optional
            Correlation vectors of structures already calculated in past iterations.
        keep_ground_states(bool): optional
            Whether always to include the electrostatic ground states.
            Default to True.
        num_structs(int): optional
            Number of training structures to add at the iteration.
            At least 2~3 structures should be enumerated for each composition.
            And it is recommended that num_structs_init * 10 > 2 *
            len(supercell_and_counts).
            Default is 60.
        mc_generator_kwargs(dict): optional
            Keyword arguments for McSampleGenerator, except num_samples.
            Note: currently only support Canonical.
        n_parallel(int): optional
            Number of generators to run in parallel. Default is to use
            a quarter of cpu count.
        duplicacy_criteria(str):
            The criterion used to decide whether two structures are the same,
            in which case only one of them is added to the candidate training set.
            Default is "correlations", which means to assert duplication
            if two structures have the same correlation vectors, while
            "structure" means two structures must be symmetrically equivalent
            after being reduced. No other option is allowed.
            Note that option "structure" might be significantly slower since
            it has to attempt reducing every structure to its primitive cell
            before matching. It should be used with caution.
        kwargs:
            Keyword arguments for utils.selection.select_initial_rows.
    Returns:
        list[Structure], list[3*3 list[list[int]]], list[list[float]]:
            Initial training structures, super-cell matrices,
            and normalized correlation vectors.
    """
    mc_generator_args = mc_generator_kwargs or {}
    n_parallel = n_parallel or min(cpu_count() // 4, len(enumerated_counts))
    if n_parallel == 0:
        if cpu_count() // 4 == 0:
            warn(
                f"Number of CPUs found on the executing environment: {cpu_count()} might"
                f" not be enough for parallelization! Setting parallel processes to 1."
            )
            n_parallel = 1

    previous_sampled_structures = previous_sampled_structures or []
    previous_feature_matrix = np.array(previous_feature_matrix).tolist() or []
    if len(previous_feature_matrix) != len(previous_sampled_structures):
        raise ValueError(
            "Must provide a feature vector for each structure passed in!"
        )

    # Scale the number of structures to select for each comp.
    num_samples = get_num_structs_to_sample(
        [counts for _ in enumerated_matrices for counts in enumerated_counts],
        num_structs,
    )

    with Parallel(n_jobs=n_parallel) as par:
        gs_id = 0
        keeps = []
        structures = []
        femat = []
        sc_matrices = []
        sc_matrix_indices = []

        for mid, sc_matrix in enumerate(enumerated_matrices):
            # This should work on pytest.
            results = par(
                delayed(_sample_single_generator)(
                    ce,
                    previous_sampled_structures + structures,
                    previous_feature_matrix + femat,
                    mc_generator_args,
                    sc_matrix,
                    counts,
                    num_sample,
                    duplicacy_criteria=duplicacy_criteria,
                )
                for counts, num_sample in zip(
                    enumerated_counts,
                    num_samples[
                        mid
                        * len(enumerated_counts) : (mid + 1)
                        * len(enumerated_counts)
                    ],
                )
            )

            for (
                gs_struct,
                gs_occu,
                gs_feat,
                samples,
                samples_occu,
                samples_feat,
                gs_dupe,
            ) in results:
                if gs_dupe:
                    structures.extend(samples)
                    femat.extend(samples_feat)
                    sc_matrices.extend([sc_matrix for _ in samples])
                    sc_matrix_indices.extend([mid for _ in samples])
                    gs_id += len(samples)
                else:
                    structures.extend([gs_struct] + samples)
                    femat.extend([gs_feat] + samples_feat)
                    sc_matrices.extend([sc_matrix for _ in range(len(samples) + 1)])
                    sc_matrix_indices.extend([mid for _ in range(len(samples) + 1)])
                    if keep_ground_states:
                        keeps.append(gs_id)
                    gs_id += len(samples) + 1

    femat = np.array(femat)

    # External terms such as the ewald term should not be taken into comparison,
    # when selecting structures
    num_external_terms = len(ce.cluster_subspace.external_terms)

    if len(previous_sampled_structures) == 0:
        # Start from scratch.
        selected_row_ids = select_initial_rows(
            femat,
            n_select=num_structs,
            keep_indices=keeps,
            num_external_terms=num_external_terms,
            **kwargs,
        )
    else:
        # Add to existing:
        selected_row_ids = select_added_rows(
            femat,
            np.array(previous_feature_matrix),
            n_select=num_structs,
            keep_indices=keeps,
            num_external_terms=num_external_terms,
            **kwargs,
        )

    # Must sort to ensure the same ordering between feature rows and structures.
    selected_row_ids = sorted(selected_row_ids)
    selected_structures = [s for i, s in enumerate(structures) if i in selected_row_ids]
    selected_matrices = [m for i, m in enumerate(sc_matrices) if i in selected_row_ids]
    selected_femat = femat[selected_row_ids, :].tolist()
    if len(selected_row_ids) < num_structs:
        warn(
            f"Expected to add {num_structs} new structures,"
            f" but only {len(selected_row_ids)}"
            f" non duplicate structures could be added."
        )
    return selected_structures, selected_matrices, selected_femat

In this function, I try to initialize an MC sampler in every parallel process.
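Until the evaluator classes can be pickled, one possible workaround is to send each worker a plain, picklable description of the object and rebuild it inside the worker. The sketch below assumes the real object offers an as_dict/from_dict round trip; ToyExpansion, worker and their attributes are hypothetical stand-ins, not WFacer or smol code.

```python
import pickle


class ToyExpansion:
    """Hypothetical stand-in for an object that holds an unpicklable member."""

    def __init__(self, coefs):
        self.coefs = list(coefs)
        # A lambda attribute makes instances unpicklable with stdlib pickle,
        # playing the role of the Cython evaluator.
        self.evaluate = lambda occu: sum(c * o for c, o in zip(self.coefs, occu))

    def as_dict(self):
        """Return a plain dictionary that is safe to pickle."""
        return {"coefs": self.coefs}

    @classmethod
    def from_dict(cls, d):
        """Rebuild the object, including its evaluator, from the dictionary."""
        return cls(d["coefs"])


def worker(expansion_dict, occupancy):
    """Rebuild the expansion inside the worker, then use it."""
    expansion = ToyExpansion.from_dict(expansion_dict)
    return expansion.evaluate(occupancy)


# Only the dictionary crosses the process boundary, so it must be picklable.
payload = pickle.dumps(ToyExpansion([1.0, 2.0]).as_dict())
result = worker(pickle.loads(payload), [1, 1])
```

The trade-off is that every task pays the cost of rebuilding the object, so this is a stopgap rather than a replacement for proper pickle support.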

@qchempku2017 qchempku2017 added the bug Something isn't working label Jul 10, 2023

lbluque commented Jul 11, 2023

more context here

@lbluque lbluque mentioned this issue Jul 11, 2023
@lbluque lbluque closed this as completed Jul 12, 2023