Create data loader for exported POPROX data
This loads data exported from the POPROX database to be used with the generation and evaluation scripts in place of the MIND data.
karlhigley committed Nov 12, 2024
1 parent ff5510a commit 740a1b4
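
Both the generation and evaluation scripts gain a `-P DATA, --poprox-data=DATA` option that selects this loader in place of the MIND one. A hypothetical invocation, assuming the scripts run as modules and the export sits under `data/POPROX/` (the loader's default archive name):

    python -m poprox_recommender.evaluation.generate --poprox-data=POPROX
    python -m poprox_recommender.evaluation.evaluate --poprox-data=POPROX <name>
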
Showing 6 changed files with 214 additions and 20 deletions.
21 changes: 21 additions & 0 deletions src/poprox_recommender/data/eval.py
@@ -0,0 +1,21 @@
+# pyright: basic
+from __future__ import annotations
+
+from typing import Generator
+from uuid import UUID
+
+import pandas as pd
+
+from poprox_concepts.api.recommendations import RecommendationRequest
+
+
+class EvalData:
+    def profile_truth(self, newsletter_id: UUID) -> pd.DataFrame | None: ...
+
+    def iter_profiles(self) -> Generator[RecommendationRequest]: ...
+
+    @property
+    def n_profiles(self) -> int: ...
+
+    @property
+    def n_articles(self) -> int: ...
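
The class above is a minimal interface: both data loaders subclass `EvalData`, so downstream code can depend on just these four members. A sketch of a source-agnostic consumer (the `summarize` helper is hypothetical, not part of this commit):

    from poprox_recommender.data.eval import EvalData

    def summarize(data: EvalData) -> None:
        # works identically for MindData and PoproxData
        print(f"{data.n_profiles} profiles, {data.n_articles} articles")
        for request in data.iter_profiles():
            profile = request.interest_profile
            print(profile.profile_id, len(profile.click_history), len(request.todays_articles))
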
13 changes: 7 additions & 6 deletions src/poprox_recommender/data/mind.py
@@ -16,13 +16,14 @@
 
 from poprox_concepts import Article, Click, Entity, InterestProfile, Mention
 from poprox_concepts.api.recommendations import RecommendationRequest
+from poprox_recommender.data.eval import EvalData
 from poprox_recommender.paths import project_root
 
 logger = logging.getLogger(__name__)
 TEST_REC_COUNT = 10
 
 
-class MindData:
+class MindData(EvalData):
     """
     News and behavior data loaded from MIND data.
     """
@@ -65,17 +66,17 @@ def behavior_uuid_for_id(self, id: str) -> UUID:
         return cast(UUID, self.behavior_df.loc[id, "uuid"])
 
     @property
-    def n_users(self) -> int:
+    def n_profiles(self) -> int:
         return self.behavior_df.shape[0]
 
     @property
     def n_articles(self) -> int:
         return self.news_df.shape[0]
 
-    def user_truth(self, user: UUID) -> pd.DataFrame | None:
+    def profile_truth(self, user: UUID) -> pd.DataFrame | None:
         """
-        Look up the ground-truth data for a particular user, in LensKit format,
-        with item UUIDs for item IDs.
+        Look up the ground-truth data for a particular user profile,
+        in LensKit format with item UUIDs for item IDs.
         """
         try:
             uid = self.behavior_id_for_uuid(user)
@@ -96,7 +97,7 @@ def split_records():
         truth["item"] = [self.news_uuid_for_id(aid) for aid in truth["mind_item_id"]]
         return truth.set_index("item")
 
-    def iter_users(self) -> Generator[RecommendationRequest]:
+    def iter_profiles(self) -> Generator[RecommendationRequest]:
         for row in self.behavior_df.itertuples():
            clicked_ids: list[str] = row.clicked_news.split()  # type: ignore
            cand_pairs: list[str] = row.impressions.split()  # type: ignore
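
These renames (`n_users` → `n_profiles`, `user_truth` → `profile_truth`, `iter_users` → `iter_profiles`) align `MindData` with the `EvalData` interface, so call sites change mechanically. A before/after sketch using names from this diff:

    mind_data = MindData("MINDsmall_dev")
    # before: mind_data.n_users, mind_data.user_truth(uid), mind_data.iter_users()
    n = mind_data.n_profiles
    truth = mind_data.profile_truth(uid)  # uid is a behavior UUID
    requests = mind_data.iter_profiles()
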
156 changes: 156 additions & 0 deletions src/poprox_recommender/data/poprox.py
@@ -0,0 +1,156 @@
"""
Support for loading POPROX data for evaluation.
"""

# pyright: basic
from __future__ import annotations

import logging
from typing import Generator
from uuid import UUID, uuid4

import pandas as pd

from poprox_concepts import Article, Click, Entity, InterestProfile, Mention
from poprox_concepts.api.recommendations import RecommendationRequest
from poprox_recommender.data.eval import EvalData
from poprox_recommender.paths import project_root

logger = logging.getLogger(__name__)
TEST_REC_COUNT = 10


class PoproxData(EvalData):
clicks_df: pd.DataFrame
articles_df: pd.DataFrame

def __init__(self, archive: str = "POPROX"):
articles_df, mentions_df, newsletters_df, clicks_df, clicked_articles_df, clicked_mentions_df = (
load_poprox_frames(archive)
)

self.newsletters_df = newsletters_df

# index data frames for quick lookup of users & articles
self.mentions_df = mentions_df
self.articles_df = articles_df.set_index("article_id", drop=False)
if not self.articles_df.index.unique:
logger.warning("article data has non-unique index")

self.clicks_df = clicks_df
self.clicked_mentions_df = clicked_mentions_df
self.clicked_articles_df = clicked_articles_df.set_index("article_id", drop=False)
if not self.clicked_articles_df.index.unique:
logger.warning("clicked article data has non-unique index")

@property
def n_profiles(self) -> int:
return len(self.newsletters_df["newsletter_id"].unique())

@property
def n_articles(self) -> int:
return self.articles_df.shape[0]

def profile_truth(self, newsletter_id: UUID) -> pd.DataFrame | None:
# Create one row per clicked article with this newsletter_id
# Returned dataframe must have an "item" column containing the clicked article ids
# and the "item" column must be the index of the dataframe
newsletter_clicks = self.clicks_df[self.clicks_df["newsletter_id"] == newsletter_id]
return pd.DataFrame({"item": newsletter_clicks["article_id"]}).set_index("item")

def iter_profiles(self) -> Generator[RecommendationRequest]:
newsletter_ids = self.newsletters_df["newsletter_id"].unique()

for newsletter_id in newsletter_ids:
impressions_df = self.newsletters_df.loc[self.newsletters_df["newsletter_id"] == newsletter_id]
# TODO: Change `account_id` to `profile_id` in the export
profile_id = impressions_df.iloc[0]["account_id"]
newsletter_created_at = impressions_df.iloc[0]["created_at"]

# Filter clicks to those before the newsletter
profile_clicks_df = self.clicks_df.loc[self.clicks_df["profile_id"] == profile_id]
# TODO: Change `timestamp` to `created_at` in the export
filtered_clicks_df = profile_clicks_df[profile_clicks_df["timestamp"] < newsletter_created_at]

# Create Article and Click objects from dataframe rows
clicks = []
past_articles = []
for article_row in filtered_clicks_df.itertuples():
article = self.lookup_clicked_article(article_row.article_id)
past_articles.append(article)

clicks.append(
Click(
article_id=article_row.article_id,
newsletter_id=article_row.newsletter_id,
timestamp=article_row.timestamp,
)
)

# TODO: Fill in the onboarding topics
profile = InterestProfile(profile_id=uuid4(), click_history=clicks, onboarding_topics=[])

# Filter candidate articles to those ingested on the same day as the newsletter (today's articles)
candidate_articles = []
newsletter_date = newsletter_created_at.date()

for article_row in self.articles_df[
self.articles_df["created_at"].apply(lambda c: c.date()) == newsletter_date
].itertuples():
candidate_articles.append(self.lookup_candidate_article(article_row.article_id))

yield RecommendationRequest(
todays_articles=candidate_articles,
past_articles=past_articles,
interest_profile=profile,
num_recs=TEST_REC_COUNT,
)

def lookup_candidate_article(self, article_id: UUID):
article_row = self.articles_df.loc[str(article_id)]
mention_rows = self.mentions_df[self.mentions_df["article_id"] == article_row.article_id]
return self.convert_row_to_article(article_row, mention_rows)

def lookup_clicked_article(self, article_id: UUID):
article_row = self.clicked_articles_df.loc[str(article_id)]
mention_rows = self.clicked_mentions_df[self.clicked_mentions_df["article_id"] == article_row.article_id]
return self.convert_row_to_article(article_row, mention_rows)

def convert_row_to_article(self, article_row, mention_rows):
mentions = [
Mention(
mention_id=row.mention_id,
article_id=row.article_id,
source=row.source,
relevance=row.relevance,
entity=Entity(**row.entity),
)
for row in mention_rows.itertuples()
]

return Article(
article_id=article_row.article_id,
headline=article_row.headline,
subhead=article_row.subhead,
published_at=article_row.published_at,
mentions=mentions,
source="AP",
external_id="",
raw_data=article_row.raw_data,
)


def load_poprox_frames(archive: str = "POPROX"):
data = project_root() / "data"
logger.info("loading POPROX data from %s", archive)

newsletters_df = pd.read_parquet(data / "POPROX" / "newsletters.parquet")

articles_df = pd.read_parquet(data / "POPROX" / "articles.parquet")
mentions_df = pd.read_parquet(data / "POPROX" / "mentions.parquet")

clicks_df = pd.read_parquet(data / "POPROX" / "clicks.parquet")
clicked_articles_df = pd.read_parquet(data / "POPROX" / "clicked" / "articles.parquet")
clicked_mentions_df = pd.read_parquet(data / "POPROX" / "clicked" / "mentions.parquet")

return articles_df, mentions_df, newsletters_df, clicks_df, clicked_articles_df, clicked_mentions_df
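
A hypothetical smoke test for the new loader, assuming the exported parquet files are laid out under `data/POPROX/` as `load_poprox_frames` expects:

    from poprox_recommender.data.poprox import PoproxData

    eval_data = PoproxData()
    print(eval_data.n_profiles, "newsletters;", eval_data.n_articles, "candidate articles")
    for request in eval_data.iter_profiles():
        # one request per newsletter: clicks before the send plus same-day candidates
        print(len(request.past_articles), len(request.todays_articles), request.num_recs)
        break
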
25 changes: 17 additions & 8 deletions src/poprox_recommender/evaluation/evaluate.py
@@ -12,6 +12,8 @@
     --log-file=FILE write log messages to FILE
     -M DATA, --mind-data=DATA
             read MIND test data DATA [default: MINDsmall_dev]
+    -P DATA, --poprox-data=DATA
+            read POPROX test data DATA
     <name> the name of the evaluation to measure
 """

@@ -25,47 +27,54 @@
 from docopt import docopt
 from progress_api import make_progress
 
+# from poprox_recommender.data.mind import MindData
 from poprox_recommender.config import available_cpu_parallelism
+from poprox_recommender.data.eval import EvalData
 from poprox_recommender.data.mind import MindData
+from poprox_recommender.data.poprox import PoproxData
 from poprox_recommender.evaluation.metrics import UserRecs, measure_user_recs
 from poprox_recommender.logging_config import setup_logging
 from poprox_recommender.paths import project_root
 
 logger = logging.getLogger("poprox_recommender.evaluation.evaluate")
 
 
-def rec_users(mind_data: MindData, user_recs: pd.DataFrame) -> Iterator[UserRecs]:
+def rec_users(eval_data: EvalData, user_recs: pd.DataFrame) -> Iterator[UserRecs]:
     """
     Iterate over rec users, yielding each recommendation list with its truth and
     whether the user is personalized. This supports parallel computation of the
     final metrics.
     """
     for user_id, recs in user_recs.groupby("user"):
         user_id = UUID(str(user_id))
-        truth = mind_data.user_truth(user_id)
+        truth = eval_data.profile_truth(user_id)
         assert truth is not None
         yield UserRecs(user_id, recs.copy(), truth)
 
 
-def user_eval_results(mind_data: MindData, user_recs: pd.DataFrame, n_procs: int) -> Iterator[list[dict[str, Any]]]:
+def user_eval_results(eval_data: EvalData, user_recs: pd.DataFrame, n_procs: int) -> Iterator[list[dict[str, Any]]]:
     if n_procs > 1:
         logger.info("starting parallel measurement with %d workers", n_procs)
         with ipp.Cluster(n=n_procs) as client:
             lb = client.load_balanced_view()
             yield from lb.imap(
-                measure_user_recs, rec_users(mind_data, user_recs), ordered=False, max_outstanding=n_procs * 10
+                measure_user_recs, rec_users(eval_data, user_recs), ordered=False, max_outstanding=n_procs * 10
             )
     else:
-        for user in rec_users(mind_data, user_recs):
+        for user in rec_users(eval_data, user_recs):
             yield measure_user_recs(user)
 
 
 def main():
     options = docopt(__doc__)  # type: ignore
     setup_logging(verbose=options["--verbose"], log_file=options["--log-file"])
 
-    global mind_data
-    mind_data = MindData(options["--mind-data"])
+    global eval_data
+
+    if options["--poprox-data"]:
+        eval_data = PoproxData(options["--poprox-data"])
+    else:
+        eval_data = MindData(options["--mind-data"])
 
     eval_name = options["<name>"]
     logger.info("measuring evaluation %s", eval_name)
@@ -82,7 +91,7 @@ def main():
     with (
         make_progress(logger, "evaluate", total=n_users, unit="users") as pb,
     ):
-        for user_rows in user_eval_results(mind_data, recs_df, n_procs):
+        for user_rows in user_eval_results(eval_data, recs_df, n_procs):
             records += user_rows
             pb.update()
11 changes: 10 additions & 1 deletion src/poprox_recommender/evaluation/generate/__main__.py
@@ -13,6 +13,8 @@
         write output to PATH [default: outputs/]
     -M DATA, --mind-data=DATA
         read MIND test data DATA [default: MINDsmall_dev]
+    -P DATA, --poprox-data=DATA
+        read POPROX test data DATA
     -j N, --jobs=N
         use N parallel jobs
     --subset=N
@@ -27,6 +29,8 @@
 from docopt import docopt
 
 from poprox_recommender.config import available_cpu_parallelism
+from poprox_recommender.data.mind import MindData
+from poprox_recommender.data.poprox import PoproxData
 from poprox_recommender.evaluation.generate.outputs import RecOutputs
 from poprox_recommender.evaluation.generate.worker import generate_user_recs
 from poprox_recommender.logging_config import setup_logging
@@ -58,7 +62,12 @@ def generate_main():
     else:
         n_jobs = available_cpu_parallelism(4)
 
-    worker_usage = generate_user_recs(options["--mind-data"], outputs, n_users, n_jobs)
+    if options["--poprox-data"]:
+        dataset = PoproxData(options["--poprox-data"])
+    elif options["--mind-data"]:
+        dataset = MindData(options["--mind-data"])
+
+    worker_usage = generate_user_recs(dataset, outputs, n_users, n_jobs)
 
     logger.info("de-duplicating embeddings")
     emb_df = pd.read_parquet(outputs.emb_temp_dir)
8 changes: 3 additions & 5 deletions src/poprox_recommender/evaluation/generate/worker.py
@@ -13,7 +13,7 @@
 from poprox_concepts.api.recommendations import RecommendationRequest
 from poprox_concepts.domain import ArticleSet
 from poprox_recommender.config import default_device
-from poprox_recommender.data.mind import TEST_REC_COUNT, MindData
+from poprox_recommender.data.mind import TEST_REC_COUNT
 from poprox_recommender.evaluation.generate.outputs import RecOutputs
 from poprox_recommender.lkpipeline import Pipeline
 from poprox_recommender.lkpipeline.state import PipelineState
@@ -162,13 +162,11 @@ def extract_recs(
 
 
 def generate_user_recs(dataset: str, outs: RecOutputs, n_users: int | None = None, n_jobs: int = 1):
-    mind_data = MindData(dataset)
-
     logger.info("generating recommendations")
 
-    user_iter = mind_data.iter_users()
+    user_iter = dataset.iter_profiles()
     if n_users is None:
-        n_users = mind_data.n_users
+        n_users = dataset.n_profiles
         logger.info("recommending for all %d users", n_users)
     else:
         logger.info("running on subset of %d users", n_users)
