Add automatic differentiation algorithm #10977

Merged · 14 commits · Oct 27, 2023
Changes from all commits
327 changes: 327 additions & 0 deletions machine_learning/automatic_differentiation.py
@@ -0,0 +1,327 @@
"""
Demonstration of reverse-mode automatic differentiation.

Reference: https://en.wikipedia.org/wiki/Automatic_differentiation

Author: Poojan Smart
Email: smrtpoojan@gmail.com
"""
from __future__ import annotations

from collections import defaultdict
from enum import Enum
from types import TracebackType
from typing import Any

import numpy as np
from typing_extensions import Self # noqa: UP035


class OpType(Enum):
"""
    Class represents the list of supported operations on Variable objects
    for gradient calculation.
"""

ADD = 0
SUB = 1
MUL = 2
DIV = 3
MATMUL = 4
POWER = 5
NOOP = 6
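
    # NOOP marks a Variable that is not the output of any tracked operation;
    # Operation.__eq__ compares an Operation directly against these members,
    # e.g. `operation == OpType.ADD`.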


class Variable:
"""
    Class represents an n-dimensional object that wraps a numpy array on
    which operations are performed and gradients are calculated.

Examples:
>>> Variable(5.0)
Variable(5.0)
>>> Variable([5.0, 2.9])
Variable([5. 2.9])
>>> Variable([5.0, 2.9]) + Variable([1.0, 5.5])
Variable([6. 8.4])
>>> Variable([[8.0, 10.0]])
Variable([[ 8. 10.]])
"""

def __init__(self, value: Any) -> None:
self.value = np.array(value)

        # pointers to the operations to which the Variable is an input
self.param_to: list[Operation] = []
        # pointer to the operation of which the Variable is the output
self.result_of: Operation = Operation(OpType.NOOP)

def __repr__(self) -> str:
return f"Variable({self.value})"

def to_ndarray(self) -> np.ndarray:
return self.value

def __add__(self, other: Variable) -> Variable:
result = Variable(self.value + other.value)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(OpType.ADD, params=[self, other], output=result)
return result

def __sub__(self, other: Variable) -> Variable:
result = Variable(self.value - other.value)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(OpType.SUB, params=[self, other], output=result)
return result

def __mul__(self, other: Variable) -> Variable:
result = Variable(self.value * other.value)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(OpType.MUL, params=[self, other], output=result)
return result

def __truediv__(self, other: Variable) -> Variable:
result = Variable(self.value / other.value)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(OpType.DIV, params=[self, other], output=result)
return result

def __matmul__(self, other: Variable) -> Variable:
result = Variable(self.value @ other.value)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(OpType.MATMUL, params=[self, other], output=result)
return result

def __pow__(self, power: int) -> Variable:
result = Variable(self.value**power)

with GradientTracker() as tracker:
# if tracker is enabled, computation graph will be updated
if tracker.enabled:
tracker.append(
OpType.POWER,
params=[self],
output=result,
other_params={"power": power},
)
return result

def add_param_to(self, param_to: Operation) -> None:
self.param_to.append(param_to)

def add_result_of(self, result_of: Operation) -> None:
self.result_of = result_of


class Operation:
"""
    Class represents an operation on one or two Variable objects. An
    Operation object contains the operation type, pointers to the input
    Variable objects, and a pointer to the Variable resulting from the
    operation.
"""

def __init__(
self,
op_type: OpType,
other_params: dict | None = None,
) -> None:
self.op_type = op_type
self.other_params = {} if other_params is None else other_params

def add_params(self, params: list[Variable]) -> None:

Review comment: As there is no test file in this pull request nor any test function or class in the file machine_learning/automatic_differentiation.py, please provide a doctest for the function add_params.

self.params = params

def add_output(self, output: Variable) -> None:

Review comment: As there is no test file in this pull request nor any test function or class in the file machine_learning/automatic_differentiation.py, please provide a doctest for the function add_output.

self.output = output

    def __eq__(self, value: object) -> bool:

Review comment: Please provide a type hint for the parameter: value

Review comment: As there is no test file in this pull request nor any test function or class in the file machine_learning/automatic_differentiation.py, please provide a doctest for the function __eq__.

return self.op_type == value if isinstance(value, OpType) else False


class GradientTracker:
"""
    Class contains methods to compute partial derivatives of Variable
    objects based on the computation graph; it is used as a context manager
    to enable gradient tracking.

Examples:

>>> with GradientTracker() as tracker:
... a = Variable([2.0, 5.0])
... b = Variable([1.0, 2.0])
... m = Variable([1.0, 2.0])
... c = a + b
... d = a * b
... e = c / d
>>> tracker.gradient(e, a)
array([-0.25, -0.04])
>>> tracker.gradient(e, b)
array([-1. , -0.25])
>>> tracker.gradient(e, m) is None
True

>>> with GradientTracker() as tracker:
... a = Variable([[2.0, 5.0]])
... b = Variable([[1.0], [2.0]])
... c = a @ b
>>> tracker.gradient(c, a)
array([[1., 2.]])
>>> tracker.gradient(c, b)
array([[2.],
[5.]])

>>> with GradientTracker() as tracker:
... a = Variable([[2.0, 5.0]])
... b = a ** 3
>>> tracker.gradient(b, a)
array([[12., 75.]])
"""

instance = None

def __new__(cls) -> Self:
"""
        Executes at the creation of a class object and returns the existing
        object if one has already been created. This class follows the
        singleton design pattern.
"""
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
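
    # Note: GradientTracker() always returns the same shared instance, so the
    # tracker used inside each Variable operator is the same object a caller
    # entered; as a quick sketch, `GradientTracker() is GradientTracker()`
    # evaluates to True.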

def __init__(self) -> None:
self.enabled = False

def __enter__(self) -> Self:
self.enabled = True
return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.enabled = False

def append(
self,
op_type: OpType,
params: list[Variable],
output: Variable,
other_params: dict | None = None,
) -> None:
"""
        Adds an Operation object to the related Variable objects, building
        the computation graph used for calculating gradients.

Args:
            op_type: Operation type
            params: Input parameters to the operation
            output: Output variable of the operation
            other_params: Extra parameters for the operation (e.g. the
                exponent for OpType.POWER)
"""
operation = Operation(op_type, other_params=other_params)
param_nodes = []
for param in params:
param.add_param_to(operation)
param_nodes.append(param)
output.add_result_of(operation)

operation.add_params(param_nodes)
operation.add_output(output)

def gradient(self, target: Variable, source: Variable) -> np.ndarray | None:
"""
        Reverse accumulation of partial derivatives to calculate the gradient
        of the target variable with respect to the source variable.

Args:
target: target variable for which gradients are calculated.
source: source variable with respect to which the gradients are
calculated.

Returns:
            Gradient of the target variable with respect to the source
            variable, or None if the source is not part of the computation
            graph.
"""

# partial derivatives with respect to target
partial_deriv = defaultdict(lambda: 0)
partial_deriv[target] = np.ones_like(target.to_ndarray())

        # iterate over each operation in the computation graph
operation_queue = [target.result_of]
while len(operation_queue) > 0:
operation = operation_queue.pop()
for param in operation.params:
                # per the chain rule, multiply the local derivative by the
                # partial derivative of the operation's output w.r.t. the target
dparam_doutput = self.derivative(param, operation)
dparam_dtarget = dparam_doutput * partial_deriv[operation.output]
partial_deriv[param] += dparam_dtarget

if param.result_of and param.result_of != OpType.NOOP:
operation_queue.append(param.result_of)

return partial_deriv.get(source)

def derivative(self, param: Variable, operation: Operation) -> np.ndarray:
"""
        Compute the local derivative of the given operation's output with
        respect to the given input variable.

Args:
param: variable to be differentiated
operation: function performed on the input variable

Returns:
            Derivative of the operation's output with respect to the input
            variable
"""
params = operation.params

if operation == OpType.ADD:
return np.ones_like(params[0].to_ndarray(), dtype=np.float64)
if operation == OpType.SUB:
if params[0] == param:
return np.ones_like(params[0].to_ndarray(), dtype=np.float64)
return -np.ones_like(params[1].to_ndarray(), dtype=np.float64)
if operation == OpType.MUL:
return (
params[1].to_ndarray().T
if params[0] == param
else params[0].to_ndarray().T
)
if operation == OpType.DIV:
if params[0] == param:
return 1 / params[1].to_ndarray()
return -params[0].to_ndarray() / (params[1].to_ndarray() ** 2)
if operation == OpType.MATMUL:
return (
params[1].to_ndarray().T
if params[0] == param
else params[0].to_ndarray().T
)
if operation == OpType.POWER:
power = operation.other_params["power"]
return power * (params[0].to_ndarray() ** (power - 1))

err_msg = f"invalid operation type: {operation.op_type}"
raise ValueError(err_msg)


if __name__ == "__main__":
import doctest

doctest.testmod()
3 changes: 2 additions & 1 deletion requirements.txt
@@ -19,5 +19,6 @@ statsmodels
 sympy
 tensorflow ; python_version < '3.12'
 tweepy
-xgboost
 # yulewalker # uncomment once audio_filters/equal_loudness_filter.py is fixed
+typing_extensions
+xgboost