Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute and write metrics for rows prior to store writes #763

Merged
merged 2 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/user-guide/statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,5 @@ The metrics are tagged with and can be aggregated by the following keys:
| feast\_featureSet\_name | feature set name |
| feast\_feature\_name | feature name |
| ingestion\_job\_name | id of the population job writing the feature values. |
| metrics\_namespace | either `Inflight` or `WriteToStoreSuccess` |

14 changes: 10 additions & 4 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import feast.ingestion.transform.ProcessAndValidateFeatureRows;
import feast.ingestion.transform.ReadFromSource;
import feast.ingestion.transform.metrics.WriteFailureMetricsTransform;
import feast.ingestion.transform.metrics.WriteInflightMetricsTransform;
import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform;
import feast.ingestion.transform.specs.ReadFeatureSetSpecs;
import feast.ingestion.transform.specs.WriteFeatureSetSpecAck;
Expand Down Expand Up @@ -147,13 +148,18 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
for (Store store : stores) {
FeatureSink featureSink = getFeatureSink(store, featureSetSpecs);

// Step 5. Write FeatureRow to the corresponding Store.
// Step 5. Write metrics of successfully validated rows
validatedRows
.get(FEATURE_ROW_OUT)
.apply("WriteInflightMetrics", WriteInflightMetricsTransform.create(store.getName()));

// Step 6. Write FeatureRow to the corresponding Store.
WriteResult writeFeatureRows =
storeAllocatedRows
.get(storeTags.get(store))
.apply("WriteFeatureRowToStore", featureSink.writer());

// Step 6. Write FailedElements to a dead letter table in BigQuery.
// Step 7. Write FailedElements to a dead letter table in BigQuery.
if (options.getDeadLetterTableSpec() != null) {
// TODO: make deadletter destination type configurable
DeadletterSink deadletterSink =
Expand All @@ -172,7 +178,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write());
}

// Step 7. Write metrics to a metrics sink.
// Step 8. Write metrics to a metrics sink.
writeFeatureRows
.getSuccessfulInserts()
.apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName()));
Expand All @@ -182,7 +188,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
.apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName()));
}

// Step 8. Send ack that FeatureSetSpec state is updated
// Step 9. Send ack that FeatureSetSpec state is updated
featureSetSpecs.apply(
"WriteAck",
WriteFeatureSetSpecAck.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@
*/
package feast.ingestion.transform.metrics;

import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY;
import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.*;

import com.google.auto.value.AutoValue;
import com.timgroup.statsd.NonBlockingStatsDClient;
Expand Down Expand Up @@ -67,6 +62,8 @@ public abstract class WriteFeatureValueMetricsDoFn

abstract int getStatsdPort();

abstract String getMetricsNamespace();

static Builder newBuilder() {
return new AutoValue_WriteFeatureValueMetricsDoFn.Builder();
}
Expand All @@ -80,6 +77,8 @@ abstract static class Builder {

abstract Builder setStatsdPort(int statsdPort);

abstract Builder setMetricsNamespace(String metricsNamespace);

abstract WriteFeatureValueMetricsDoFn build();
}

Expand Down Expand Up @@ -158,7 +157,8 @@ public void processElement(
FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
FEATURE_TAG_KEY + ":" + featureName,
INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName()
INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName(),
METRICS_NAMESPACE_KEY + ":" + getMetricsNamespace(),
};

// stats can return non finite values when there is no element
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2019 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.transform.metrics;

import com.google.auto.value.AutoValue;
import feast.ingestion.options.ImportOptions;
import feast.proto.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

@AutoValue
public abstract class WriteInflightMetricsTransform
extends PTransform<PCollection<FeatureRow>, PDone> {

public static final String METRIC_NAMESPACE = "Inflight";
public static final String ELEMENTS_WRITTEN_METRIC = "elements_count";
private static final Counter elements_count =
Metrics.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC);

public abstract String getStoreName();

public static WriteInflightMetricsTransform create(String storeName) {
return new AutoValue_WriteInflightMetricsTransform(storeName);
}

@Override
public PDone expand(PCollection<FeatureRow> input) {
ImportOptions options = input.getPipeline().getOptions().as(ImportOptions.class);

input.apply(
"IncrementInflightElementsCounter",
MapElements.into(TypeDescriptors.booleans())
.via(
(FeatureRow row) -> {
elements_count.inc();
return true;
}));

switch (options.getMetricsExporterType()) {
case "statsd":

// Fixed window is applied so the metric collector will not be overwhelmed with the metrics
// data. For validation, only summaries of the values are usually required vs the actual
// values.
PCollection<KV<String, Iterable<FeatureRow>>> rowsGroupedByRef =
input
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void processElement(
ProcessContext c, @Element FeatureRow featureRow) {
c.output(KV.of(featureRow.getFeatureSet(), featureRow));
}
}))
.apply("GroupByFeatureSetRef", GroupByKey.create());

rowsGroupedByRef.apply(
"WriteInflightRowMetrics",
ParDo.of(
WriteRowMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

rowsGroupedByRef.apply(
"WriteInflightFeatureValueMetrics",
ParDo.of(
WriteFeatureValueMetricsDoFn.newBuilder()
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

return PDone.in(input.getPipeline());
case "none":
default:
input.apply(
"Noop",
ParDo.of(
new DoFn<FeatureRow, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {}
}));
return PDone.in(input.getPipeline());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<Featu
public static final String STORE_TAG_KEY = "feast_store";
public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
public static final String FEATURE_TAG_KEY = "feast_feature_name";
public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
public static final String METRICS_NAMESPACE_KEY = "metrics_namespace";

public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MIN = "feature_row_lag_ms_min";
public static final String GAUGE_NAME_FEATURE_ROW_LAG_MS_MAX = "feature_row_lag_ms_max";
Expand Down Expand Up @@ -77,6 +77,8 @@ public abstract class WriteRowMetricsDoFn extends DoFn<KV<String, Iterable<Featu

public abstract int getStatsdPort();

public abstract String getMetricsNamespace();

@Nullable
public abstract Clock getClock();

Expand Down Expand Up @@ -104,6 +106,8 @@ public abstract static class Builder {

public abstract Builder setStatsdPort(int statsdPort);

public abstract Builder setMetricsNamespace(String metricNamespace);

/**
* setClock will override the default system clock used to calculate feature row lag.
*
Expand Down Expand Up @@ -193,6 +197,7 @@ public void processElement(
FEATURE_SET_PROJECT_TAG_KEY + ":" + featureSetProject,
FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
INGESTION_JOB_NAME_KEY + ":" + c.getPipelineOptions().getJobName(),
METRICS_NAMESPACE_KEY + ":" + getMetricsNamespace(),
};

statsd.count(COUNT_NAME_FEATURE_ROW_INGESTED, featureRowLagStats.getN(), tags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void processElement(
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

validRowsGroupedByRef.apply(
Expand All @@ -101,6 +102,7 @@ public void processElement(
.setStatsdHost(options.getStatsdHost())
.setStatsdPort(options.getStatsdPort())
.setStoreName(getStoreName())
.setMetricsNamespace(METRIC_NAMESPACE)
.build()));

return PDone.in(input.getPipeline());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedExce
.setStatsdHost("localhost")
.setStatsdPort(STATSD_SERVER_PORT)
.setStoreName("store")
.setMetricsNamespace("test")
.build()));
pipeline.run(pipelineOptions).waitUntilFinish();
// Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedExce
.setStatsdPort(STATSD_SERVER_PORT)
.setStoreName("store")
.setClock(Clock.fixed(Instant.ofEpochSecond(1585548645), ZoneId.of("UTC")))
.setMetricsNamespace("test")
.build()));
pipeline.run(pipelineOptions).waitUntilFinish();
// Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
Expand Down
Loading