Neural Click Models #8

Open · wants to merge 9 commits into base: main
1,671 changes: 1,671 additions & 0 deletions notebooks/nn_response_evaluation.ipynb

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions pyproject.toml
100644 → 100755
@@ -17,11 +17,15 @@ repository = "https://github.com/sb-ai-lab/Sim4Rec"
 python = ">=3.8, <3.10"
 pyarrow = "*"
 sdv = "0.15.0"
-torch = "*"
+torch = "1.9.1"
+torchmetrics="*"
 pandas = "*"
-pyspark = ">=3.0"
+pyspark = "3.1.3"
 numpy = ">=1.20.0"
-scipy = "*"
+scipy = "1.5.4"
+lightfm = {git = "https://github.com/lyst/lightfm", rev = "0c9c31e"}
+notebook = "7.0.8"
+torchvision = "0.10.1"
 
 [tool.poetry.dev-dependencies]
 # visualization
9 changes: 7 additions & 2 deletions sim4rec/response/__init__.py
@@ -5,15 +5,20 @@
     NoiseResponse,
     CosineSimilatiry,
     BernoulliResponse,
-    ParametricResponseFunction
+    ParametricResponseFunction,
 )
 
+from .nn_response import NNResponseTransformer, NNResponseEstimator
+
 
 __all__ = [
     'ActionModelEstimator',
     'ActionModelTransformer',
     'ConstantResponse',
     'NoiseResponse',
     'CosineSimilatiry',
     'BernoulliResponse',
-    'ParametricResponseFunction'
+    'ParametricResponseFunction',
+    'NNResponseTransformer',
+    'NNResponseEstimator',
 ]
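
With these exports in place, downstream code can import the new response models directly from the package root. A minimal usage sketch (assuming the package is importable as sim4rec, per the repository layout):

from sim4rec.response import NNResponseEstimator, NNResponseTransformer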
204 changes: 204 additions & 0 deletions sim4rec/response/nn_response.py
@@ -0,0 +1,204 @@
import os
import pickle

import pyspark.sql.functions as sf
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DoubleType,
)

from .response import ActionModelEstimator, ActionModelTransformer
from .nn_utils.models import ResponseModel
from .nn_utils.embeddings import IndexEmbedding
from .nn_utils.datasets import (
    RecommendationData,
    # PandasRecommendationData,
)

# TODO: move this to the simulator core(?)
SIM_LOG_SCHEMA = StructType(
    [
        StructField("user_idx", IntegerType(), True),
        StructField("item_idx", IntegerType(), True),
        StructField("relevance", DoubleType(), True),
        StructField("response_proba", DoubleType(), True),
        StructField("response", IntegerType(), True),
        StructField("__iter", IntegerType(), True),
    ]
)
SIM_LOG_COLS = [field.name for field in SIM_LOG_SCHEMA.fields]
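
For reference, a single simulator-log row under this schema could be constructed as follows (a hypothetical example with illustrative values; spark is an active SparkSession):

example_log = spark.createDataFrame(
    # user_idx, item_idx, relevance, response_proba, response, __iter
    [(0, 42, 0.9, 0.0, 0, 0)],
    schema=SIM_LOG_SCHEMA,
)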


class NNResponseTransformer(ActionModelTransformer):
    def __init__(self, **kwargs):
        super().__init__()
        self.hist_data = None
        for param, value in kwargs.items():
            setattr(self, param, value)

    @classmethod
    def load(cls, checkpoint_dir):
        """Load a transformer from the given checkpoint directory."""
        with open(os.path.join(checkpoint_dir, "_params.pkl"), "rb") as f:
            params_dict = pickle.load(f)
        params_dict["backbone_response_model"] = ResponseModel.load(checkpoint_dir)
        with open(os.path.join(checkpoint_dir, "_item_indexer.pkl"), "rb") as f:
            params_dict["item_indexer"] = pickle.load(f)
        with open(os.path.join(checkpoint_dir, "_user_indexer.pkl"), "rb") as f:
            params_dict["user_indexer"] = pickle.load(f)
        return cls(**params_dict)

    def save(self, path):
        """Save the model at the given path (the directory must not already exist)."""
        os.makedirs(path)
        self.backbone_response_model.dump(path)
        with open(os.path.join(path, "_item_indexer.pkl"), "wb") as f:
            pickle.dump(self.item_indexer, f, pickle.HIGHEST_PROTOCOL)
        with open(os.path.join(path, "_user_indexer.pkl"), "wb") as f:
            pickle.dump(self.user_indexer, f, pickle.HIGHEST_PROTOCOL)
        with open(os.path.join(path, "_params.pkl"), "wb") as f:
            pickle.dump(
                {
                    "outputCol": self.outputCol,
                    "log_dir": self.log_dir,
                    "hist_data_dir": self.hist_data_dir,
                },
                f,
                pickle.HIGHEST_PROTOCOL,
            )
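
For illustration, a save/load round trip would look like the sketch below (the checkpoint path is hypothetical; since save() calls os.makedirs, the directory must not exist beforehand):

transformer.save("checkpoints/nn_response")  # hypothetical path, created by save()
restored = NNResponseTransformer.load("checkpoints/nn_response")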

    def _transform(self, new_recs):
        """
        Predict responses for the given dataframe with recommendations.

        :param new_recs: dataframe with new recommendations.
        """

        def predict_udf(df):
            # without this local import, something unstable happens
            # to the Method Resolution Order
            from .nn_utils.datasets import PandasRecommendationData

            dataset = PandasRecommendationData(
                log=df,
                item_indexer=self.item_indexer,
                user_indexer=self.user_indexer,
            )

            # replace the clicks in the dataset with predicted ones
            dataset = self.backbone_response_model.transform(dataset=dataset)

            return dataset._log[SIM_LOG_COLS]

        spark = new_recs.sql_ctx.sparkSession

        # read the historical data; a DataFrame is never falsy,
        # so check for emptiness explicitly
        hist_data = spark.read.schema(SIM_LOG_SCHEMA).parquet(self.hist_data_dir)
        if hist_data.rdd.isEmpty():
            print("Warning: the historical data is empty")
            hist_data = spark.createDataFrame([], schema=SIM_LOG_SCHEMA)
        # keep only the users we need
        hist_data = hist_data.join(new_recs, on="user_idx", how="inner").select(
            hist_data["*"]
        )

        # read the updated simulator log
        simlog = spark.read.schema(SIM_LOG_SCHEMA).parquet(self.log_dir)
        if simlog.rdd.isEmpty():
            print("Warning: the simulator log is empty")
            simlog = spark.createDataFrame([], schema=SIM_LOG_SCHEMA)
        # keep only the users we need
        simlog = simlog.join(new_recs, on="user_idx", how="inner").select(simlog["*"])

        # just a large number; TODO: add a correct "__iter" field to
        # sim4rec.sample_responses to avoid this constant
        NEW_ITER_NO = 9999999

        # Since all historical records are older than the simulated ones by design,
        # and the new slates are newer than the simulated ones, we can simply
        # concatenate them.
        combined_data = hist_data.unionByName(simlog).unionByName(
            new_recs.withColumn("response_proba", sf.lit(0.0))
            .withColumn("response", sf.lit(0))  # integer, to match SIM_LOG_SCHEMA
            .withColumn("__iter", sf.lit(NEW_ITER_NO))
        )

        # Not optimal: each worker handles a single user, which discards
        # batched computation. TODO: add a batch_id column and use one worker per batch?
        grouping_column = "user_idx"
        result_df = combined_data.groupby(grouping_column).applyInPandas(
            predict_udf, SIM_LOG_SCHEMA
        )
        filtered_df = result_df.filter(sf.col("__iter") == NEW_ITER_NO)
        return filtered_df.select(new_recs.columns + [self.outputCol])


class NNResponseEstimator(ActionModelEstimator):
    def __init__(
        self,
        log_dir: str,
        model_name: str,
        hist_data_dir=None,
        val_data_dir=None,
        outputCol: str = "response_proba",
        **kwargs,
    ):
        """
        :param log_dir: Directory containing the simulation logs.
        :param model_name: Backbone model name.
        :param hist_data_dir: (Optional) Directory with historical data in parquet format.
        :param val_data_dir: (Optional) Directory with validation data in parquet format.
            TODO: split automatically.
        :param outputCol: Output column for the MLlib pipeline.
        """
        self.fit_params = kwargs
        self.outputCol = outputCol

        # The simulator log is not loaded immediately, because it may not
        # exist yet when the response model is initialized.
        self.log_dir = log_dir
        self.hist_data_dir = hist_data_dir
        self.val_data_dir = val_data_dir

        # create a new model
        self.item_indexer = self.user_indexer = None
        self.model_name = model_name
        self.backbone_response_model = None

    def _fit(self, train_data):
        """
        Fit the model on the given data.

        :param DataFrame train_data: Data to train on.
        """
        train_dataset = RecommendationData(
            log=train_data,
            item_indexer=self.item_indexer,
            user_indexer=self.user_indexer,
        )
        self.item_indexer = train_dataset._item_indexer
        self.user_indexer = train_dataset._user_indexer
        val_dataset = RecommendationData(
            log=train_data.sql_ctx.sparkSession.read.parquet(self.val_data_dir),
            item_indexer=self.item_indexer,
            user_indexer=self.user_indexer,
        )
        n_items = train_dataset.n_items
        backbone_response_model = ResponseModel(
            self.model_name, IndexEmbedding(n_items)
        )
        backbone_response_model.fit(
            train_dataset, val_data=val_dataset, **self.fit_params
        )
        return NNResponseTransformer(
            backbone_response_model=backbone_response_model,
            item_indexer=self.item_indexer,
            user_indexer=self.user_indexer,
            hist_data_dir=self.hist_data_dir,
            log_dir=self.log_dir,
            outputCol=self.outputCol,
        )
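
Taken together, a typical round trip through the estimator could look like the sketch below. The paths and the backbone name are hypothetical, and the sketch assumes that ActionModelEstimator and ActionModelTransformer expose the usual fit()/transform() entry points around _fit()/_transform(), as standard PySpark ML components do:

estimator = NNResponseEstimator(
    log_dir="simlog/",            # hypothetical path to the simulator log
    model_name="nn_click_model",  # hypothetical backbone name
    hist_data_dir="hist/",        # hypothetical path to historical interactions
    val_data_dir="val/",          # hypothetical path to validation data
)
response_model = estimator.fit(train_df)           # returns an NNResponseTransformer
predicted = response_model.transform(new_recs_df)  # adds the "response_proba" column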
1 change: 1 addition & 0 deletions sim4rec/response/nn_utils/__init__.py
@@ -0,0 +1 @@
# __init__