Add cold arm definition to MAB object #65

Merged
15 commits merged on Feb 1, 2023
7 changes: 7 additions & 0 deletions CHANGELOG.txt
@@ -2,6 +2,13 @@
MABWiser CHANGELOG
=====================

January, 24, 2022 2.6.0
-------------------------------------------------------------------------------
minor:
- Implement tracking of warm-started and partially fitted arms
- Implement tracking of cold arms
- Limit warm start to learning policies

January, 19, 2022 2.5.0
-------------------------------------------------------------------------------
major:
2 changes: 1 addition & 1 deletion mabwiser/_version.py
@@ -3,5 +3,5 @@

__author__ = "FMR LLC"
__email__ = "opensource@fmr.com"
__version__ = "2.5.0"
__version__ = "2.6.0"
__copyright__ = "Copyright (C), FMR LLC"
92 changes: 63 additions & 29 deletions mabwiser/base_mab.py
@@ -22,6 +22,10 @@
__version__ = __version__
__copyright__ = __copyright__

STATUS_TRAINED = 'is_trained'
STATUS_WARM = 'is_warm'
STATUS_WARM_STARTED_BY = 'warm_started_by'


class BaseMAB(metaclass=abc.ABCMeta):
"""Abstract base class for multi-armed bandits.
@@ -64,11 +68,11 @@ class BaseMAB(metaclass=abc.ABCMeta):
Default value is None. In this case the default backend selected by joblib will be used.
arm_to_expectation: Dict[Arm, float]
The dictionary of arms (keys) to their expected rewards (values).
cold_arm_to_warm_arm: Dict[Arm, Arm]:
Mapping indicating what arm was used to warm-start cold arms.
trained_arms: List[Arm]
List of trained arms.
Arms for which at least one decision has been observed are deemed trained.
arm_to_status: Dict[Arm, dict]
The dictionary of arms (keys) to their status (values), where the status consists of
- ``is_trained``, which indicates whether an arm was ``fit`` or ``partial_fit``;
- ``is_warm``, which indicates whether an arm was warm started and therefore has an associated trained model;
- and ``warm_started_by``, which indicates the arm that originally warm started this arm.
"""

@abc.abstractmethod
@@ -83,8 +87,26 @@ def __init__(self, rng: _BaseRNG, arms: List[Arm], n_jobs: int, backend: str = N
self.backend: str = backend

self.arm_to_expectation: Dict[Arm, float] = dict.fromkeys(self.arms, 0)
self.cold_arm_to_warm_arm: Dict[Arm, Arm] = dict()
self.trained_arms: List[Arm] = list()
self._reset_arm_to_status()

@property
def cold_arm_to_warm_arm(self) -> Dict[Arm, Arm]:
"""Mapping indicating what arm was used to warm-start cold arms."""
return {cold_arm: status[STATUS_WARM_STARTED_BY] for cold_arm, status in self.arm_to_status.items()
if status[STATUS_WARM]}

@property
def trained_arms(self) -> List[Arm]:
"""List of trained arms.

Arms for which at least one decision has been observed are deemed trained."""
return [arm for arm in self.arms if self.arm_to_status[arm][STATUS_TRAINED]]

@property
def cold_arms(self) -> List[Arm]:
"""List of cold arms"""
return [arm for arm in self.arms if ((not self.arm_to_status[arm][STATUS_TRAINED]) and
(not self.arm_to_status[arm][STATUS_WARM]))]
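
Note (illustrative, not part of the diff): a minimal sketch of how the three derived properties read from ``arm_to_status``. The arm names and status values below are hypothetical.

```python
# Hypothetical arm_to_status snapshot (illustrative values only)
arm_to_status = {
    "arm1": {"is_trained": True,  "is_warm": False, "warm_started_by": None},
    "arm2": {"is_trained": False, "is_warm": True,  "warm_started_by": "arm1"},
    "arm3": {"is_trained": False, "is_warm": False, "warm_started_by": None},
}

# trained_arms: arms with at least one observed decision
trained_arms = [arm for arm, s in arm_to_status.items() if s["is_trained"]]   # ["arm1"]

# cold_arm_to_warm_arm: warm-started arms mapped to the arm they were copied from
cold_arm_to_warm_arm = {arm: s["warm_started_by"]
                        for arm, s in arm_to_status.items() if s["is_warm"]}  # {"arm2": "arm1"}

# cold_arms: arms that are neither trained nor warm started
cold_arms = [arm for arm, s in arm_to_status.items()
             if not s["is_trained"] and not s["is_warm"]]                     # ["arm3"]
```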

def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
"""Introduces a new arm to the bandit.
@@ -94,12 +116,14 @@ def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
"""
self.arm_to_expectation[arm] = 0
self._uptake_new_arm(arm, binarizer)
self.arm_to_status[arm] = {STATUS_TRAINED: False, STATUS_WARM: False, STATUS_WARM_STARTED_BY: None}

def remove_arm(self, arm: Arm) -> NoReturn:
"""Removes arm from the bandit.
"""
self.arm_to_expectation.pop(arm)
self._drop_existing_arm(arm)
self.arm_to_status.pop(arm)

@abc.abstractmethod
def fit(self, decisions: np.ndarray, rewards: np.ndarray,
@@ -138,11 +162,9 @@ def predict_expectations(self, contexts: Optional[np.ndarray] = None) -> Union[D
"""
pass

@abc.abstractmethod
def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float) -> NoReturn:
new_cold_arm_to_warm_arm = self._get_cold_arm_to_warm_arm(self.cold_arm_to_warm_arm, arm_to_features,
distance_quantile)
self._copy_arms(new_cold_arm_to_warm_arm)
self.cold_arm_to_warm_arm = {**self.cold_arm_to_warm_arm, **new_cold_arm_to_warm_arm}
pass

@abc.abstractmethod
def _copy_arms(self, cold_arm_to_warm_arm: Dict[Arm, Arm]) -> NoReturn:
@@ -194,19 +216,6 @@ def _parallel_fit(self, decisions: np.ndarray, rewards: np.ndarray,
arm, decisions, rewards, contexts)
for arm in self.arms)

# Get list of arms in decisions
# If decision is observed for cold arm, drop arm from cold arm dictionary
arms = np.unique(decisions).tolist()
for arm in arms:
if arm in self.cold_arm_to_warm_arm:
self.cold_arm_to_warm_arm.pop(arm)

# Set/update list of arms for which at least one decision has been observed
if len(self.trained_arms) == 0:
self.trained_arms = arms
else:
self.trained_arms = np.unique(self.trained_arms + arms).tolist()

def _parallel_predict(self, contexts: np.ndarray, is_predict: bool):

# Total number of contexts to predict
@@ -362,20 +371,17 @@ def _get_distance_threshold(distance_from_to: Dict[Arm, Dict[Arm, Num]], quantil

return threshold

def _get_cold_arm_to_warm_arm(self, cold_arm_to_warm_arm, arm_to_features, distance_quantile):
def _get_cold_arm_to_warm_arm(self, arm_to_features, distance_quantile):

# Calculate from-to distances between all pairs of arms based on features
# and then find minimum distance (threshold) required to warm start an untrained arm
distance_from_to = self._get_pairwise_distances(arm_to_features)
distance_threshold = self._get_distance_threshold(distance_from_to, quantile=distance_quantile)

# Cold arms
cold_arms = [arm for arm in self.arms if ((arm not in self.trained_arms) and (arm not in cold_arm_to_warm_arm))]

# New cold arm to warm arm dictionary
new_cold_arm_to_warm_arm = dict()

for cold_arm in cold_arms:
for cold_arm in self.cold_arms:

# Collect distance from cold arm to warm arms
arm_to_distance = {}
Expand All @@ -394,3 +400,31 @@ def _get_cold_arm_to_warm_arm(self, cold_arm_to_warm_arm, arm_to_features, dista
new_cold_arm_to_warm_arm[cold_arm] = closest_arm

return new_cold_arm_to_warm_arm

def _reset_arm_to_status(self):
self.arm_to_status: Dict[Arm, dict] = {arm: {STATUS_TRAINED: False, STATUS_WARM: False,
STATUS_WARM_STARTED_BY: None}
for arm in self.arms}

def _set_arms_as_trained(self, decisions: Optional[np.ndarray] = None, is_partial: bool = True):
"""Sets the given arms as trained, where arms are calculated from the ``decisions``.
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
"""
# Calculate arms from decisions
arms = np.unique(decisions).tolist()

for arm in self.arms:
if arm in arms:
# Arm has at least one observed decision, so mark it as trained
self.arm_to_status[arm][STATUS_TRAINED] = True

# If fitting from scratch, arm is no longer warm started
if not is_partial:
self.arm_to_status[arm][STATUS_WARM] = False
self.arm_to_status[arm][STATUS_WARM_STARTED_BY] = None

def _warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float) -> NoReturn:
cold_arm_to_warm_arm = self._get_cold_arm_to_warm_arm(arm_to_features, distance_quantile)
self._copy_arms(cold_arm_to_warm_arm)
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_status[cold_arm][STATUS_WARM] = True
self.arm_to_status[cold_arm][STATUS_WARM_STARTED_BY] = warm_arm
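
Note (illustrative, not part of the diff): a rough sketch of the cold-to-warm mapping that ``_get_cold_arm_to_warm_arm`` and ``_warm_start`` implement above. Euclidean distance and the helper name ``sketch_cold_to_warm`` are assumptions for illustration, not the exact MABWiser internals.

```python
import numpy as np

def sketch_cold_to_warm(arm_to_features, trained_arms, cold_arms, distance_quantile):
    """Map each cold arm to the closest trained arm within a quantile-based distance threshold."""
    def dist(a, b):
        return float(np.linalg.norm(np.asarray(arm_to_features[a]) - np.asarray(arm_to_features[b])))

    # Pairwise distances between all distinct arms set the warm-start threshold
    pairwise = [dist(a, b) for a in arm_to_features for b in arm_to_features if a != b]
    threshold = float(np.quantile(pairwise, distance_quantile))

    mapping = {}
    for cold_arm in cold_arms:
        # Closest trained arm, if it is within the threshold
        candidates = {warm: dist(cold_arm, warm) for warm in trained_arms}
        if candidates:
            closest = min(candidates, key=candidates.get)
            if candidates[closest] <= threshold:
                mapping[cold_arm] = closest
    return mapping

# Usage with hypothetical features:
# sketch_cold_to_warm({"a": [0, 0], "b": [0.1, 0], "c": [5, 5]}, trained_arms=["a"],
#                     cold_arms=["b", "c"], distance_quantile=0.5)  # -> {"b": "a"}
```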
3 changes: 1 addition & 2 deletions mabwiser/clusters.py
@@ -93,8 +93,7 @@ def predict_expectations(self, contexts: np.ndarray = None) -> Union[Dict[Arm, N
return self._parallel_predict(contexts, is_predict=False)

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
for c in range(self.n_clusters):
self.lp_list[c].warm_start(arm_to_features, distance_quantile)
pass

def _copy_arms(self, cold_arm_to_warm_arm):
pass
11 changes: 10 additions & 1 deletion mabwiser/greedy.py
@@ -28,13 +28,19 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_expectation, 0)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray = None) -> NoReturn:
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
@@ -63,6 +69,9 @@ def predict_expectations(self, contexts: Optional[np.ndarray] = None) -> Union[D
else self.arm_to_expectation.copy() for index, exp in enumerate(random_values)]
return expectations

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
self._warm_start(arm_to_features, distance_quantile)

def _copy_arms(self, cold_arm_to_warm_arm):
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_sum[cold_arm] = deepcopy(self.arm_to_sum[warm_arm])
11 changes: 10 additions & 1 deletion mabwiser/linear.py
@@ -157,15 +157,21 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
self.arm_to_model[arm].init(num_features=self.num_features)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Perform parallel fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray = None) -> NoReturn:
# Perform parallel fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: np.ndarray = None) -> Union[Arm, List[Arm]]:
# Return predict for the given context
return self._parallel_predict(contexts, is_predict=True)
@@ -174,6 +180,9 @@ def predict_expectations(self, contexts: np.ndarray = None) -> Union[Dict[Arm, N
# Return predict expectations for the given context
return self._parallel_predict(contexts, is_predict=False)

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
self._warm_start(arm_to_features, distance_quantile)

def _copy_arms(self, cold_arm_to_warm_arm):
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_model[cold_arm] = deepcopy(self.arm_to_model[warm_arm])
12 changes: 12 additions & 0 deletions mabwiser/mab.py
@@ -969,6 +969,18 @@ def neighborhood_policy(self):
else:
return None

@property
def cold_arms(self) -> List[Arm]:
if not self.neighborhood_policy:
# No neighborhood policy, cold arms are calculated at the learning policy level
return self._imp.cold_arms

else:
# With neighborhood policies, we end up training and doing inference within the neighborhood.
# Each neighborhood can have a different set of trained arms, and if warm start is used,
# a different set of cold arms. Therefore, cold arms aren't defined for neighborhood policies.
return list()

def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
""" Adds an _arm_ to the list of arms.

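
Note (illustrative, not part of the diff): a hedged usage sketch of the new public ``cold_arms`` property. Arm names, rewards, and features are made-up values, and the snippet assumes the standard MABWiser ``MAB`` API.

```python
from mabwiser.mab import MAB, LearningPolicy

mab = MAB(arms=["arm1", "arm2", "arm3"],
          learning_policy=LearningPolicy.EpsilonGreedy(epsilon=0.1))

# Only arm1 and arm2 are observed, so arm3 stays cold
mab.fit(decisions=["arm1", "arm1", "arm2"], rewards=[1, 0, 1])
print(mab.cold_arms)   # expected: ["arm3"]

# Warm starting from nearby arm features can clear the cold list
mab.warm_start(arm_to_features={"arm1": [1, 0], "arm2": [0, 1], "arm3": [1, 0.1]},
               distance_quantile=0.75)
print(mab.cold_arms)   # expected: [] if arm3 was warm started from arm1

# With a neighborhood policy, cold_arms returns an empty list by definition,
# since each neighborhood may have its own set of trained and cold arms.
```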
9 changes: 1 addition & 8 deletions mabwiser/neighbors.py
@@ -75,12 +75,9 @@ def predict_expectations(self, contexts: np.ndarray = None) -> Union[Dict[Arm, N
return self._parallel_predict(contexts, is_predict=False)

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
# Can only execute warm start when learning policy has been fit in _get_nhood_predictions
self.arm_to_features = arm_to_features
self.distance_quantile = distance_quantile
pass

def _copy_arms(self, cold_arm_to_warm_arm):
# Copy arms executed on learning policy in _get_nhood_predictions
pass

def _fit_arm(self, arm: Arm, decisions: np.ndarray, rewards: np.ndarray, contexts: Optional[np.ndarray] = None):
Expand All @@ -104,10 +101,6 @@ def _get_nhood_predictions(self, lp, indices, row_2d, is_predict):
# Fit the decisions and rewards of the neighbors
lp.fit(self.decisions[indices], self.rewards[indices], self.contexts[indices])

# Warm start
if self.arm_to_features is not None:
lp.warm_start(self.arm_to_features, self.distance_quantile)

# Predict based on the neighbors
if is_predict:
return lp.predict(row_2d)
11 changes: 10 additions & 1 deletion mabwiser/softmax.py
@@ -32,19 +32,25 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_mean, 0)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Calculate fit
self._parallel_fit(decisions, rewards)
self._expectation_operation()

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
contexts: Optional[np.ndarray] = None) -> NoReturn:

# Calculate fit
self._parallel_fit(decisions, rewards)
self._expectation_operation()

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
@@ -70,6 +76,9 @@ def predict_expectations(self, contexts: Optional[np.ndarray] = None) -> Union[D
else:
return expectations

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
self._warm_start(arm_to_features, distance_quantile)

def _copy_arms(self, cold_arm_to_warm_arm):
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_sum[cold_arm] = deepcopy(self.arm_to_sum[warm_arm])
11 changes: 10 additions & 1 deletion mabwiser/thompson.py
@@ -32,11 +32,14 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_fail_count, 1)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

# Leave the calculation of expectations to predict methods

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
@@ -48,6 +51,9 @@ def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
@@ -74,6 +80,9 @@ def predict_expectations(self, contexts: Optional[np.ndarray] = None) -> Union[D
else:
return arm_to_expectation

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
self._warm_start(arm_to_features, distance_quantile)

def _copy_arms(self, cold_arm_to_warm_arm):
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_success_count[cold_arm] = deepcopy(self.arm_to_success_count[warm_arm])
11 changes: 4 additions & 7 deletions mabwiser/treebandit.py
@@ -39,9 +39,6 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
self.arm_to_tree = {arm: DecisionTreeRegressor(**self.tree_parameters) for arm in self.arms}
self.arm_to_leaf_to_rewards = {arm: defaultdict(partial(np.ndarray, 0)) for arm in self.arms}

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()

# If TS and a binarizer function is given, binarize the rewards
if isinstance(self.lp, _ThompsonSampling) and self.lp.binarizer:
self.lp.is_contextual_binarized = False
@@ -70,11 +67,11 @@ def predict_expectations(self, contexts: np.ndarray = None) -> Union[Dict[Arm, N

return self._parallel_predict(contexts, is_predict=False)

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float):
pass

def _copy_arms(self, cold_arm_to_warm_arm):
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_tree[cold_arm] = deepcopy(self.arm_to_tree[warm_arm])
self.arm_to_leaf_to_rewards[cold_arm] = deepcopy(self.arm_to_leaf_to_rewards[warm_arm])
self.arm_to_expectation[cold_arm] = deepcopy(self.arm_to_expectation[warm_arm])
pass

def _fit_arm(self, arm: Arm, decisions: np.ndarray, rewards: np.ndarray, contexts: Optional[np.ndarray] = None):
