Merge pull request #16 from zentavious/zent/generate_poprox
Adds support for date-range filtering and pipeline selection in the generate tooling
zentavious authored Dec 5, 2024
2 parents 06465da + 017f331 commit 5a9a701
Showing 6 changed files with 82 additions and 25 deletions.
2 changes: 2 additions & 0 deletions outputs/.gitignore
@@ -6,3 +6,5 @@
 /mind-small-recommendations.parquet
 /mind-small-user-metrics.csv.gz
 /poprox.parquet
+/recommendations/*
+/embeddings.parquet
20 changes: 12 additions & 8 deletions src/poprox_recommender/components/generators/context.py
@@ -11,9 +11,12 @@

 model = SentenceTransformer("all-MiniLM-L6-v2")

-client = OpenAI(
-    api_key="Put your key here",
-)
+dev_mode = True
+
+if not dev_mode:
+    client = OpenAI(
+        api_key="Put your key here",
+    )


 class ContextGenerator(Component):
@@ -24,11 +27,12 @@ def __init__(self, text_generation=False, time_decay=True, topk_similar=5, other
         self.other_filter = other_filter

     def __call__(self, clicked: ArticleSet, recommended: ArticleSet) -> ArticleSet:
-        for article in recommended.articles:
-            generated_subhead = generated_context(
-                article, clicked, self.time_decay, self.topk_similar, self.other_filter
-            )
-            article.subhead = generated_subhead
+        if not dev_mode:
+            for article in recommended.articles:
+                generated_subhead = generated_context(
+                    article, clicked, self.time_decay, self.topk_similar, self.other_filter
+                )
+                article.subhead = generated_subhead

         return recommended
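The dev_mode flag above is hard-coded, gating both client construction and subhead generation. A minimal sketch of driving the flag from the environment instead; the POPROX_DEV_MODE variable name is an assumption, not part of this commit:

    import os

    from openai import OpenAI

    # Hypothetical: dev mode is on unless POPROX_DEV_MODE is explicitly "0".
    dev_mode = os.environ.get("POPROX_DEV_MODE", "1") != "0"

    client = None
    if not dev_mode:
        # Read the key from the environment rather than hard-coding it.
        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])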
20 changes: 17 additions & 3 deletions src/poprox_recommender/data/poprox.py
@@ -5,6 +5,7 @@
 # pyright: basic
 from __future__ import annotations

+from datetime import datetime
 import logging
 from typing import Generator
 from uuid import UUID
@@ -21,7 +22,12 @@


 class PoproxData(EvalData):
-    def __init__(self, archive: str = "POPROX"):
+    def __init__(
+        self,
+        archive: str = "POPROX",
+        start_date: datetime | None = None,
+        end_date: datetime | None = None,
+    ):
         (
             articles_df,
             mentions_df,
@@ -30,7 +36,7 @@ def __init__(self, archive: str = "POPROX"):
             clicked_articles_df,
             clicked_mentions_df,
             interests_df,
-        ) = load_poprox_frames(archive)
+        ) = load_poprox_frames(archive, start_date, end_date)

         self.newsletters_df = newsletters_df

@@ -159,11 +165,19 @@ def convert_row_to_article(self, article_row, mention_rows):
     )


-def load_poprox_frames(archive: str = "POPROX"):
+def load_poprox_frames(archive: str = "POPROX", start_date: datetime | None = None, end_date: datetime | None = None):
     data = project_root() / "data"
     logger.info("loading POPROX data from %s", archive)

     newsletters_df = pd.read_parquet(data / "POPROX" / "newsletters.parquet")
+    newsletters_df["created_at_date"] = pd.to_datetime(newsletters_df["created_at"])
+
+    if start_date:
+        logger.info("loading newsletters on or after %s", start_date)
+        newsletters_df = newsletters_df[newsletters_df["created_at_date"] >= start_date]
+    if end_date:
+        logger.info("loading newsletters before %s", end_date)
+        newsletters_df = newsletters_df[newsletters_df["created_at_date"] < end_date]

     articles_df = pd.read_parquet(data / "POPROX" / "articles.parquet")
     mentions_df = pd.read_parquet(data / "POPROX" / "mentions.parquet")
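For illustration, the new filter keeps newsletters in the half-open interval [start_date, end_date), matching the >= and < comparisons above. A self-contained pandas sketch (the data values are made up):

    from datetime import datetime

    import pandas as pd

    newsletters_df = pd.DataFrame({"created_at": ["2024-11-30", "2024-12-01", "2024-12-05"]})
    newsletters_df["created_at_date"] = pd.to_datetime(newsletters_df["created_at"])

    start_date = datetime(2024, 12, 1)
    end_date = datetime(2024, 12, 5)

    # Half-open interval: 2024-12-01 is kept, 2024-12-05 is excluded.
    kept = newsletters_df[
        (newsletters_df["created_at_date"] >= start_date)
        & (newsletters_df["created_at_date"] < end_date)
    ]
    print(kept["created_at"].tolist())  # ['2024-12-01']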
23 changes: 15 additions & 8 deletions src/poprox_recommender/evaluation/generate.py
@@ -13,8 +13,8 @@
             write output to FILE [default: outputs/recommendations.parquet]
     -M DATA, --mind-data=DATA
             read MIND test data DATA
-    --data_path=<data_path>
-            path to PopRox data
+    -P DATA, --poprox-data=DATA
+            read POPROX test data DATA
     --subset=N
             test only on the first N test users
     --pipelines=<pipelines>...
@@ -174,12 +174,19 @@ def generate_user_recs(data: EvalData, pipe_names: list[str] | None = None, n_us
     pipelines = options["--pipelines"]
     print("Pipelines:", pipelines)

-    mind_data = options["--mind-data"]
-    data_path = options["--data_path"]
-    if mind_data is not None:
-        user_recs = generate_user_recs(MindData(mind_data), pipelines, n_users)
-    elif data_path is not None:
-        user_recs = generate_user_recs(PoproxData(data_path), pipelines, n_users)
+    if options["--poprox-data"]:
+        eval_data = PoproxData(options["--poprox-data"])
+    else:
+        eval_data = MindData(options["--mind-data"])
+
+    user_recs = generate_user_recs(eval_data, pipelines, n_users)
+
+    # mind_data = options["--mind-data"]
+    # data_path = options["--data_path"]
+    # if mind_data is not None:
+    #     user_recs = generate_user_recs(MindData(mind_data), pipelines, n_users)
+    # elif data_path is not None:
+    #     user_recs = generate_user_recs(PoproxData(data_path), pipelines, n_users)

     all_recs = pd.concat(user_recs, ignore_index=True)
     out_fn = options["--output"]
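The options table above suggests docopt-style parsing; a minimal sketch (the usage string and argv here are hypothetical, not the project's actual docstring) of how the renamed -P/--poprox-data flag binds to its long name:

    from docopt import docopt

    USAGE = """
    Usage:
        generate.py [options]

    Options:
        -M DATA, --mind-data=DATA    read MIND test data DATA
        -P DATA, --poprox-data=DATA  read POPROX test data DATA
    """

    # Short flags resolve to their long names in the parsed options dict.
    options = docopt(USAGE, argv=["-P", "POPROX"])
    print(options["--poprox-data"])  # 'POPROX'
    print(options["--mind-data"])    # None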
30 changes: 28 additions & 2 deletions src/poprox_recommender/evaluation/generate/__main__.py
@@ -19,10 +19,19 @@
             use N parallel jobs
     --subset=N
             test only on the first N test profiles
+    --start_date=START_DATE
+            regenerate newsletters on and after START_DATE in the form mm/dd/yyyy
+    --end_date=END_DATE
+            regenerate newsletters before END_DATE in the form mm/dd/yyyy
+    --click_threshold=N
+            test only profiles with N clicks from start_date to end_date
+    --pipelines=<pipelines>...
+            list of pipeline names (separated by spaces)
 """

 import logging
 import shutil
+from datetime import datetime
 from pathlib import Path

 import pandas as pd
@@ -62,12 +71,29 @@ def generate_main():
     else:
         n_jobs = available_cpu_parallelism(4)

+    # parse start and end dates
+    start_date = None
+    end_date = None
+    if options["--start_date"]:
+        start_date = datetime.strptime(options["--start_date"], "%m/%d/%Y")
+    if options["--end_date"]:
+        end_date = datetime.strptime(options["--end_date"], "%m/%d/%Y")
+
+    # subset pipelines
+
     if options["--poprox-data"]:
-        dataset = PoproxData(options["--poprox-data"])
+        dataset = PoproxData(options["--poprox-data"], start_date, end_date)
     elif options["--mind-data"]:
         dataset = MindData(options["--mind-data"])

-    worker_usage = generate_profile_recs(dataset, outputs, n_profiles, n_jobs)
+    pipelines = None
+    if options["--pipelines"]:
+        pipelines = options["--pipelines"]
+        if isinstance(pipelines, str):
+            pipelines = [pipelines]
+    logger.info("generating pipelines: %s", pipelines)
+
+    worker_usage = generate_profile_recs(dataset, outputs, pipelines, n_profiles, n_jobs)

     logger.info("de-duplicating embeddings")
     emb_df = pd.read_parquet(outputs.emb_temp_dir)
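Illustration of the mm/dd/yyyy parsing above; strptime raises ValueError on malformed input, so a bad --start_date fails fast rather than silently selecting nothing:

    from datetime import datetime

    # Matches the format string used in __main__.py.
    start_date = datetime.strptime("12/01/2024", "%m/%d/%Y")
    print(start_date)  # 2024-12-01 00:00:00

    # A malformed date raises immediately:
    try:
        datetime.strptime("2024-12-01", "%m/%d/%Y")
    except ValueError as e:
        print(e)  # time data '2024-12-01' does not match format '%m/%d/%Y'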
12 changes: 8 additions & 4 deletions src/poprox_recommender/evaluation/generate/worker.py
@@ -29,7 +29,7 @@
 _emb_seen: set[UUID]


-def _init_worker(outs: RecOutputs):
+def _init_worker(outs: RecOutputs, pipelines: list[str] | None):
     global _worker_out, _emb_seen, _pipelines
     proc = mp.current_process()
     _worker_out = outs
@@ -38,6 +38,8 @@ def _init_worker(outs: RecOutputs):
     _worker_out.open(proc.pid)

     _pipelines = recommendation_pipelines(device=default_device())
+    if pipelines:
+        _pipelines = {name: _pipelines[name] for name in pipelines}


 def _finish_worker():
@@ -161,7 +163,9 @@ def extract_recs(
     return output_df, embeddings


-def generate_profile_recs(dataset: str, outs: RecOutputs, n_profiles: int | None = None, n_jobs: int = 1):
+def generate_profile_recs(
+    dataset: str, outs: RecOutputs, pipelines: list[str] | None = None, n_profiles: int | None = None, n_jobs: int = 1
+):
     logger.info("generating recommendations")

     profile_iter = dataset.iter_profiles()
@@ -179,7 +183,7 @@ def generate_profile_recs(dataset: str, outs: RecOutputs, n_profiles: int | None
     with ipp.Cluster(n=n_jobs) as client:
         dv = client.direct_view()
         logger.debug("initializing workers")
-        dv.apply_sync(_init_worker, outs)
+        dv.apply_sync(_init_worker, outs, pipelines)

         logger.debug("dispatching jobs")
         lbv = client.load_balanced_view()
@@ -192,7 +196,7 @@ def generate_profile_recs(dataset: str, outs: RecOutputs, n_profiles: int | None

     else:
         # directly call things in-process
-        _init_worker(outs)
+        _init_worker(outs, pipelines)

         for request in profile_iter:
             _generate_for_request(request)
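Illustration of the pipeline subsetting that _init_worker now performs; the pipeline names below are placeholders, not the project's actual registry. An unknown name raises KeyError, so a typo in --pipelines fails loudly:

    # Placeholder registry standing in for recommendation_pipelines(...).
    _pipelines = {"pipeline-a": object(), "pipeline-b": object(), "pipeline-c": object()}

    requested = ["pipeline-a", "pipeline-c"]
    _pipelines = {name: _pipelines[name] for name in requested}
    print(sorted(_pipelines))  # ['pipeline-a', 'pipeline-c']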
