diff --git a/src/poprox_recommender/components/diversifiers/__init__.py b/src/poprox_recommender/components/diversifiers/__init__.py index c297858b..23be651c 100644 --- a/src/poprox_recommender/components/diversifiers/__init__.py +++ b/src/poprox_recommender/components/diversifiers/__init__.py @@ -1,5 +1,7 @@ +from poprox_recommender.components.diversifiers.calibration import Calibrator +from poprox_recommender.components.diversifiers.locality_calibration import LocalityCalibrator from poprox_recommender.components.diversifiers.mmr import MMRDiversifier from poprox_recommender.components.diversifiers.pfar import PFARDiversifier from poprox_recommender.components.diversifiers.topic_calibration import TopicCalibrator -__all__ = ["MMRDiversifier", "PFARDiversifier", "TopicCalibrator"] +__all__ = ["MMRDiversifier", "PFARDiversifier", "Calibrator", "TopicCalibrator", "LocalityCalibrator"] diff --git a/src/poprox_recommender/components/diversifiers/calibration.py b/src/poprox_recommender/components/diversifiers/calibration.py new file mode 100644 index 00000000..3a7f09f2 --- /dev/null +++ b/src/poprox_recommender/components/diversifiers/calibration.py @@ -0,0 +1,77 @@ +from collections import defaultdict + +import numpy as np + +from poprox_concepts import Article +from poprox_recommender.lkpipeline import Component +from poprox_recommender.topics import normalized_category_count + + +# General calibration uses MMR +# to rerank recommendations according to +# certain calibration context (e.g. news topic, locality) +class Calibrator(Component): + def __init__(self, theta: float = 0.1, num_slots=10): + # Theta term controls the score and calibration tradeoff, the higher + # the theta the higher the resulting recommendation will be calibrated. + self.theta = theta + self.num_slots = num_slots + + def __call__(): + pass + + def add_article_to_categories(self, rec_categories_with_candidate, article): + pass + + def normalized_categories_with_candidate(self, rec_categories, article): + rec_categories_with_candidate = rec_categories.copy() + self.add_article_to_categories(rec_categories_with_candidate, article) + return normalized_category_count(rec_categories_with_candidate) + + def calibration(self, relevance_scores, articles, preferences, theta, topk) -> list[Article]: + # MR_i = \theta * reward_i - (1 - \theta)*C(S + i) # C is calibration + # R is all candidates (not selected yet) + + recommendations = [] # final recommendation (topk index) + rec_categories = defaultdict(int) # frequency distribution of categories of S + + for _ in range(topk): + candidate = None # next item + best_candidate_score = float("-inf") + + for article_idx, article_score in enumerate(relevance_scores): # iterate R for next item + if article_idx in recommendations: + continue + + normalized_candidate_topics = self.normalized_categories_with_candidate( + rec_categories, articles[article_idx] + ) + calibration = compute_kl_divergence(preferences, normalized_candidate_topics) + + adjusted_candidate_score = (1 - theta) * article_score - (theta * calibration) + if adjusted_candidate_score > best_candidate_score: + best_candidate_score = adjusted_candidate_score + candidate = article_idx + + if candidate is not None: + recommendations.append(candidate) + self.add_article_to_categories(rec_categories, articles[candidate]) + + return recommendations + + +# from https://github.com/CCRI-POPROX/poprox-recommender/blob/feature/experiment0/tests/test_calibration.ipynb +def compute_kl_divergence(interacted_distr, reco_distr, kl_div=0.0, alpha=0.01): + """ + KL (p || q), the lower the better. + + alpha is not really a tuning parameter, it's just there to make the + computation more numerically stable. + """ + for category, score in interacted_distr.items(): + reco_score = reco_distr.get(category, 0.0) + reco_score = (1 - alpha) * reco_score + alpha * score + if reco_score != 0.0: + kl_div += score * np.log2(score / reco_score) + + return kl_div diff --git a/src/poprox_recommender/components/diversifiers/locality_calibration.py b/src/poprox_recommender/components/diversifiers/locality_calibration.py new file mode 100644 index 00000000..fa7a7299 --- /dev/null +++ b/src/poprox_recommender/components/diversifiers/locality_calibration.py @@ -0,0 +1,37 @@ +import torch as th + +from poprox_concepts import ArticleSet, InterestProfile +from poprox_recommender.components.diversifiers.calibration import Calibrator +from poprox_recommender.topics import extract_locality, normalized_category_count + + +# Locality Calibration uses MMR +# to rerank recommendations according to +# locality calibration +class LocalityCalibrator(Calibrator): + def __init__(self, theta: float = 0.1, num_slots=10): + super().__init__(theta, num_slots) + + def __call__(self, candidate_articles: ArticleSet, interest_profile: InterestProfile) -> ArticleSet: + normalized_locality_prefs = normalized_category_count(interest_profile.click_locality_counts) + + if candidate_articles.scores is not None: + article_scores = th.sigmoid(th.tensor(candidate_articles.scores)) + else: + article_scores = th.zeros(len(candidate_articles.articles)) + + article_scores = article_scores.cpu().detach().numpy() + + article_indices = self.calibration( + article_scores, + candidate_articles.articles, + normalized_locality_prefs, + self.theta, + topk=self.num_slots, + ) + return ArticleSet(articles=[candidate_articles.articles[int(idx)] for idx in article_indices]) + + def add_article_to_categories(self, rec_categories, article): + locality_list = extract_locality(article) + for locality in locality_list: + rec_categories[locality] = rec_categories.get(locality, 0) + 1 diff --git a/src/poprox_recommender/components/diversifiers/pfar.py b/src/poprox_recommender/components/diversifiers/pfar.py index 11443f8f..79ee9929 100644 --- a/src/poprox_recommender/components/diversifiers/pfar.py +++ b/src/poprox_recommender/components/diversifiers/pfar.py @@ -5,7 +5,7 @@ from poprox_concepts import Article, ArticleSet, InterestProfile from poprox_recommender.lkpipeline import Component from poprox_recommender.pytorch.decorators import torch_inference -from poprox_recommender.topics import GENERAL_TOPICS, extract_general_topics, normalized_topic_count +from poprox_recommender.topics import GENERAL_TOPICS, extract_general_topics, normalized_category_count class PFARDiversifier(Component): @@ -30,7 +30,7 @@ def __call__(self, candidate_articles: ArticleSet, interest_profile: InterestPro for topic, click_count in interest_profile.click_topic_counts.items(): topic_preferences[topic] = click_count - normalized_topic_prefs = normalized_topic_count(topic_preferences) + normalized_topic_prefs = normalized_category_count(topic_preferences) article_indices = pfar_diversification( article_scores, diff --git a/src/poprox_recommender/components/diversifiers/topic_calibration.py b/src/poprox_recommender/components/diversifiers/topic_calibration.py index 23031684..c2412ddd 100644 --- a/src/poprox_recommender/components/diversifiers/topic_calibration.py +++ b/src/poprox_recommender/components/diversifiers/topic_calibration.py @@ -1,23 +1,16 @@ from collections import defaultdict -import numpy as np import torch as th -from poprox_concepts import Article, ArticleSet, InterestProfile -from poprox_recommender.lkpipeline import Component -from poprox_recommender.topics import extract_general_topics, normalized_topic_count +from poprox_concepts import ArticleSet, InterestProfile +from poprox_recommender.components.diversifiers.calibration import Calibrator +from poprox_recommender.topics import extract_general_topics, normalized_category_count # Topic Calibration uses MMR # to rerank recommendations according to # topic calibration -class TopicCalibrator(Component): - def __init__(self, theta: float = 0.1, num_slots=10): - # Theta term controls the score and calibration tradeoff, the higher - # the theta the higher the resulting recommendation will be calibrated. - self.theta = theta - self.num_slots = num_slots - +class TopicCalibrator(Calibrator): def __call__(self, candidate_articles: ArticleSet, interest_profile: InterestProfile) -> ArticleSet: normalized_topic_prefs = self.compute_topic_dist(interest_profile) @@ -28,7 +21,7 @@ def __call__(self, candidate_articles: ArticleSet, interest_profile: InterestPro article_scores = article_scores.cpu().detach().numpy() - article_indices = topic_calibration( + article_indices = self.calibration( article_scores, candidate_articles.articles, normalized_topic_prefs, @@ -48,66 +41,10 @@ def compute_topic_dist(self, interest_profile): for topic, click_count in interest_profile.click_topic_counts.items(): topic_preferences[topic] += click_count - normalized_topic_prefs = normalized_topic_count(topic_preferences) + normalized_topic_prefs = normalized_category_count(topic_preferences) return normalized_topic_prefs - -def topic_calibration(relevance_scores, articles, topic_preferences, theta, topk) -> list[Article]: - # MR_i = \theta * reward_i - (1 - \theta)*C(S + i) # C is calibration - # R is all candidates (not selected yet) - - recommendations = [] # final recommendation (topk index) - rec_topics = defaultdict(int) # frequency distribution of topics of S - - for k in range(topk): - candidate = None # next item - best_candidate_score = float("-inf") - - for article_idx, article_score in enumerate(relevance_scores): # iterate R for next item - if article_idx in recommendations: - continue - - normalized_candidate_topics = normalized_topics_with_candidate(rec_topics, articles[article_idx]) - calibration = compute_kl_divergence(topic_preferences, normalized_candidate_topics) - - adjusted_candidate_score = (1 - theta) * article_score - (theta * calibration) - if adjusted_candidate_score > best_candidate_score: - best_candidate_score = adjusted_candidate_score - candidate = article_idx - - if candidate is not None: - recommendations.append(candidate) - add_article_to_topics(rec_topics, articles[candidate]) - - return recommendations - - -def add_article_to_topics(rec_topics, article): - topics = extract_general_topics(article) - for topic in topics: - rec_topics[topic] = rec_topics.get(topic, 0) + 1 - - -def normalized_topics_with_candidate(rec_topics, article): - rec_topics_with_candidate = rec_topics.copy() - add_article_to_topics(rec_topics_with_candidate, article) - return normalized_topic_count(rec_topics_with_candidate) - - -# from https://github.com/CCRI-POPROX/poprox-recommender/blob/feature/experiment0/tests/test_calibration.ipynb -def compute_kl_divergence(interacted_distr, reco_distr): - """ - KL (p || q), the lower the better. - - alpha is not really a tuning parameter, it's just there to make the - computation more numerically stable. - """ - kl_div = 0.0 - alpha = 0.01 - for genre, score in interacted_distr.items(): - reco_score = reco_distr.get(genre, 0.0) - reco_score = (1 - alpha) * reco_score + alpha * score - if reco_score != 0.0: - kl_div += score * np.log2(score / reco_score) - - return kl_div + def add_article_to_categories(self, rec_topics, article): + topics = extract_general_topics(article) + for topic in topics: + rec_topics[topic] = rec_topics.get(topic, 0) + 1 diff --git a/src/poprox_recommender/handler.py b/src/poprox_recommender/handler.py index 0450101a..f6f12ad2 100644 --- a/src/poprox_recommender/handler.py +++ b/src/poprox_recommender/handler.py @@ -4,7 +4,7 @@ from poprox_concepts import ArticleSet from poprox_concepts.api.recommendations import RecommendationRequest, RecommendationResponse from poprox_recommender.recommenders import select_articles -from poprox_recommender.topics import user_topic_preference +from poprox_recommender.topics import user_locality_preference, user_topic_preference logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -49,6 +49,7 @@ def generate_recs(event, context): clicked_articles = ArticleSet(articles=clicked_articles) profile.click_topic_counts = user_topic_preference(req.past_articles, profile.click_history) + profile.click_locality_counts = user_locality_preference(req.past_articles, profile.click_history) outputs = select_articles( candidate_articles, diff --git a/src/poprox_recommender/recommenders.py b/src/poprox_recommender/recommenders.py index 1d755850..f9103da8 100644 --- a/src/poprox_recommender/recommenders.py +++ b/src/poprox_recommender/recommenders.py @@ -3,7 +3,12 @@ from typing import Any from poprox_concepts import ArticleSet, InterestProfile -from poprox_recommender.components.diversifiers import MMRDiversifier, PFARDiversifier, TopicCalibrator +from poprox_recommender.components.diversifiers import ( + LocalityCalibrator, + MMRDiversifier, + PFARDiversifier, + TopicCalibrator, +) from poprox_recommender.components.embedders import NRMSArticleEmbedder, NRMSUserEmbedder from poprox_recommender.components.filters import TopicFilter from poprox_recommender.components.joiners import Fill @@ -85,7 +90,8 @@ def build_pipelines(num_slots: int, device: str) -> dict[str, Pipeline]: topk_ranker = TopkRanker(num_slots=num_slots) mmr = MMRDiversifier(num_slots=num_slots) pfar = PFARDiversifier(num_slots=num_slots) - calibrator = TopicCalibrator(num_slots=num_slots) + locality_calibrator = LocalityCalibrator(num_slots=num_slots) + topic_calibrator = TopicCalibrator(num_slots=num_slots) sampler = SoftmaxSampler(num_slots=num_slots, temperature=30.0) nrms_pipe = build_pipeline( @@ -112,11 +118,19 @@ def build_pipelines(num_slots: int, device: str) -> dict[str, Pipeline]: num_slots=num_slots, ) - cali_pipe = build_pipeline( - "NRMS+Calibration", + topic_cali_pipe = build_pipeline( + "NRMS+Topic+Calibration", article_embedder=article_embedder, user_embedder=user_embedder, - ranker=calibrator, + ranker=topic_calibrator, + num_slots=num_slots, + ) + + locality_cali_pipe = build_pipeline( + "NRMS+Locality+Calibration", + article_embedder=article_embedder, + user_embedder=user_embedder, + ranker=locality_calibrator, num_slots=num_slots, ) @@ -132,7 +146,8 @@ def build_pipelines(num_slots: int, device: str) -> dict[str, Pipeline]: "nrms": nrms_pipe, "mmr": mmr_pipe, "pfar": pfar_pipe, - "topic-cali": cali_pipe, + "topic-cali": topic_cali_pipe, + "locality-cali": locality_cali_pipe, "softmax": softmax_pipe, } diff --git a/src/poprox_recommender/topics.py b/src/poprox_recommender/topics.py index 9694ce27..b8c2be9e 100644 --- a/src/poprox_recommender/topics.py +++ b/src/poprox_recommender/topics.py @@ -16,6 +16,38 @@ def extract_general_topics(article: Article) -> set[str]: return article_topics.intersection(GENERAL_TOPICS) +def extract_locality_topics(article: Article) -> set[str]: + article_topics = set([mention.entity.name for mention in article.mentions]) + locality_topics = ["U.S. news", "World news", "Washington news"] + return article_topics.intersection(locality_topics) + + +def extract_locality_codes(article: Article) -> set[str]: + if "raw_data" in article and "subject" in article.raw_data: + article_codes = set([sub.code for sub in article.raw_data.subject if sub.code and len(sub.code) == 1]) + locality_codes = ["a", "i", "w"] + return article_codes.intersection(locality_codes) + return [] + + +def extract_locality(article: Article) -> list[str]: + topics = extract_general_topics(article) + codes = extract_locality_codes(article) + + us_criteria = ("U.S. news" in topics) or ("a" in codes) + world_criteria = ("World news" in topics) or ("i" in codes) + washington_criteria = ("Washington news" in topics) or ("w" in codes) + + if (us_criteria or washington_criteria) and world_criteria: + return ["US", "World"] + elif us_criteria or washington_criteria: + return ["US"] + elif world_criteria: + return ["World"] + else: + return ["Neither"] + + def find_topic(past_articles: list[Article], article_id: UUID): # each article might correspond to multiple topic for article in past_articles: @@ -23,9 +55,19 @@ def find_topic(past_articles: list[Article], article_id: UUID): return extract_general_topics(article) -def normalized_topic_count(topic_counts: dict[str, int]): - total_count = sum(topic_counts.values()) - normalized_counts = {key: value / total_count for key, value in topic_counts.items()} +def find_locality(past_articles: list[Article], article_id: UUID): + # each article might correspond to multiple locality: U.S., World, or neither + for article in past_articles: + if article.article_id == article_id: + return extract_locality(article) + + +def normalized_category_count(counts: dict[str, int]): + try: + total_count = sum(counts.values()) + normalized_counts = {key: value / total_count for key, value in counts.items()} + except Exception as _: + normalized_counts = {} return normalized_counts @@ -43,6 +85,19 @@ def user_topic_preference(past_articles: list[Article], click_history: list[Clic return topic_count_dict +def user_locality_preference(past_articles: list[Article], click_history: list[Click]) -> dict[str, int]: + clicked_articles = [c.article_id for c in click_history] # List[UUID] + + locality_count_dict = defaultdict(int) + + for article_id in clicked_articles: + clicked_locality = find_locality(past_articles, article_id) or set() + for locality in clicked_locality: + locality_count_dict[locality] += 1 + + return locality_count_dict + + def classify_news_topic(model, tokenizer, general_topics, topic): inputs = tokenizer.batch_encode_plus([topic] + general_topics, return_tensors="pt", pad_to_max_length=True) input_ids = inputs["input_ids"] diff --git a/tests/components/test_topic_calibration.py b/tests/components/test_calibration.py similarity index 51% rename from tests/components/test_topic_calibration.py rename to tests/components/test_calibration.py index 4e3d6a3a..670e0c7e 100644 --- a/tests/components/test_topic_calibration.py +++ b/tests/components/test_calibration.py @@ -11,6 +11,7 @@ from poprox_recommender.config import allow_data_test_failures from poprox_recommender.paths import project_root from poprox_recommender.recommenders import PipelineLoadError, select_articles +from poprox_recommender.topics import user_locality_preference, user_topic_preference logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -23,6 +24,9 @@ def test_request_with_topic_calibrator(): skip("request file does not exist") req = RecommendationRequest.model_validate_json(req_f.read_text()) + req.interest_profile.click_topic_counts = user_topic_preference( + req.past_articles, req.interest_profile.click_history + ) try: base_outputs = select_articles( @@ -53,3 +57,44 @@ def test_request_with_topic_calibrator(): # are the recommendation lists different? assert base_article_ids != calibrated_article_ids + + +def test_request_with_locality_calibrator(): + test_dir = project_root() / "tests" + req_f = test_dir / "request_data" / "request_body.json" + if allow_data_test_failures() and not req_f.exists(): + skip("request file does not exist") + + req = RecommendationRequest.model_validate_json(req_f.read_text()) + req.interest_profile.click_locality_counts = user_locality_preference( + req.past_articles, req.interest_profile.click_history + ) + try: + base_outputs = select_articles( + ArticleSet(articles=req.todays_articles), + ArticleSet(articles=req.past_articles), + req.interest_profile, + ) + locality_calibrated_outputs = select_articles( + ArticleSet(articles=req.todays_articles), + ArticleSet(articles=req.past_articles), + req.interest_profile, + pipeline_params={"pipeline": "locality-cali"}, + ) + except PipelineLoadError as e: + if allow_data_test_failures(): + xfail("data not pulled") + else: + raise e + + # do we get recommendations? + tco_recs = locality_calibrated_outputs.default.articles + bo_recs = base_outputs.default.articles + assert len(tco_recs) > 0 + assert len(bo_recs) == len(tco_recs) + + base_article_ids = [article.article_id for article in bo_recs] + calibrated_article_ids = [article.article_id for article in tco_recs] + + # are the recommendation lists different? + assert base_article_ids != calibrated_article_ids diff --git a/tests/test_topic_classification.py b/tests/test_topic_classification.py index 0f1a9557..4cd8285b 100644 --- a/tests/test_topic_classification.py +++ b/tests/test_topic_classification.py @@ -2,6 +2,7 @@ import logging import random +import pytest from pytest import skip from poprox_concepts import Article, ArticleSet, Click @@ -29,6 +30,7 @@ def load_test_articles(): return candidate, past, click_history, num_recs +@pytest.mark.skip(reason="too time intensive and not used by current prod logic") def test_topic_classification(): candidate, _, _, _ = load_test_articles() topic_matched_dict, todays_article_matched_topics = match_news_topics_to_general(candidate.articles) @@ -39,6 +41,7 @@ def test_topic_classification(): assert len(topic_matched_dict[article_topic]) > 0 +@pytest.mark.skip(reason="too time intensive and not used by current prod logic") def test_extract_generalized_topic(): candidate, _, _, _ = load_test_articles() for article in candidate.articles: