diff --git a/outputs/.gitignore b/outputs/.gitignore
index 2fcb03f7..f38f099f 100644
--- a/outputs/.gitignore
+++ b/outputs/.gitignore
@@ -6,3 +6,5 @@
 /mind-small-recommendations.parquet
 /mind-small-user-metrics.csv.gz
 /poprox.parquet
+/recommendations/*
+/embeddings.parquet
diff --git a/src/poprox_recommender/components/generators/context.py b/src/poprox_recommender/components/generators/context.py
index 52a17f61..7797343c 100644
--- a/src/poprox_recommender/components/generators/context.py
+++ b/src/poprox_recommender/components/generators/context.py
@@ -11,9 +11,12 @@
 
 model = SentenceTransformer("all-MiniLM-L6-v2")
 
-client = OpenAI(
-    api_key="Put your key here",
-)
+dev_mode = True
+
+if not dev_mode:
+    client = OpenAI(
+        api_key="Put your key here",
+    )
 
 
 class ContextGenerator(Component):
@@ -24,11 +27,12 @@ def __init__(self, text_generation=False, time_decay=True, topk_similar=5, other
         self.other_filter = other_filter
 
     def __call__(self, clicked: ArticleSet, recommended: ArticleSet) -> ArticleSet:
-        for article in recommended.articles:
-            generated_subhead = generated_context(
-                article, clicked, self.time_decay, self.topk_similar, self.other_filter
-            )
-            article.subhead = generated_subhead
+        if not dev_mode:
+            for article in recommended.articles:
+                generated_subhead = generated_context(
+                    article, clicked, self.time_decay, self.topk_similar, self.other_filter
+                )
+                article.subhead = generated_subhead
 
         return recommended
 
diff --git a/src/poprox_recommender/data/poprox.py b/src/poprox_recommender/data/poprox.py
index 366a9864..309c3c98 100644
--- a/src/poprox_recommender/data/poprox.py
+++ b/src/poprox_recommender/data/poprox.py
@@ -5,6 +5,7 @@
 # pyright: basic
 from __future__ import annotations
 
+from datetime import datetime
 import logging
 from typing import Generator
 from uuid import UUID
@@ -21,7 +22,12 @@
 
 
 class PoproxData(EvalData):
-    def __init__(self, archive: str = "POPROX"):
+    def __init__(
+        self,
+        archive: str = "POPROX",
+        start_date: datetime | None = None,
+        end_date: datetime | None = None,
+    ):
         (
             articles_df,
             mentions_df,
@@ -30,7 +36,7 @@
             clicked_articles_df,
             clicked_mentions_df,
             interests_df,
-        ) = load_poprox_frames(archive)
+        ) = load_poprox_frames(archive, start_date, end_date)
 
         self.newsletters_df = newsletters_df
 
@@ -159,11 +165,19 @@ def convert_row_to_article(self, article_row, mention_rows):
         )
 
 
-def load_poprox_frames(archive: str = "POPROX"):
+def load_poprox_frames(archive: str = "POPROX", start_date: datetime | None = None, end_date: datetime | None = None):
     data = project_root() / "data"
     logger.info("loading POPROX data from %s", archive)
 
     newsletters_df = pd.read_parquet(data / "POPROX" / "newsletters.parquet")
+    newsletters_df["created_at_date"] = pd.to_datetime(newsletters_df["created_at"])
+
+    if start_date:
+        logger.info("loading newsletters on or after %s", start_date)
+        newsletters_df = newsletters_df[newsletters_df["created_at_date"] >= start_date]
+    if end_date:
+        logger.info("loading newsletters before %s", end_date)
+        newsletters_df = newsletters_df[newsletters_df["created_at_date"] < end_date]
 
     articles_df = pd.read_parquet(data / "POPROX" / "articles.parquet")
     mentions_df = pd.read_parquet(data / "POPROX" / "mentions.parquet")
diff --git a/src/poprox_recommender/evaluation/generate.py b/src/poprox_recommender/evaluation/generate.py
index 055ce9fb..5a41831c 100644
--- a/src/poprox_recommender/evaluation/generate.py
+++ b/src/poprox_recommender/evaluation/generate.py
@@ -13,8 +13,8 @@ write output to FILE
         [default: outputs/recommendations.parquet]
     -M DATA, --mind-data=DATA
         read MIND test data DATA
-    --data_path=
-        path to PopRox data
+    -P DATA, --poprox-data=DATA
+        read POPROX test data DATA
     --subset=N
         test only on the first N test users
     --pipelines=...
@@ -174,12 +174,12 @@ def generate_user_recs(data: EvalData, pipe_names: list[str] | None = None, n_us
     pipelines = options["--pipelines"]
     print("Pipelines:", pipelines)
 
-    mind_data = options["--mind-data"]
-    data_path = options["--data_path"]
-    if mind_data is not None:
-        user_recs = generate_user_recs(MindData(mind_data), pipelines, n_users)
-    elif data_path is not None:
-        user_recs = generate_user_recs(PoproxData(data_path), pipelines, n_users)
+    if options["--poprox-data"]:
+        eval_data = PoproxData(options["--poprox-data"])
+    else:
+        eval_data = MindData(options["--mind-data"])
+
+    user_recs = generate_user_recs(eval_data, pipelines, n_users)
 
     all_recs = pd.concat(user_recs, ignore_index=True)
     out_fn = options["--output"]
diff --git a/src/poprox_recommender/evaluation/generate/__main__.py b/src/poprox_recommender/evaluation/generate/__main__.py
index 9d4c0a39..ccbeb400 100644
--- a/src/poprox_recommender/evaluation/generate/__main__.py
+++ b/src/poprox_recommender/evaluation/generate/__main__.py
@@ -19,10 +19,19 @@
         use N parallel jobs
     --subset=N
         test only on the first N test profiles
+    --start_date=START_DATE
+        regenerate newsletters on and after START_DATE in the form mm/dd/yyyy
+    --end_date=END_DATE
+        regenerate newsletters before END_DATE in the form mm/dd/yyyy
+    --click_threshold=N
+        test only profiles with N clicks from start_date to end_date
+
+    --pipelines=...
+        list of pipeline names (separated by spaces)
 """
 import logging
 import shutil
+from datetime import datetime
 from pathlib import Path
 
 import pandas as pd
@@ -62,12 +71,28 @@
     else:
         n_jobs = available_cpu_parallelism(4)
 
+    # parse start and end dates
+    start_date = None
+    end_date = None
+    if options["--start_date"]:
+        start_date = datetime.strptime(options["--start_date"], "%m/%d/%Y")
+    if options["--end_date"]:
+        end_date = datetime.strptime(options["--end_date"], "%m/%d/%Y")
+
     if options["--poprox-data"]:
-        dataset = PoproxData(options["--poprox-data"])
+        dataset = PoproxData(options["--poprox-data"], start_date, end_date)
     elif options["--mind-data"]:
         dataset = MindData(options["--mind-data"])
 
-    worker_usage = generate_profile_recs(dataset, outputs, n_profiles, n_jobs)
+    # subset pipelines, if requested
+    pipelines = None
+    if options["--pipelines"]:
+        pipelines = options["--pipelines"]
+        if isinstance(pipelines, str):
+            pipelines = [pipelines]
+        logger.info("generating pipelines: %s", pipelines)
+
+    worker_usage = generate_profile_recs(dataset, outputs, pipelines, n_profiles, n_jobs)
 
     logger.info("de-duplicating embeddings")
     emb_df = pd.read_parquet(outputs.emb_temp_dir)
diff --git a/src/poprox_recommender/evaluation/generate/worker.py b/src/poprox_recommender/evaluation/generate/worker.py
index 206bf79e..2dc197fa 100644
--- a/src/poprox_recommender/evaluation/generate/worker.py
+++ b/src/poprox_recommender/evaluation/generate/worker.py
@@ -29,7 +29,7 @@
 _emb_seen: set[UUID]
 
 
-def _init_worker(outs: RecOutputs):
+def _init_worker(outs: RecOutputs, pipelines: list[str] | None):
     global _worker_out, _emb_seen, _pipelines
     proc = mp.current_process()
     _worker_out = outs
@@ -38,6 +38,8 @@
     _worker_out.open(proc.pid)
 
     _pipelines = recommendation_pipelines(device=default_device())
+    if pipelines:
+        _pipelines = {name: _pipelines[name] for name in pipelines}
 
 
 def _finish_worker():
@@ -161,7 +163,9 @@ def extract_recs(
     return output_df, embeddings
 
 
-def generate_profile_recs(dataset: str, outs: RecOutputs, n_profiles: int | None = None, n_jobs: int = 1):
+def generate_profile_recs(
+    dataset: str, outs: RecOutputs, pipelines: list[str] | None = None, n_profiles: int | None = None, n_jobs: int = 1
+):
     logger.info("generating recommendations")
 
     profile_iter = dataset.iter_profiles()
@@ -179,7 +183,7 @@
         with ipp.Cluster(n=n_jobs) as client:
             dv = client.direct_view()
             logger.debug("initializing workers")
-            dv.apply_sync(_init_worker, outs)
+            dv.apply_sync(_init_worker, outs, pipelines)
 
             logger.debug("dispatching jobs")
             lbv = client.load_balanced_view()
@@ -192,7 +196,7 @@
 
     else:
         # directly call things in-process
-        _init_worker(outs)
+        _init_worker(outs, pipelines)
 
         for request in profile_iter:
             _generate_for_request(request)
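
Note on the dev_mode toggle in context.py: it exists so that offline evaluation never constructs an OpenAI client with the placeholder key. If the underlying goal is to keep real keys out of the source, an environment-based sketch is an alternative; POPROX_DEV_MODE is an assumed variable name, not part of this patch, while OpenAI() reading OPENAI_API_KEY from the environment is standard SDK behavior:

    import os

    from openai import OpenAI

    # POPROX_DEV_MODE is a hypothetical toggle, not defined by this patch
    dev_mode = os.environ.get("POPROX_DEV_MODE", "1") == "1"

    if not dev_mode:
        # with no explicit api_key, the client reads OPENAI_API_KEY from the environment
        client = OpenAI()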
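
Note on the date-window semantics in load_poprox_frames: the start bound is inclusive and the end bound exclusive, since the filters use >= and <. A self-contained sketch of the same filtering on toy data (only the column names are taken from this patch):

    from datetime import datetime

    import pandas as pd

    # toy stand-in for newsletters.parquet; column names follow the patch
    newsletters_df = pd.DataFrame(
        {"created_at": ["2024-01-31 23:59:00", "2024-02-01 00:00:00", "2024-02-15 08:00:00"]}
    )
    newsletters_df["created_at_date"] = pd.to_datetime(newsletters_df["created_at"])

    start_date = datetime(2024, 2, 1)
    end_date = datetime(2024, 2, 15)
    window = newsletters_df[
        (newsletters_df["created_at_date"] >= start_date)
        & (newsletters_df["created_at_date"] < end_date)
    ]
    # only the 2024-02-01 row survives: start is inclusive, end is exclusive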
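
For reference, a sketch of an invocation exercising the new options; the pipeline names are placeholders for whatever recommendation_pipelines() registers, and dates must parse with datetime.strptime under "%m/%d/%Y":

    python -m poprox_recommender.evaluation.generate \
        --poprox-data=POPROX \
        --start_date=02/01/2024 --end_date=02/15/2024 \
        --pipelines nrms softmax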