Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InteractionGraphNegativeJob: accountLevelShadowbanMaxTime = 1 (in days) #1882

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
val negativeEdgeReverseOrdering = negativeEdgeOrdering.reverse
implicit val pqMonoid: PriorityQueueMonoid[Edge] =
new PriorityQueueMonoid[Edge](maxDestinationIds)(negativeEdgeOrdering)

val accountLevelShadowbanMaxTime = 1 //in days

override protected def configurePipeline(
sc: ScioContext,
opts: InteractionGraphNegativeOption
Expand All @@ -49,17 +50,22 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
val endTs = opts.interval.getEndMillis

// read input datasets

// we only count blocks in the past X days to prevent permanent "shadow-banning".
val blocks: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures(
readSnapshot(FlockBlocksEdgesScalaDataset, sc),
FeatureName.NumBlocks,
endTs)
.filter(_.age < accountLevelShadowbanMaxTime)

// we only count mutes in the past X days to prevent permanent "shadow-banning".
val mutes: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures(
readSnapshot(FlockMutesEdgesScalaDataset, sc),
FeatureName.NumMutes,
endTs)
.filter(_.age < accountLevelShadowbanMaxTime)

val abuseReports: SCollection[InteractionGraphRawInput] =
GraphUtil.getFlockFeatures(
Expand All @@ -72,8 +78,9 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
readSnapshot(FlockReportAsSpamEdgesScalaDataset, sc),
FeatureName.NumReportAsSpams,
endTs)
.filter(_.age < accountLevelShadowbanMaxTime)

// we only keep unfollows in the past 90 days due to the huge size of this dataset,
// we only keep unfollows in the past X days due to the huge size of this dataset,
// and to prevent permanent "shadow-banning" in the event of accidental unfollows.
// we treat unfollows as less critical than above 4 negative signals, since it deals more with
// interest than health typically, which might change over time.
Expand All @@ -83,7 +90,7 @@ object InteractionGraphNegativeJob extends ScioBeamJob[InteractionGraphNegativeO
readSnapshot(SocialgraphUnfollowsScalaDataset, sc),
FeatureName.NumUnfollows,
endTs)
.filter(_.age < 90)
.filter(_.age < accountLevelShadowbanMaxTime)

// group all features by (src, dest)
val allEdgeFeatures: SCollection[Edge] =
Expand Down