Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cold arm definition to MAB object #65

Merged
merged 15 commits into from
Feb 1, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
MABWiser CHANGELOG
=====================

January, 24, 2022 2.5.1
-------------------------------------------------------------------------------
minor:
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
- Implement tracking of cold arms

January, 19, 2022 2.5.0
-------------------------------------------------------------------------------
major:
Expand Down
2 changes: 1 addition & 1 deletion mabwiser/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

__author__ = "FMR LLC"
__email__ = "opensource@fmr.com"
__version__ = "2.5.0"
__version__ = "2.5.1"
dorukkilitcioglu marked this conversation as resolved.
Show resolved Hide resolved
__copyright__ = "Copyright (C), FMR LLC"
97 changes: 68 additions & 29 deletions mabwiser/base_mab.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
__version__ = __version__
__copyright__ = __copyright__

STATUS_TRAINED = 'trained'
dorukkilitcioglu marked this conversation as resolved.
Show resolved Hide resolved
STATUS_WARM = 'warm'
STATUS_WARM_STARTED_BY = 'warm_started_by'


class BaseMAB(metaclass=abc.ABCMeta):
"""Abstract base class for multi-armed bandits.
Expand Down Expand Up @@ -64,11 +68,11 @@ class BaseMAB(metaclass=abc.ABCMeta):
Default value is None. In this case the default backend selected by joblib will be used.
arm_to_expectation: Dict[Arm, float]
The dictionary of arms (keys) to their expected rewards (values).
cold_arm_to_warm_arm: Dict[Arm, Arm]:
Mapping indicating what arm was used to warm-start cold arms.
trained_arms: List[Arm]
List of trained arms.
Arms for which at least one decision has been observed are deemed trained.
arm_to_status: Dict[Arm, dict]
The dictionary of arms (keys) to their status (values), where the status consists of
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
- ``trained``, which indicates whether an arm was ``fit`` or ``partial_fit``;
- ``warm``, which indicates whether an arm was warm started, and therefore has a trained model associated;
- and ``warm_started_by``, which indicates the arm that originally warm started this arm.
"""

@abc.abstractmethod
Expand All @@ -83,8 +87,25 @@ def __init__(self, rng: _BaseRNG, arms: List[Arm], n_jobs: int, backend: str = N
self.backend: str = backend

self.arm_to_expectation: Dict[Arm, float] = dict.fromkeys(self.arms, 0)
self.cold_arm_to_warm_arm: Dict[Arm, Arm] = dict()
self.trained_arms: List[Arm] = list()
self._reset_arm_to_status()

@property
def cold_arm_to_warm_arm(self) -> Dict[Arm, Arm]:
"""Mapping indicating what arm was used to warm-start cold arms."""
return {cold_arm: status[STATUS_WARM_STARTED_BY] for cold_arm, status in self.arm_to_status.items()
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
if status[STATUS_WARM]}

@property
def trained_arms(self) -> List[Arm]:
"""List of trained arms.

Arms for which at least one decision has been observed are deemed trained."""
return [arm for arm in self.arms if self.arm_to_status[arm][STATUS_TRAINED]]

@property
def cold_arms(self) -> List[Arm]:
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
return [arm for arm in self.arms if ((not self.arm_to_status[arm][STATUS_TRAINED]) and
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
(not self.arm_to_status[arm][STATUS_WARM]))]

def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
"""Introduces a new arm to the bandit.
Expand All @@ -94,12 +115,14 @@ def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
"""
self.arm_to_expectation[arm] = 0
self._uptake_new_arm(arm, binarizer)
self.arm_to_status[arm] = {STATUS_TRAINED: False, STATUS_WARM: False, STATUS_WARM_STARTED_BY: None}
bkleyn marked this conversation as resolved.
Show resolved Hide resolved

def remove_arm(self, arm: Arm) -> NoReturn:
    """Remove ``arm`` from the bandit.

    The arm's expectation entry is deleted first, then ``_drop_existing_arm``
    is invoked for the arm, and finally the arm's status record is deleted.
    A ``KeyError`` propagates if the arm has no expectation or status entry.
    """
    del self.arm_to_expectation[arm]
    self._drop_existing_arm(arm)
    del self.arm_to_status[arm]

@abc.abstractmethod
def fit(self, decisions: np.ndarray, rewards: np.ndarray,
Expand Down Expand Up @@ -139,10 +162,11 @@ def predict_expectations(self, contexts: Optional[np.ndarray] = None) -> Union[D
pass

def warm_start(self, arm_to_features: Dict[Arm, List[Num]], distance_quantile: float) -> NoReturn:
new_cold_arm_to_warm_arm = self._get_cold_arm_to_warm_arm(self.cold_arm_to_warm_arm, arm_to_features,
distance_quantile)
self._copy_arms(new_cold_arm_to_warm_arm)
self.cold_arm_to_warm_arm = {**self.cold_arm_to_warm_arm, **new_cold_arm_to_warm_arm}
cold_arm_to_warm_arm = self._get_cold_arm_to_warm_arm(arm_to_features, distance_quantile)
self._copy_arms(cold_arm_to_warm_arm)
for cold_arm, warm_arm in cold_arm_to_warm_arm.items():
self.arm_to_status[cold_arm][STATUS_WARM] = True
self.arm_to_status[cold_arm][STATUS_WARM_STARTED_BY] = warm_arm

@abc.abstractmethod
def _copy_arms(self, cold_arm_to_warm_arm: Dict[Arm, Arm]) -> NoReturn:
Expand Down Expand Up @@ -194,19 +218,6 @@ def _parallel_fit(self, decisions: np.ndarray, rewards: np.ndarray,
arm, decisions, rewards, contexts)
for arm in self.arms)

# Get list of arms in decisions
# If decision is observed for cold arm, drop arm from cold arm dictionary
arms = np.unique(decisions).tolist()
for arm in arms:
if arm in self.cold_arm_to_warm_arm:
self.cold_arm_to_warm_arm.pop(arm)

# Set/update list of arms for which at least one decision has been observed
if len(self.trained_arms) == 0:
self.trained_arms = arms
else:
self.trained_arms = np.unique(self.trained_arms + arms).tolist()

def _parallel_predict(self, contexts: np.ndarray, is_predict: bool):

# Total number of contexts to predict
Expand Down Expand Up @@ -362,20 +373,17 @@ def _get_distance_threshold(distance_from_to: Dict[Arm, Dict[Arm, Num]], quantil

return threshold

def _get_cold_arm_to_warm_arm(self, cold_arm_to_warm_arm, arm_to_features, distance_quantile):
def _get_cold_arm_to_warm_arm(self, arm_to_features, distance_quantile):

# Calculate from-to distances between all pairs of arms based on features
# and then find minimum distance (threshold) required to warm start an untrained arm
distance_from_to = self._get_pairwise_distances(arm_to_features)
distance_threshold = self._get_distance_threshold(distance_from_to, quantile=distance_quantile)

# Cold arms
cold_arms = [arm for arm in self.arms if ((arm not in self.trained_arms) and (arm not in cold_arm_to_warm_arm))]

# New cold arm to warm arm dictionary
new_cold_arm_to_warm_arm = dict()

for cold_arm in cold_arms:
for cold_arm in self.cold_arms:

# Collect distance from cold arm to warm arms
arm_to_distance = {}
Expand All @@ -394,3 +402,34 @@ def _get_cold_arm_to_warm_arm(self, cold_arm_to_warm_arm, arm_to_features, dista
new_cold_arm_to_warm_arm[cold_arm] = closest_arm

return new_cold_arm_to_warm_arm

def _reset_arm_to_status(self):
    """Reinitialize ``arm_to_status`` for every arm in ``self.arms``.

    Each arm receives its own fresh status dictionary marking it as not
    trained, not warm started, and without a warm-start source arm.
    """
    blank_status = {STATUS_TRAINED: False, STATUS_WARM: False, STATUS_WARM_STARTED_BY: None}

    # Copy the template per arm so that no two arms share one mutable status dict
    self.arm_to_status: Dict[Arm, dict] = {arm: dict(blank_status) for arm in self.arms}

def _set_arms_as_trained(self, arms: Optional[List[Arm]] = None, decisions: Optional[np.ndarray] = None,
is_partial: bool = True):
"""Sets the given arms as trained.

Uses the ``arms`` if provided, otherwise calculates the arms from the ``decisions``.
dorukkilitcioglu marked this conversation as resolved.
Show resolved Hide resolved
Either ``arms`` or ``decisions`` should be provided.
"""
# Check args
if arms is None and decisions is None:
raise ValueError("Either ``arms`` or ``decisions`` should be specified.")

# Calculate arms from decisions if ``arms`` is ``None``
if arms is None:
# Get list of arms in decisions
arms = np.unique(decisions).tolist()

for arm in self.arms:
if arm in arms:
# This arm is among the given/observed arms, so mark it as trained
self.arm_to_status[arm][STATUS_TRAINED] = True

# If fitting from scratch, arm is no longer warm started
if not is_partial:
bkleyn marked this conversation as resolved.
Show resolved Hide resolved
self.arm_to_status[arm][STATUS_WARM] = False
self.arm_to_status[arm][STATUS_WARM_STARTED_BY] = None
8 changes: 7 additions & 1 deletion mabwiser/greedy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_expectation, 0)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray = None) -> NoReturn:
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
Expand Down
8 changes: 7 additions & 1 deletion mabwiser/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,21 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
self.arm_to_model[arm].init(num_features=self.num_features)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Perform parallel fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray = None) -> NoReturn:
# Perform parallel fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: np.ndarray = None) -> Union[Arm, List[Arm]]:
# Return predict for the given context
return self._parallel_predict(contexts, is_predict=True)
Expand Down
25 changes: 25 additions & 0 deletions mabwiser/mab.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,18 @@ def neighborhood_policy(self):
else:
return None

@property
def cold_arms(self) -> List[Arm]:
if not self.neighborhood_policy:
# No neighborhood policy, cold arms are calculated at the learning policy level
return self._imp.cold_arms

else:
skadio marked this conversation as resolved.
Show resolved Hide resolved
# With neighborhood policies, we end up training and doing inference within the neighborhood.
# Each neighborhood can have a different set of trained arms, and if warm start is used,
# a different set of cold arms. Therefore, cold arms aren't defined for neighborhood policies.
return list()

def add_arm(self, arm: Arm, binarizer: Callable = None) -> NoReturn:
""" Adds an _arm_ to the list of arms.

Expand Down Expand Up @@ -1491,3 +1503,16 @@ def __convert_context(self, contexts, decisions=None) -> Union[None, np.ndarray]

else:
raise NotImplementedError("Unsupported contexts data type")

def _refresh_cold_arms(self):
if not self.neighborhood_policy:
# Cold arms are the arms that are defined in the system, but haven't been trained or warm started
trained = set(self._imp.trained_arms)
warm = set(self._imp.cold_arm_to_warm_arm)
self.cold_arms = set(self.arms) - trained - warm

else:
# With neighborhood policies, we end up training and doing inference within the neighborhood.
# Each neighborhood can have a different set of trained arms, and if warm start is used,
# a different set of cold arms. Therefore, cold arms aren't defined for neighborhood policies.
self.cold_arms = set()
8 changes: 7 additions & 1 deletion mabwiser/softmax.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,25 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_mean, 0)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Calculate fit
self._parallel_fit(decisions, rewards)
self._expectation_operation()

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
contexts: Optional[np.ndarray] = None) -> NoReturn:

# Calculate fit
self._parallel_fit(decisions, rewards)
self._expectation_operation()

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
Expand Down
8 changes: 7 additions & 1 deletion mabwiser/thompson.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_fail_count, 1)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

# Leave the calculation of expectations to predict methods

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
Expand All @@ -48,6 +51,9 @@ def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
Expand Down
8 changes: 7 additions & 1 deletion mabwiser/treebandit.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
self.arm_to_leaf_to_rewards = {arm: defaultdict(partial(np.ndarray, 0)) for arm in self.arms}

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# If TS and a binarizer function is given, binarize the rewards
if isinstance(self.lp, _ThompsonSampling) and self.lp.binarizer:
Expand All @@ -51,6 +51,9 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
# Calculate fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray = None) -> NoReturn:

# If TS and a binarizer function is given, binarize the rewards
Expand All @@ -62,6 +65,9 @@ def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.n
# Calculate fit
self._parallel_fit(decisions, rewards, contexts)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: np.ndarray = None) -> Union[Arm, List[Arm]]:

return self._parallel_predict(contexts, is_predict=True)
Expand Down
8 changes: 7 additions & 1 deletion mabwiser/ucb.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@ def fit(self, decisions: np.ndarray, rewards: np.ndarray, contexts: np.ndarray =
reset(self.arm_to_expectation, 0)

# Reset warm started arms
self.cold_arm_to_warm_arm = dict()
self._reset_arm_to_status()

# Total number of decisions
self.total_count = len(decisions)

# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=False)

def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
contexts: Optional[np.ndarray] = None) -> NoReturn:

Expand All @@ -49,6 +52,9 @@ def partial_fit(self, decisions: np.ndarray, rewards: np.ndarray,
# Calculate fit
self._parallel_fit(decisions, rewards)

# Update trained arms
self._set_arms_as_trained(decisions=decisions, is_partial=True)

def predict(self, contexts: Optional[np.ndarray] = None) -> Union[Arm, List[Arm]]:

# Return the arm with maximum expectation
Expand Down
Loading