JSON Serialization + Prefab Pipelines #34

Open · wants to merge 8 commits into base: main
3 changes: 2 additions & 1 deletion .gitignore
@@ -18,4 +18,5 @@ docs/source/reference/_autosummary/
objects.inv
_build/
.doctrees/
-*.mo
+*.mo
+AFL-agent.code-workspace
64 changes: 28 additions & 36 deletions AFL/double_agent/Extrapolator.py
@@ -13,7 +13,7 @@
- Support for different sample and grid dimensions
"""

-from typing import List, Optional
+from typing import List

import numpy as np
import sklearn.gaussian_process # type: ignore
@@ -260,9 +260,12 @@ class predictions and uncertainty estimates through entropy.
The `xarray` dimension over the discrete 'samples' in the `feature_input_variable`. This is typically
a variant of `sample` e.g., `saxs_sample`.

-    kernel: Optional[object]
-        An optional sklearn.gaussian_process.kernel to use in the classifier. If not provided, will default to
+    kernel: str
+        The name of the sklearn.gaussian_process.kernel to use in the classifier. If not provided, defaults to
         `Matern`.

+    kernel_kwargs: dict
+        Additional keyword arguments to pass to the sklearn.gaussian_process.kernel
+
    optimizer: str
        The name of the optimizer to use when optimizing the Gaussian process parameters
@@ -279,7 +282,8 @@ def __init__(
grid_variable: str,
grid_dim: str,
sample_dim: str,
-        kernel: Optional[object] = None,
+        kernel: str = 'Matern',
+        kernel_kwargs: dict = {'length_scale':1.0, 'nu':1.5},
optimizer: str = "fmin_l_bfgs_b",
name: str = "GaussianProcessClassifier",
) -> None:
@@ -294,19 +298,11 @@ def __init__(
grid_dim=grid_dim,
sample_dim=sample_dim,
)

-        if kernel is None:
-            self.kernel = sklearn.gaussian_process.kernels.Matern(
-                length_scale=1.0, nu=1.5
-            )
-        else:
-            self.kernel = kernel
-
+        self.kernel = kernel
+        self.kernel_kwargs = kernel_kwargs
        self.output_prefix = output_prefix
-        if optimizer is not None:
-            self.optimizer = optimizer
-        else:
-            self.optimizer = None
+        self.optimizer = optimizer
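With this change the classifier stores only the kernel's name and keyword arguments, both JSON-serializable, and rebuilds the kernel object at fit time. A minimal sketch of that lookup, assuming only that scikit-learn is installed (the values are the defaults from the new signature above):

```python
import sklearn.gaussian_process

# JSON-serializable state, matching the defaults in the new signature.
kernel_name = "Matern"
kernel_kwargs = {"length_scale": 1.0, "nu": 1.5}

# Resolve the kernel class by name and instantiate it, as calculate() now does.
kernel = getattr(sklearn.gaussian_process.kernels, kernel_name)(**kernel_kwargs)
print(kernel)  # Matern(length_scale=1, nu=1.5)
```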


def calculate(self, dataset: xr.Dataset) -> Self:
"""Apply this GP classifier to the supplied dataset.
@@ -341,8 +337,10 @@ def calculate(self, dataset: xr.Dataset) -> Self:
)

        else:
+            kernel = getattr(sklearn.gaussian_process.kernels, self.kernel)(**self.kernel_kwargs)
+
            clf = sklearn.gaussian_process.GaussianProcessClassifier(
-                kernel=self.kernel, optimizer=self.optimizer
+                kernel=kernel, optimizer=self.optimizer
            ).fit(X.values, y.values)
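A self-contained sketch of this classifier branch, with hypothetical toy arrays standing in for the dataset's feature and label variables; the entropy step mirrors the comment that follows:

```python
import numpy as np
import sklearn.gaussian_process

rng = np.random.default_rng(0)
X = rng.uniform(size=(30, 2))    # stand-in for sample composition features
y = (X[:, 0] > 0.5).astype(int)  # stand-in for discrete phase labels

kernel = getattr(sklearn.gaussian_process.kernels, "Matern")(length_scale=1.0, nu=1.5)
clf = sklearn.gaussian_process.GaussianProcessClassifier(
    kernel=kernel, optimizer="fmin_l_bfgs_b"
).fit(X, y)

# Shannon entropy of the class probabilities as a per-point uncertainty measure.
proba = clf.predict_proba(X)
entropy = -np.sum(proba * np.log(proba + 1e-12), axis=1)
```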

# entropy approach to classification probabilities
@@ -395,14 +393,14 @@ class GaussianProcessRegressor(Extrapolator):
The `xarray` dimension over the discrete 'samples' in the `feature_input_variable`. This is typically
a variant of `sample` e.g., `saxs_sample`.

-    predictor_uncertainty_variable: Optional[str]
+    predictor_uncertainty_variable: str | None
Variable containing uncertainty estimates for the predictor values

optimizer: str
        The name of the optimizer to use when optimizing the Gaussian process parameters

-    kernel: Optional[object]
-        An optional sklearn.gaussian_process.kernel to use in the regressor. If not provided, will default to
+    kernel: str | None
+        The name of the sklearn.gaussian_process.kernel to use in the regressor. If not provided, defaults to
        `Matern`.

name: str
@@ -422,7 +420,8 @@ def __init__(
sample_dim,
predictor_uncertainty_variable=None,
optimizer="fmin_l_bfgs_b",
-        kernel=None,
+        kernel: str = 'Matern',
+        kernel_kwargs: dict = {'length_scale':1.0, 'nu':1.5},
name="GaussianProcessRegressor",
fix_nans=True,
) -> None:
@@ -440,19 +439,10 @@ def __init__(
self.predictor_uncertainty_variable = predictor_uncertainty_variable
if predictor_uncertainty_variable is not None:
self.input_variable.append(predictor_uncertainty_variable)

-        if kernel is None:
-            self.kernel = sklearn.gaussian_process.kernels.Matern(
-                #length_scale=[0.1], length_scale_bounds=(1e-3, 1e0), nu=1.5
-                length_scale = [0.1], length_scale_bounds = (0.2, 1e0), nu = 1.5
-            )
-        else:
-            self.kernel = kernel
-
-        if optimizer is not None:
-            self.optimizer = optimizer
-        else:
-            self.optimizer = "fmin_l_bfgs_b"
-
+        self.kernel = kernel
+        self.kernel_kwargs = kernel_kwargs
+        self.optimizer = optimizer

self.predictor_uncertainty_variable = predictor_uncertainty_variable
self._banned_from_attrs.append("predictor_uncertainty_variable")
@@ -491,15 +481,17 @@ def calculate(self, dataset: xr.Dataset) -> Self:
dy = dataset[self.predictor_uncertainty_variable].transpose(
self.sample_dim, ...
)
+            kernel = getattr(sklearn.gaussian_process.kernels, self.kernel)(**self.kernel_kwargs)
            reg = sklearn.gaussian_process.GaussianProcessRegressor(
-                kernel=self.kernel, alpha=dy.values, optimizer=self.optimizer
+                kernel=kernel, alpha=dy.values, optimizer=self.optimizer
).fit(X.values, y.values)

reg_type = "heteroscedastic"

else:
+            kernel = getattr(sklearn.gaussian_process.kernels, self.kernel)(**self.kernel_kwargs)
            reg = sklearn.gaussian_process.GaussianProcessRegressor(
-                kernel=self.kernel, optimizer=self.optimizer
+                kernel=kernel, optimizer=self.optimizer
).fit(X.values, y.values)
reg_type = "homoscedastic"

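To round out this file, a hedged sketch of the two regressor branches above, again with hypothetical arrays in place of the xarray variables. scikit-learn adds `alpha` to the diagonal of the kernel matrix, which is how the per-sample uncertainties enter the heteroscedastic fit:

```python
import numpy as np
import sklearn.gaussian_process

rng = np.random.default_rng(1)
X = rng.uniform(size=(20, 2))                        # hypothetical sample features
y = X.sum(axis=1) + rng.normal(scale=0.05, size=20)  # hypothetical noisy target
dy = np.full(20, 0.05**2)                            # hypothetical per-sample variances

kernel = getattr(sklearn.gaussian_process.kernels, "Matern")(length_scale=1.0, nu=1.5)

# Heteroscedastic branch: per-sample noise enters through alpha.
reg = sklearn.gaussian_process.GaussianProcessRegressor(
    kernel=kernel, alpha=dy, optimizer="fmin_l_bfgs_b"
).fit(X, y)

# The homoscedastic branch simply omits alpha; either way predict() can
# return a standard deviation alongside the mean.
mean, std = reg.predict(X, return_std=True)
```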
136 changes: 102 additions & 34 deletions AFL/double_agent/Pipeline.py
@@ -18,6 +18,9 @@
import re
from typing import Generator, Optional, List
import warnings
import datetime
import json
import pathlib

import matplotlib.pyplot as plt
import networkx as nx
@@ -29,25 +32,24 @@
from AFL.double_agent.PipelineContext import PipelineContext
from AFL.double_agent.PipelineOp import PipelineOp
from AFL.double_agent.util import listify
-from AFL.double_agent.util import extract_parameters


class Pipeline(PipelineContext):
"""
Container class for defining and executing computational workflows.

The Pipeline class serves as a framework for organizing and running sequences of
operations (PipelineOps) on data. Each operation in the pipeline takes input data,
performs a specific transformation, and produces output data that can be used by
subsequent operations.

Parameters
----------
    name : Optional[str], default=None
        Name of the pipeline. If None, defaults to "Pipeline".
    ops : Optional[List], default=None
        List of PipelineOp objects to initialize the pipeline with.
    description : Optional[str], default=None
        Descriptive text about the pipeline's purpose and functionality.

Attributes
----------
result : Any
@@ -60,17 +62,16 @@ class Pipeline(PipelineContext):
Edge labels for the pipeline graph visualization
"""

-    def __init__(self, name: Optional[str] = None, ops: Optional[List] = None) -> None:
+    def __init__(
+        self,
+        name: Optional[str] = None,
+        ops: Optional[List] = None,
+        description: Optional[str] = None,
+    ) -> None:
        self.result = None
-        if ops is None:
-            self.ops = []
-        else:
-            self.ops = ops
-
-        if name is None:
-            self.name = "Pipeline"
-        else:
-            self.name = name
+        self.ops = ops or []
+        self.description = str(description) if description is not None else None
+        self.name = name or "Pipeline"
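A quick sketch of the updated constructor surface; the description text is hypothetical, and `name` and `ops` fall back through the `or` defaults above:

```python
from AFL.double_agent.Pipeline import Pipeline

p = Pipeline(description="GP extrapolation of phase labels")
print(p.name, len(p.ops))  # Pipeline 0
```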

# placeholder for networkx graph
self.graph = None
@@ -120,36 +121,41 @@ def print(self) -> None:
def print_code(self) -> None:
"""String representation of approximate code to generate this pipeline

-        Run this method to produce a string of Python code that should **approximate**
-        this Pipeline. If all constructor parameters are not stored as class attributes,
-        this method will fail and the code result will need to be edited.
-
-        Warning
-        --------
-        This method is approximate. The code generated by this method may not produce
-        an identical Pipeline to this method. Use with caution.
+        Run this method to produce a string of Python code that should
+        recreate this Pipeline.

        """

+        warnings.warn(
+            """Pipeline.print_code() cannot perfectly infer pipeline code and is not"""
+            """ guaranteed to reproduce the same pipeline. Use with caution.""",
+            stacklevel=2,
+        )

output_string = f"with Pipeline(name = \"{self.name}\") as p:\n"
output_string = f'with Pipeline(name = "{self.name}") as p:\n'

for op in self:
-            params = extract_parameters(op)
+            args = op._stored_args
output_string += f" {type(op).__name__}(\n"
-            for k, v in params.items():
+            for k, v in args.items():
if isinstance(v, str):
output_string += f' {k}="{v}",\n'
else:
output_string += f" {k}={v},\n"
output_string += f" )\n\n"

-        print(output_string)
+        try:
+            shell = get_ipython().__class__.__name__
+            if shell == "ZMQInteractiveShell":
+                # Running in a Jupyter kernel: prefill the next notebook cell
+                # with the generated code so it can be edited and run directly.
+                ip = get_ipython()
+                ip.set_next_input(output_string)
+                print("Pipeline code has been prepared in a new cell below.")
+            else:
+                print(output_string)
+        except NameError:
+            # Plain interpreter: get_ipython is not defined outside IPython.
+            print(output_string)
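For a pipeline holding a single op, the emitted string would look roughly like the following. The argument values are illustrative, but the shape (quoted strings, one keyword per line) follows the formatting loop above:

```python
with Pipeline(name = "Pipeline") as p:
    GaussianProcessClassifier(
        feature_input_variable="composition",
        output_prefix="extrap",
        grid_variable="composition_grid",
        grid_dim="grid",
        sample_dim="sample",
        kernel="Matern",
        kernel_kwargs={'length_scale': 1.0, 'nu': 1.5},
        optimizer="fmin_l_bfgs_b",
        name="GaussianProcessClassifier",
    )
```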

def append(self, op: PipelineOp) -> Self:
"""Mirrors the behavior of python lists"""
@@ -169,9 +175,67 @@ def clear_outputs(self):
def copy(self) -> Self:
return copy.deepcopy(self)

-    def write(self, filename: str):
+    def write_json(
+        self, filename: str, overwrite=False, description: Optional[str] = None
+    ):
"""Write pipeline to disk as a JSON

Parameters
----------
filename: str
Filename or filepath to be written
overwrite: bool, default=False
Whether to overwrite an existing file
description: str, optional
A descriptive text about the pipeline's purpose and functionality
"""

if not overwrite and pathlib.Path(filename).exists():
            raise FileExistsError(f"{filename} already exists; pass overwrite=True to replace it.")

pipeline_dict = {
"name": self.name,
"date": datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S-%f"),
"description": (
str(description) if description is not None else self.description
),
"ops": [op.to_json() for op in self],
}

with open(filename, "w") as f:
json.dump(pipeline_dict, f, indent=1)

print(f"Pipeline successfully written to {filename}.")

@staticmethod
def read_json(filename: str):
"""Read pipeline from json file on disk

Usage
-----
```python
from AFL.double_agent.Pipeline import Pipeline
        pipeline1 = Pipeline.read_json('pipeline.json')
        ```
"""
with open(filename, "r") as f:
pipeline_dict = json.load(f)

pipeline = Pipeline(
name=pipeline_dict["name"],
ops=[PipelineOp.from_json(op) for op in pipeline_dict["ops"]],
description=pipeline_dict["description"],
)

return pipeline
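A hedged round-trip sketch combining the two methods, assuming `p` is a Pipeline built elsewhere; the top-level keys match the pipeline_dict written above, and each op payload round-trips through PipelineOp.to_json() and PipelineOp.from_json():

```python
import json

from AFL.double_agent.Pipeline import Pipeline

p.write_json("pipeline.json", description="GP extrapolation demo")

with open("pipeline.json") as f:
    saved = json.load(f)
print(sorted(saved))  # ['date', 'description', 'name', 'ops']

p2 = Pipeline.read_json("pipeline.json")
assert p2.name == p.name and len(p2.ops) == len(p.ops)
```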

def write_pkl(self, filename: str):
"""Write pipeline to disk as a pkl

.. warning::
Please use the read_json and write_json methods. The pickle methods
are insecure and prone to errors.

Parameters
----------
filename: str
@@ -184,9 +248,13 @@ def write(self, filename: str):
pickle.dump(pipeline, f)

@staticmethod
-    def read(filename: str):
+    def read_pkl(filename: str):
"""Read pipeline from pickle file on disk

.. warning::
Please use the `read_json` and `write_json` methods. The pickle methods
are insecure and prone to errors.

Usage
-----
```python
        from AFL.double_agent.Pipeline import Pipeline
        pipeline1 = Pipeline.read_pkl('pickled_pipeline.pkl')
        ```