Skip to content

Commit

Permalink
Merge pull request #220 from twitter/jbaxter/2024_04_26
Browse files: browse the repository at this point in the history
Freeze rater parameters in final scoring, turn on status locking, parquet output + more column output
  • Loading branch information
jbaxter authored Apr 26, 2024
2 parents 998fa4b + e02a7ec commit adbc126
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 24 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pandas==2.1.4
torch==2.1.2
scipy==1.11.4
scikit-learn>=1.3.0
pyarrow
8 changes: 8 additions & 0 deletions sourcecode/scoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,12 @@ def rater_factor_key(i):
expansionRatingStatusKey = "expansionRatingStatus"
expansionNoteInterceptMaxKey = "expansionNoteInterceptMax"
expansionNoteInterceptMinKey = "expansionNoteInterceptMin"
expansionInternalActiveRulesKey = "expansionActiveRules"
# ExpansionPlus Model
expansionPlusNoteInterceptKey = "expansionPlusNoteIntercept"
expansionPlusNoteFactor1Key = "expansionPlusNoteFactor1"
expansionPlusRatingStatusKey = "expansionPlusRatingStatus"
expansionPlusInternalActiveRulesKey = "expansionPlusActiveRules"
# Coverage / Helpfulness Reputation Model
coverageNoteInterceptKey = "coverageNoteIntercept"
coverageNoteFactor1Key = "coverageNoteFactor1"
Expand All @@ -153,11 +155,13 @@ def rater_factor_key(i):
groupNoteInterceptMinKey = "groupNoteInterceptMin"
groupRaterInterceptKey = "groupRaterIntercept"
groupRaterFactor1Key = "groupRaterFactor1"
groupInternalActiveRulesKey = "groupActiveRules"
# Topic Model
topicNoteInterceptKey = "topicNoteIntercept"
topicNoteFactor1Key = "topicNoteFactor1"
topicRatingStatusKey = "topicRatingStatus"
topicNoteConfidentKey = "topicNoteConfident"
topicInternalActiveRulesKey = "topicActiveRules"
# Harassment/Abuse Tag
harassmentNoteInterceptKey = "harassmentNoteIntercept"
harassmentNoteFactor1Key = "harassmentNoteFactor1"
Expand Down Expand Up @@ -558,6 +562,10 @@ def rater_factor_key(i):
(topicRatingStatusKey, str),
(noteTopicKey, str),
(topicNoteConfidentKey, str),
(expansionInternalActiveRulesKey, str),
(expansionPlusInternalActiveRulesKey, str),
(groupInternalActiveRulesKey, str),
(topicInternalActiveRulesKey, str),
]
noteModelOutputTSVColumns = [col for (col, dtype) in noteModelOutputTSVColumnsAndTypes]
noteModelOutputTSVTypeMapping = {col: dtype for (col, dtype) in noteModelOutputTSVColumnsAndTypes}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def run_mf(
globalInterceptInit: Optional[float] = None,
specificNoteId: Optional[int] = None,
validatePercent: Optional[float] = None,
freezeRaterParameters: bool = False,
):
"""Train matrix factorization model.
Expand All @@ -466,6 +467,8 @@ def run_mf(
self._create_mf_model(noteInit, userInit, globalInterceptInit)
assert self.mf_model is not None

if freezeRaterParameters:
self.mf_model._freeze_parameters(set({"user"}))
if specificNoteId is not None:
self.mf_model.freeze_rater_and_global_parameters()
self.prepare_features_and_labels(specificNoteId)
Expand Down
14 changes: 14 additions & 0 deletions sourcecode/scoring/mf_base_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,18 @@ def _prescore_notes_and_users(
if self._saveIntermediateState:
self.helpfulnessScores = helpfulnessScores

## One extra final round!
# Filter ratings based on prev helpfulness scores
finalRoundRatings = helpfulness_scores.filter_ratings_by_helpfulness_scores(
ratingsForTraining, helpfulnessScores
)
# Run MF
noteParamsUnfiltered, raterParamsUnfiltered, globalBias = self._mfRanker.run_mf(
ratings=finalRoundRatings,
noteInit=noteParamsUnfiltered,
userInit=raterParamsUnfiltered,
)

raterModelOutput = raterParamsUnfiltered.merge(
helpfulnessScores[
[
Expand Down Expand Up @@ -644,6 +656,8 @@ def _score_notes_and_users(
ratings=finalRoundRatings,
noteInit=prescoringNoteModelOutput,
userInit=prescoringRaterModelOutput,
globalInterceptInit=0.17,
freezeRaterParameters=True,
)

if self._saveIntermediateState:
Expand Down
3 changes: 2 additions & 1 deletion sourcecode/scoring/mf_expansion_plus_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def _get_note_col_mapping(self) -> Dict[str, str]:
c.internalNoteInterceptKey: c.expansionPlusNoteInterceptKey,
c.internalNoteFactor1Key: c.expansionPlusNoteFactor1Key,
c.internalRatingStatusKey: c.expansionPlusRatingStatusKey,
c.internalActiveRulesKey: c.expansionPlusInternalActiveRulesKey,
}

def get_scored_notes_cols(self) -> List[str]:
Expand All @@ -46,6 +47,7 @@ def get_scored_notes_cols(self) -> List[str]:
c.expansionPlusNoteInterceptKey,
c.expansionPlusNoteFactor1Key,
c.expansionPlusRatingStatusKey,
c.expansionPlusInternalActiveRulesKey,
]

def get_helpfulness_scores_cols(self) -> List[str]:
Expand All @@ -60,7 +62,6 @@ def _get_dropped_note_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
return super()._get_dropped_note_cols() + (
[
c.internalActiveRulesKey,
c.activeFilterTagsKey,
c.ratingWeightKey,
c.noteInterceptMinKey,
Expand Down
3 changes: 2 additions & 1 deletion sourcecode/scoring/mf_expansion_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def _get_note_col_mapping(self) -> Dict[str, str]:
c.internalRatingStatusKey: c.expansionRatingStatusKey,
c.noteInterceptMinKey: c.expansionNoteInterceptMinKey,
c.noteInterceptMaxKey: c.expansionNoteInterceptMaxKey,
c.internalActiveRulesKey: c.expansionInternalActiveRulesKey,
}

def get_scored_notes_cols(self) -> List[str]:
Expand All @@ -54,6 +55,7 @@ def get_scored_notes_cols(self) -> List[str]:
c.expansionRatingStatusKey,
c.expansionNoteInterceptMinKey,
c.expansionNoteInterceptMaxKey,
c.expansionInternalActiveRulesKey,
]

def get_helpfulness_scores_cols(self) -> List[str]:
Expand All @@ -68,7 +70,6 @@ def _get_dropped_note_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
return super()._get_dropped_note_cols() + (
[
c.internalActiveRulesKey,
c.activeFilterTagsKey,
c.ratingWeightKey,
]
Expand Down
5 changes: 4 additions & 1 deletion sourcecode/scoring/mf_group_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def coalesce_group_models(
c.groupNoteInterceptMaxKey,
c.groupNoteInterceptMinKey,
c.modelingGroupKey,
c.groupInternalActiveRulesKey,
]:
scoredNotes = coalesce_columns(scoredNotes, col)

Expand Down Expand Up @@ -135,6 +136,7 @@ def __init__(
self._groupRatingStatusKey = f"{c.groupRatingStatusKey}_{self._groupNumber}"
self._groupNoteInterceptMaxKey = f"{c.groupNoteInterceptMaxKey}_{self._groupNumber}"
self._groupNoteInterceptMinKey = f"{c.groupNoteInterceptMinKey}_{self._groupNumber}"
self._groupInternalActiveRulesKey = f"{c.groupInternalActiveRulesKey}_{self._groupNumber}"
self._groupRaterInterceptKey = f"{c.groupRaterInterceptKey}_{self._groupNumber}"
self._groupRaterFactor1Key = f"{c.groupRaterFactor1Key}_{self._groupNumber}"
self._modelingGroupKey = f"{c.modelingGroupKey}_{self._groupNumber}"
Expand All @@ -151,6 +153,7 @@ def _get_note_col_mapping(self) -> Dict[str, str]:
c.internalRatingStatusKey: self._groupRatingStatusKey,
c.noteInterceptMinKey: self._groupNoteInterceptMinKey,
c.noteInterceptMaxKey: self._groupNoteInterceptMaxKey,
c.internalActiveRulesKey: self._groupInternalActiveRulesKey,
}

def _get_user_col_mapping(self) -> Dict[str, str]:
Expand All @@ -169,6 +172,7 @@ def get_scored_notes_cols(self) -> List[str]:
self._groupRatingStatusKey,
self._groupNoteInterceptMaxKey,
self._groupNoteInterceptMinKey,
self._groupInternalActiveRulesKey,
self._modelingGroupKey,
]

Expand All @@ -189,7 +193,6 @@ def _get_dropped_note_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
return super()._get_dropped_note_cols() + (
[
c.internalActiveRulesKey,
c.activeFilterTagsKey,
c.ratingWeightKey,
]
Expand Down
5 changes: 4 additions & 1 deletion sourcecode/scoring/mf_topic_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def coalesce_topic_models(scoredNotes: pd.DataFrame) -> pd.DataFrame:
c.topicRatingStatusKey,
c.topicNoteConfidentKey,
c.noteTopicKey,
c.topicInternalActiveRulesKey,
]:
scoredNotes = coalesce_columns(scoredNotes, col)

Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(
self._topicNoteInterceptKey = f"{c.topicNoteInterceptKey}_{self._topicName}"
self._topicNoteFactor1Key = f"{c.topicNoteFactor1Key}_{self._topicName}"
self._topicRatingStatusKey = f"{c.topicRatingStatusKey}_{self._topicName}"
self._topicInternalActiveRulesKey = f"{c.topicInternalActiveRulesKey}_{self._topicName}"
self._noteTopicKey = f"{c.noteTopicKey}_{self._topicName}"
self._noteTopicConfidentKey = f"{c.topicNoteConfidentKey}_{self._topicName}"

Expand All @@ -118,6 +120,7 @@ def _get_note_col_mapping(self) -> Dict[str, str]:
c.internalNoteInterceptKey: self._topicNoteInterceptKey,
c.internalNoteFactor1Key: self._topicNoteFactor1Key,
c.internalRatingStatusKey: self._topicRatingStatusKey,
c.internalActiveRulesKey: self._topicInternalActiveRulesKey,
}

def get_scored_notes_cols(self) -> List[str]:
Expand All @@ -129,6 +132,7 @@ def get_scored_notes_cols(self) -> List[str]:
self._topicRatingStatusKey,
self._noteTopicKey,
self._noteTopicConfidentKey,
self._topicInternalActiveRulesKey,
]

def get_helpfulness_scores_cols(self) -> List[str]:
Expand All @@ -143,7 +147,6 @@ def _get_dropped_note_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
return super()._get_dropped_note_cols() + (
[
c.internalActiveRulesKey,
c.activeFilterTagsKey,
c.ratingWeightKey,
c.noteInterceptMinKey,
Expand Down
20 changes: 17 additions & 3 deletions sourcecode/scoring/process_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,29 @@ def write_tsv_local(df: pd.DataFrame, path: str) -> None:
Args:
df: pd.DataFrame to write to disk.
path: location of file on disk.
Returns:
None, because path is always None.
"""

assert path is not None
assert df.to_csv(path, index=False, header=True, sep="\t") is None


def write_parquet_local(
    df: pd.DataFrame, path: str, compression: str = "snappy", engine: str = "pyarrow"
) -> None:
    """Write a DataFrame to local disk as a parquet file.

    Args:
        df: pd.DataFrame to write to disk.
        path: location of file on disk.
        compression: compression algorithm to use. Defaults to 'snappy'.
        engine: parquet engine to use. Defaults to 'pyarrow'.

    Raises:
        AssertionError: if path is None.
    """
    # Checked explicitly rather than with an `assert` statement: asserts are
    # removed under `python -O`, and DataFrame.to_parquet(None) returns the
    # serialized bytes instead of writing a file, so a missing path would
    # otherwise silently produce no output. AssertionError is kept so the
    # exception type seen by callers is unchanged.
    if path is None:
        raise AssertionError("path must not be None")
    df.to_parquet(path, compression=compression, engine=engine)


class CommunityNotesDataLoader(ABC):
"""Base class which local and prod data loaders extend.
Expand Down
16 changes: 12 additions & 4 deletions sourcecode/scoring/run_scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ def _run_scorer_parallelizable(
scoringArgs = _load_data_from_shared_memory_parallelizable(
scoringArgsSharedMemory, scoringArgs
)
print(f"{scorer.get_name()} run_scorer_parallelizable just finished loading data from shared memory.")
print(
f"{scorer.get_name()} run_scorer_parallelizable just finished loading data from shared memory."
)
elif dataLoader is not None:
print(
f"{scorer.get_name()} run_scorer_parallelizable just started in parallel: loading data with dataLoader."
Expand Down Expand Up @@ -522,19 +524,25 @@ def meta_score(
# MFExpansionPlusScorer will have the lowest priority.
rules.append(
scoring_rules.ApplyModelResult(
RuleID.EXPANSION_PLUS_MODEL, {RuleID.META_INITIAL_NMR}, c.expansionPlusRatingStatusKey
RuleID.EXPANSION_PLUS_MODEL,
{RuleID.META_INITIAL_NMR},
c.expansionPlusRatingStatusKey,
)
)
if enabledScorers is None or Scorers.MFExpansionScorer in enabledScorers:
rules.append(
scoring_rules.ApplyModelResult(
RuleID.EXPANSION_MODEL, {RuleID.META_INITIAL_NMR}, c.expansionRatingStatusKey
RuleID.EXPANSION_MODEL,
{RuleID.META_INITIAL_NMR},
c.expansionRatingStatusKey,
)
)
if enabledScorers is None or Scorers.MFCoreScorer in enabledScorers:
rules.append(
scoring_rules.ApplyModelResult(
RuleID.CORE_MODEL, {RuleID.META_INITIAL_NMR}, c.coreRatingStatusKey
RuleID.CORE_MODEL,
{RuleID.META_INITIAL_NMR},
c.coreRatingStatusKey,
)
)
if enabledScorers is None or Scorers.MFGroupScorer in enabledScorers:
Expand Down
20 changes: 19 additions & 1 deletion sourcecode/scoring/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

from . import constants as c
from .enums import scorers_from_csv
from .process_data import LocalDataLoader, write_prescoring_output, write_tsv_local
from .process_data import (
LocalDataLoader,
write_parquet_local,
write_prescoring_output,
write_tsv_local,
)
from .run_scoring import run_scoring


Expand Down Expand Up @@ -84,6 +89,13 @@ def parse_args():
dest="prescoring_delay_hours",
help="Filter prescoring input to simulate delay in hours",
)
parser.add_argument(
"--no-parquet",
help="Disable writing parquet files.",
default=False,
action="store_true",
dest="no_parquet",
)

return parser.parse_args()

Expand Down Expand Up @@ -138,6 +150,12 @@ def prescoring_write_fn(notePath, raterPath):
write_tsv_local(newStatus, os.path.join(args.outdir, "note_status_history.tsv"))
write_tsv_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.tsv"))

if not args.no_parquet:
write_parquet_local(scoredNotes, os.path.join(args.outdir, "scored_notes.parquet"))
write_parquet_local(helpfulnessScores, os.path.join(args.outdir, "helpfulness_scores.parquet"))
write_parquet_local(newStatus, os.path.join(args.outdir, "note_status_history.parquet"))
write_parquet_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.parquet"))


if __name__ == "__main__":
main()
Loading

0 comments on commit adbc126

Please sign in to comment.