Add Antifraud Detection to Impressions (#2290)
* Change variable names for accuracy

* Generalize spike labeling to label spikes of either clicks or impressions

+ I set the default threshold for impressions 500x higher than that of clicks because we get about 500x as many impressions as clicks, but that decision needs review

* Parametrize the query param addition in spike labeling to facilitate different key-value pairs for different types of event

* We had ParseReportingURL and ParsedReportingURL, two names similar enough that it was unclear why we needed both and easy to confuse them. This commit renames ParsedReportingUrl to BuildReportingUrl for better disambiguation.

* Aggregate presumed genuine and suspected fraudulent impressions separately

* I had the conditions for genuine and fraudulent filtering on their opposite cases. This fixes that.

* Ran ./bin/mvn spotless:apply to fix maven's formatting complaints, which were more or less all indentation-related

* Acquiesce to Maven's formatting demands

* Individually import all dependent objects in contextualservices rather than asterisk

* Bubble up event type exception through tests

* Move ghost counter metric initialization into the constructor to switch based on eventType

* More formatting

* Adjust Import Order

* Import order...again

* Add run script for contextual services job in staging

+ To run in prod, change the PROJECT param (but you probably won't have access). Jobs are managed in prod by Terraform.

* Update run options to the appropriate values -- see comments for details

* Rename variables to clarify where we have generalized click counting to count either clicks OR impressions

* We were explicitly filtering by impression spike status to form two separate collections, but we don't think we need to do that: #2290 (comment)

+ So this change set re-combines those.

* Address style checker concerns

* Run ./bin/mvn spotless:apply

* IDE keeps automatically inserting a star import, which our style checker hates. Undoing that here.

* Change job names so they will be easier to disambiguate in debugging

* Remove superfluous filter step

* I swear to you, the list of things I will do before I will make a debugging handle less specific and useful in order to pass a line length checkstyle error is both long and scandalous

* Run bin/mvn spotless:apply

* Make the error message trigger for an UNSET variable as WELL as a blank one, instead of just for a blank one

* Actually show the stack trace if something goes wrong
chelseatroy authored Mar 14, 2023
1 parent 65f9ed7 commit 7eadf66
Showing 13 changed files with 327 additions and 149 deletions.
2 changes: 1 addition & 1 deletion checkstyle/checks.xml
@@ -50,7 +50,7 @@

<module name="LineLength">
<property name="fileExtensions" value="java"/>
- <property name="max" value="100"/>
+ <property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>

53 changes: 53 additions & 0 deletions ingestion-beam/bin/run-contextual-services-dev.sh
@@ -0,0 +1,53 @@
#!/bin/bash

set -ux

PROJECT="contextual-services-dev"
JOB_NAME="contextual-services-reporter-$(whoami)"

SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )

# Catch an unset variable as well as an empty one; ${VAR:-} is safe under `set -u`.
if [ -z "${GOOGLE_APPLICATION_CREDENTIALS:-}" ]
then
cat << EOF
You need to authenticate with gcloud. The commands are:
gcloud auth login youremail@mozilla.com --update-adc
export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json
Then you can run
bash $0
again.
EOF
exit 1;
fi

"$SCRIPT_DIR/mvn" compile exec:java -Dexec.mainClass=com.mozilla.telemetry.ContextualServicesReporter -e -Dexec.args="\
--runner=Dataflow \
--jobName=$JOB_NAME \
--project=$PROJECT \
--inputType=pubsub \
--input='projects/contextual-services-dev/subscriptions/ctxsvc-input' \
--outputTableRowFormat=payload \
--errorBqWriteMethod=streaming \
--errorOutputType=bigquery \
--errorOutput=$PROJECT:contextual_services.reporting_errors \
--region=us-central1 \
--usePublicIps=true \
--gcsUploadBufferSizeBytes=16777216 \
--urlAllowList=gs://contextual-services-data-dev/urlAllowlist.csv \
--allowedDocTypes=topsites-click,topsites-impression,quicksuggest-impression,quicksuggest-click \
--allowedNamespaces=contextual-services,org-mozilla-fenix,org-mozilla-firefox-beta,org-mozilla-firefox,org-mozilla-ios-firefox,org-mozilla-ios-firefoxbeta,org-mozilla-ios-fennec \
--aggregationWindowDuration=10m \
--clickSpikeWindowDuration=3m \
--clickSpikeThreshold=10 \
--impressionSpikeWindowDuration=3m \
--impressionSpikeThreshold=20 \
--reportingEnabled=false \
--logReportingUrls=true \
--maxNumWorkers=2 \
--numWorkers=1 \
--autoscalingAlgorithm=THROUGHPUT_BASED \
"
@@ -5,10 +5,11 @@
import com.mozilla.telemetry.contextualservices.ContextualServicesReporterOptions;
import com.mozilla.telemetry.contextualservices.EmitCounters;
import com.mozilla.telemetry.contextualservices.FilterByDocType;
- import com.mozilla.telemetry.contextualservices.LabelClickSpikes;
+ import com.mozilla.telemetry.contextualservices.LabelSpikes;
import com.mozilla.telemetry.contextualservices.ParseReportingUrl;
import com.mozilla.telemetry.contextualservices.SendRequest;
import com.mozilla.telemetry.contextualservices.SponsoredInteraction;
+ import com.mozilla.telemetry.contextualservices.TelemetryEventType;
import com.mozilla.telemetry.contextualservices.VerifyMetadata;
import com.mozilla.telemetry.transforms.DecompressPayload;
import com.mozilla.telemetry.util.Time;
@@ -70,31 +71,42 @@ public static PipelineResult run(ContextualServicesReporterOptions.Parsed option
.failuresTo(errorCollections) //
.apply(EmitCounters.of());

- Set<String> aggregatedDocTypes = ImmutableSet.of("topsites-impression");
- Set<String> perContextIdDocTypes = ImmutableSet.of("topsites-click");
+ Set<String> individualImpressions = ImmutableSet.of("topsites-impression");
+ Set<String> individualClicks = ImmutableSet.of("topsites-click");

Set<String> unionedDocTypes = Stream
- .concat(aggregatedDocTypes.stream(), perContextIdDocTypes.stream())
+ .concat(individualImpressions.stream(), individualClicks.stream())
.collect(Collectors.toSet());

- // Aggregate impressions.
- PCollection<SponsoredInteraction> aggregated = requests
- .apply("FilterAggregatedDocTypes", Filter.by((interaction) -> aggregatedDocTypes //
- .contains(interaction.getDerivedDocumentType())))
- .apply(AggregateImpressions.of(options.getAggregationWindowDuration()));
-
// Perform windowed click counting per context_id, adding a click-status to the reporting URL
// if the count passes a threshold.
- PCollection<SponsoredInteraction> perContextId = requests
- .apply("FilterPerContextIdDocTypes", Filter.by((interaction) -> perContextIdDocTypes //
+ PCollection<SponsoredInteraction> clicksCountedByContextId = requests
+ .apply("FilterClicksPerContextIdDocTypes", Filter.by((interaction) -> individualClicks //
.contains(interaction.getDerivedDocumentType())))
- .apply(LabelClickSpikes.perContextId(options.getClickSpikeThreshold(),
- Time.parseDuration(options.getClickSpikeWindowDuration())));
+ .apply(LabelSpikes.perContextId(options.getClickSpikeThreshold(),
+ Time.parseDuration(options.getClickSpikeWindowDuration()), TelemetryEventType.CLICK));
+
+ // Perform windowed impression counting per context_id, adding an impression-status to the
+ // reporting URL
+ // if the count passes a threshold.
+ PCollection<SponsoredInteraction> impressionsCountedByContextId = requests
+ .apply("FilterImpressionsPerContextIdDocTypes",
+ Filter.by((interaction) -> individualImpressions //
+ .contains(interaction.getDerivedDocumentType())))
+ .apply(LabelSpikes.perContextId(options.getImpressionSpikeThreshold(),
+ Time.parseDuration(options.getImpressionSpikeWindowDuration()),
+ TelemetryEventType.IMPRESSION));
+
+ // Aggregate impressions.
+ PCollection<SponsoredInteraction> aggregatedImpressions = impressionsCountedByContextId
+ .apply(AggregateImpressions.of(options.getAggregationWindowDuration()));

PCollection<SponsoredInteraction> unaggregated = requests.apply("FilterUnaggregatedDocTypes",
Filter.by((interaction) -> !unionedDocTypes //
.contains(interaction.getDerivedDocumentType())));

- PCollectionList.of(aggregated).and(perContextId).and(unaggregated).apply(Flatten.pCollections())
+ PCollectionList.of(aggregatedImpressions).and(clicksCountedByContextId).and(unaggregated)
+ .apply(Flatten.pCollections())
.apply(SendRequest.of(options.getReportingEnabled(), options.getLogReportingUrls()))
.failuresTo(errorCollections);
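
After this change the three filters split the input by document type: clicks go through spike labeling, impressions go through spike labeling and then aggregation, and everything else passes through unaggregated before the branches are flattened back together. A minimal plain-Java sketch of that routing follows. It does not use Beam, and the class name `RoutingSketch` and the branch labels are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.Set;

/** Plain-Java sketch of the doc-type routing; the real job uses Beam Filter transforms. */
class RoutingSketch {
  // Doc type sets as they appear in the diff.
  static final Set<String> INDIVIDUAL_IMPRESSIONS = Collections.singleton("topsites-impression");
  static final Set<String> INDIVIDUAL_CLICKS = Collections.singleton("topsites-click");

  /** Returns a hypothetical label for the branch a doc type would be routed to. */
  static String branchFor(String docType) {
    if (INDIVIDUAL_CLICKS.contains(docType)) {
      return "label-click-spikes";
    }
    if (INDIVIDUAL_IMPRESSIONS.contains(docType)) {
      return "label-impression-spikes-then-aggregate";
    }
    // Anything outside the union of the two sets is sent on without aggregation.
    return "unaggregated";
  }

  public static void main(String[] args) {
    for (String docType : new String[] {"topsites-click", "topsites-impression",
        "quicksuggest-click"}) {
      System.out.println(docType + " -> " + branchFor(docType));
    }
  }
}
```

Because the third filter is defined as the complement of the union, every interaction lands in exactly one branch.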

@@ -62,12 +62,12 @@ static String getAggregationKey(SponsoredInteraction interaction) {

String reportingUrl = interaction.getReportingUrl();

- ParsedReportingUrl urlParser = new ParsedReportingUrl(reportingUrl);
+ BuildReportingUrl urlBuilder = new BuildReportingUrl(reportingUrl);

// Rebuild url, sorting query params for consistency across urls
- List<Map.Entry<String, String>> keyValues = urlParser.getQueryParams().entrySet().stream()
+ List<Map.Entry<String, String>> keyValues = urlBuilder.getQueryParams().entrySet().stream()
.sorted(Map.Entry.comparingByKey()).collect(Collectors.toList());
- ParsedReportingUrl aggregationUrl = new ParsedReportingUrl(urlParser.getBaseUrl());
+ BuildReportingUrl aggregationUrl = new BuildReportingUrl(urlBuilder.getBaseUrl());
for (Map.Entry<String, String> kv : keyValues) {
aggregationUrl.addQueryParam(kv.getKey(), kv.getValue());
}
@@ -81,20 +81,20 @@ static class BuildAggregateUrl extends DoFn<KV<String, Long>, SponsoredInteracti
@ProcessElement
public void processElement(@Element KV<String, Long> input,
OutputReceiver<SponsoredInteraction> out, IntervalWindow window) {
- ParsedReportingUrl urlParser = new ParsedReportingUrl(input.getKey());
+ BuildReportingUrl urlBuilder = new BuildReportingUrl(input.getKey());

long impressionCount = input.getValue();
long windowStart = window.start().getMillis();
long windowEnd = window.end().getMillis();

- urlParser.addQueryParam(ParsedReportingUrl.PARAM_IMPRESSIONS, Long.toString(impressionCount));
- urlParser.addQueryParam(ParsedReportingUrl.PARAM_TIMESTAMP_BEGIN,
+ urlBuilder.addQueryParam(BuildReportingUrl.PARAM_IMPRESSIONS, Long.toString(impressionCount));
+ urlBuilder.addQueryParam(BuildReportingUrl.PARAM_TIMESTAMP_BEGIN,
Long.toString(windowStart / 1000));
- urlParser.addQueryParam(ParsedReportingUrl.PARAM_TIMESTAMP_END,
+ urlBuilder.addQueryParam(BuildReportingUrl.PARAM_TIMESTAMP_END,
Long.toString(windowEnd / 1000));

SponsoredInteraction interaction = SponsoredInteraction.builder()
- .setReportingUrl(urlParser.toString())
+ .setReportingUrl(urlBuilder.toString())
.setSubmissionTimestamp(Time.epochMicrosToTimestamp(new Instant().getMillis() * 1000))
.build();
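
The aggregation key above is built by sorting the query params so that two URLs differing only in param order collapse to the same key. The idea can be sketched in plain Java without `BuildReportingUrl`. The class name `AggregationKeySketch` and the `example.com` URLs are hypothetical, and unlike the map-based original this sketch keeps duplicate keys and does not handle URL fragments.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.stream.Collectors;

/** Plain-Java sketch of query-param sorting; not the BuildReportingUrl implementation. */
class AggregationKeySketch {
  /** Returns the URL with its query params sorted by key. */
  static String canonicalize(String url) {
    int queryStart = url.indexOf('?');
    if (queryStart < 0 || queryStart == url.length() - 1) {
      // No query string to sort; strip a dangling "?" if present.
      return queryStart < 0 ? url : url.substring(0, queryStart);
    }
    String base = url.substring(0, queryStart);
    String sortedQuery = Arrays.stream(url.substring(queryStart + 1).split("&"))
        // Compare on the part before the first "=", i.e. the param name.
        .sorted(Comparator.comparing((String pair) -> pair.split("=", 2)[0]))
        .collect(Collectors.joining("&"));
    return base + "?" + sortedQuery;
  }

  public static void main(String[] args) {
    String first = canonicalize("https://example.com/r?b=2&a=1");
    String second = canonicalize("https://example.com/r?a=1&b=2");
    // Both orderings yield the same key, so their counts would be grouped together.
    System.out.println(first.equals(second));
  }
}
```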

@@ -10,7 +10,7 @@
/**
* Utility class for parsing and building contextual services reporting URLs.
*/
- public class ParsedReportingUrl {
+ public class BuildReportingUrl {

// API parameter names
static final String PARAM_COUNTRY_CODE = "country-code";
@@ -23,6 +23,7 @@ public class ParsedReportingUrl {
static final String PARAM_TIMESTAMP_BEGIN = "begin-timestamp";
static final String PARAM_TIMESTAMP_END = "end-timestamp";
static final String PARAM_CLICK_STATUS = "click-status";
+ static final String PARAM_IMPPRESSION_STATUS = "impression-status";
static final String PARAM_DMA_CODE = "dma-code";
static final String PARAM_CUSTOM_DATA = "custom-data";

@@ -40,7 +41,7 @@ public InvalidUrlException(String message, Throwable cause) {
}
}

- ParsedReportingUrl(String reportingUrl) {
+ BuildReportingUrl(String reportingUrl) {
try {
this.reportingUrl = new URL(reportingUrl);
} catch (MalformedURLException e) {
@@ -54,6 +54,20 @@ public interface ContextualServicesReporterOptions extends SinkOptions, Pipeline

void setClickSpikeThreshold(Integer value);

+ @Description("Duration window when counting impressions for labeling spikes.")
+ @Default.String("3m")
+ String getImpressionSpikeWindowDuration();
+
+ void setImpressionSpikeWindowDuration(String value);
+
+ @Description("Impression count threshold when labeling impression spikes.")
+ // The daily average expected impressions per user is around 20.
+ // So if we see more than that in 3 minutes, that's suspicious.
+ @Default.Integer(20)
+ Integer getImpressionSpikeThreshold();
+
+ void setImpressionSpikeThreshold(Integer value);
+
@Description("If set to true, send successfully requested reporting URLs to"
+ " error output. SendRequests stage does not continue if true.")
@Default.Boolean(true)
@@ -32,33 +32,58 @@
* Transform that maintains state per key in order to label suspicious clicks.
*/
@SuppressWarnings("checkstyle:lineLength")
- public class LabelClickSpikes extends
+ public class LabelSpikes extends
PTransform<PCollection<KV<String, SponsoredInteraction>>, PCollection<KV<String, SponsoredInteraction>>> {

- private final Integer maxClicks;
+ private final Integer maxInteractions;
private final Long windowMillis;
- private final Counter ghostClickCounter = Metrics.counter(LabelClickSpikes.class, "ghost_click");
+ private final Counter ghostEventCounter;
+ private String paramName;
+ private String suspiciousParamValue;

/**
* Composite transform that wraps {@code DetectClickSpikes} with keying by {@code context_id}.
*/
public static PTransform<PCollection<SponsoredInteraction>, PCollection<SponsoredInteraction>> perContextId(
- Integer maxClicks, Duration windowDuration) {
- return PTransform.compose("DetectClickSpikesPerContextId", input -> input //
- .apply(WithKeys.of((interaction) -> interaction.getContextId())) //
- .setCoder(KvCoder.of(StringUtf8Coder.of(), SponsoredInteraction.getCoder())) //
- .apply(WithCurrentTimestamp.of()) //
- .apply(LabelClickSpikes.of(maxClicks, windowDuration)) //
- .apply(Values.create()));
+ Integer maxInteractions, Duration windowDuration, TelemetryEventType eventType) {
+ return PTransform.compose("DetectClickSpikesPerContextId", input -> {
+ try {
+ return input //
+ .apply(WithKeys.of((interaction) -> interaction.getContextId())) //
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), SponsoredInteraction.getCoder())) //
+ .apply(WithCurrentTimestamp.of()) //
+ .apply(LabelSpikes.of(maxInteractions, windowDuration, eventType)) //
+ .apply(Values.create());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}

- public static LabelClickSpikes of(Integer maxClicks, Duration windowDuration) {
- return new LabelClickSpikes(maxClicks, windowDuration);
+ public static LabelSpikes of(Integer maxInteractions, Duration windowDuration,
+ TelemetryEventType eventType) throws Exception {
+ return new LabelSpikes(maxInteractions, windowDuration, eventType);
}

- private LabelClickSpikes(Integer maxClicks, Duration windowDuration) {
- this.maxClicks = maxClicks;
+ private LabelSpikes(Integer maxInteractions, Duration windowDuration,
+ TelemetryEventType eventType) throws Exception {
+ this.maxInteractions = maxInteractions;
this.windowMillis = windowDuration.getMillis();
+ switch (eventType) {
+ case CLICK:
+ this.paramName = BuildReportingUrl.PARAM_CLICK_STATUS;
+ this.suspiciousParamValue = ParseReportingUrl.CLICK_STATUS_GHOST;
+ this.ghostEventCounter = Metrics.counter(LabelSpikes.class, "ghost_click");
+ break;
+ case IMPRESSION:
+ this.paramName = BuildReportingUrl.PARAM_IMPPRESSION_STATUS;
+ this.suspiciousParamValue = ParseReportingUrl.IMPRESSION_STATUS_SUSPICIOUS;
+ this.ghostEventCounter = Metrics.counter(LabelSpikes.class, "ghost_impression");
+ break;
+ default:
+ throw new Exception(
+ "The LabelSpikes class is only set up to evaluate click and impression eventTypes.");
+ }
}

/** Accesses and updates state, adding the current timestamp and cleaning expired values. */
@@ -73,17 +98,17 @@ private List<Long> updateTimestampState(ValueState<List<Long>> state, Long curre
.sorted(Comparator.reverseOrder())
// We conserve memory by keeping only the most recent clicks if we're over the limit;
// we allow (maxClicks + 1) elements so size checks can use ">" rather than ">=".
- .limit(maxClicks + 1).collect(Collectors.toList());
+ .limit(maxInteractions + 1).collect(Collectors.toList());
state.write(timestamps);
return timestamps;
}

- /** Updates the passed attribute map, adding click-status to the reporting URL. */
- private static String addClickStatusToReportingUrlAttribute(String reportingUrl) {
- ParsedReportingUrl urlParser = new ParsedReportingUrl(reportingUrl);
- urlParser.addQueryParam(ParsedReportingUrl.PARAM_CLICK_STATUS,
- ParseReportingUrl.CLICK_STATUS_GHOST);
- return urlParser.toString();
+ /** Updates the passed attribute map, adding a query param to the reporting URL. */
+ private static String addStatusToReportingUrlAttribute(String reportingUrl, String paramName,
+ String status) {
+ BuildReportingUrl urlBuilder = new BuildReportingUrl(reportingUrl);
+ urlBuilder.addQueryParam(paramName, status);
+ return urlBuilder.toString();
}

private class Fn
@@ -108,12 +133,14 @@ public void process(@Element KV<String, SponsoredInteraction> element, //
// it will overwrite this timer value.
timer.offset(Duration.millis(windowMillis)).setRelative();

- if (timestamps.size() <= maxClicks) {
+ if (timestamps.size() <= maxInteractions) {
out.output(element);
} else {
SponsoredInteraction interaction = element.getValue();
- String reportingUrl = addClickStatusToReportingUrlAttribute(interaction.getReportingUrl());
- ghostClickCounter.inc();
+
+ String reportingUrl = addStatusToReportingUrlAttribute(interaction.getReportingUrl(),
+ paramName, suspiciousParamValue);
+ ghostEventCounter.inc();
out.output(
KV.of(element.getKey(), interaction.toBuilder().setReportingUrl(reportingUrl).build()));
}
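
The generalized transform keeps a short list of recent timestamps per `context_id` and labels an event once the list grows past `maxInteractions` within the window. A rough plain-Java sketch of that logic follows. It replaces Beam's per-key state and timers with a hypothetical in-memory map, and the expiry rule, the class name `SpikeLabelSketch`, the `example.com` URL, and the `ghost` status value are all assumptions, since the diff does not show them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java sketch of per-key spike detection; not the Beam transform from the diff. */
class SpikeLabelSketch {
  private final int maxInteractions;
  private final long windowMillis;
  // Hypothetical in-memory stand-in for the per-key state that Beam manages in the real job.
  private final Map<String, List<Long>> timestampsByKey = new HashMap<>();

  SpikeLabelSketch(int maxInteractions, long windowMillis) {
    this.maxInteractions = maxInteractions;
    this.windowMillis = windowMillis;
  }

  /** Records one event and reports whether the key is now over the threshold. */
  boolean isSuspicious(String contextId, long nowMillis) {
    List<Long> candidates =
        new ArrayList<>(timestampsByKey.getOrDefault(contextId, Collections.emptyList()));
    candidates.add(nowMillis);
    List<Long> updated = candidates.stream()
        // Assumed expiry rule: drop timestamps at least one window older than this event.
        .filter(ts -> nowMillis - ts < windowMillis)
        .sorted(Comparator.reverseOrder())
        // Keep at most (maxInteractions + 1) entries so the check below can use ">".
        .limit(maxInteractions + 1L)
        .collect(Collectors.toList());
    timestampsByKey.put(contextId, updated);
    return updated.size() > maxInteractions;
  }

  /** Appends a status query param by plain string concatenation (no URL encoding). */
  static String withStatus(String reportingUrl, String paramName, String status) {
    String separator = reportingUrl.contains("?") ? "&" : "?";
    return reportingUrl + separator + paramName + "=" + status;
  }

  public static void main(String[] args) {
    SpikeLabelSketch clicks = new SpikeLabelSketch(2, 1000L);
    String url = "https://example.com/report?id=1"; // placeholder URL
    for (long t : new long[] {0L, 100L, 200L, 5000L}) {
      boolean flagged = clicks.isSuspicious("context-a", t);
      // "ghost" is a placeholder status value, not taken from the source.
      System.out.println(t + " -> " + (flagged ? withStatus(url, "click-status", "ghost") : url));
    }
  }
}
```

With a threshold of 2 and a one-second window, the third event in quick succession is the first to be labeled, and an event arriving after the window has passed starts from a clean slate.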