diff --git a/docs/user-guide/statistics.md b/docs/user-guide/statistics.md index 2c3566fcd6..54cbdc48d7 100644 --- a/docs/user-guide/statistics.md +++ b/docs/user-guide/statistics.md @@ -160,4 +160,5 @@ The metrics are tagged with and can be aggregated by the following keys: | feast\_featureSet\_name | feature set name | | feast\_feature\_name | feature name | | ingestion\_job\_name | id of the population job writing the feature values. | +| metrics\_namespace | either `Inflight` or `WriteToStoreSuccess` | diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 8dee52e08a..b5ee81c562 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -24,6 +24,7 @@ import feast.ingestion.transform.ProcessAndValidateFeatureRows; import feast.ingestion.transform.ReadFromSource; import feast.ingestion.transform.metrics.WriteFailureMetricsTransform; +import feast.ingestion.transform.metrics.WriteInflightMetricsTransform; import feast.ingestion.transform.metrics.WriteSuccessMetricsTransform; import feast.ingestion.transform.specs.ReadFeatureSetSpecs; import feast.ingestion.transform.specs.WriteFeatureSetSpecAck; @@ -147,13 +148,18 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti for (Store store : stores) { FeatureSink featureSink = getFeatureSink(store, featureSetSpecs); - // Step 5. Write FeatureRow to the corresponding Store. + // Step 5. Write metrics of successfully validated rows + validatedRows + .get(FEATURE_ROW_OUT) + .apply("WriteInflightMetrics", WriteInflightMetricsTransform.create(store.getName())); + + // Step 6. Write FeatureRow to the corresponding Store. WriteResult writeFeatureRows = storeAllocatedRows .get(storeTags.get(store)) .apply("WriteFeatureRowToStore", featureSink.writer()); - // Step 6. Write FailedElements to a dead letter table in BigQuery. + // Step 7. Write FailedElements to a dead letter table in BigQuery. if (options.getDeadLetterTableSpec() != null) { // TODO: make deadletter destination type configurable DeadletterSink deadletterSink = @@ -172,7 +178,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti .apply("WriteFailedElements_WriteFeatureRowToStore", deadletterSink.write()); } - // Step 7. Write metrics to a metrics sink. + // Step 8. Write metrics to a metrics sink. writeFeatureRows .getSuccessfulInserts() .apply("WriteSuccessMetrics", WriteSuccessMetricsTransform.create(store.getName())); @@ -182,7 +188,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti .apply("WriteFailureMetrics", WriteFailureMetricsTransform.create(store.getName())); } - // Step 8. Send ack that FeatureSetSpec state is updated + // Step 9. Send ack that FeatureSetSpec state is updated featureSetSpecs.apply( "WriteAck", WriteFeatureSetSpecAck.newBuilder() diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java index 809687a6b2..45fa4d25cf 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java @@ -16,12 +16,7 @@ */ package feast.ingestion.transform.metrics; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX; -import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY; +import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.*; import com.google.auto.value.AutoValue; import com.timgroup.statsd.NonBlockingStatsDClient; @@ -67,6 +62,8 @@ public abstract class WriteFeatureValueMetricsDoFn abstract int getStatsdPort(); + abstract String getMetricsNamespace(); + static Builder newBuilder() { return new AutoValue_WriteFeatureValueMetricsDoFn.Builder(); } @@ -80,6 +77,8 @@ abstract static class Builder { abstract Builder setStatsdPort(int statsdPort); + abstract Builder setMetricsNamespace(String metricsNamespace); + abstract WriteFeatureValueMetricsDoFn build(); } @@ -158,7 +157,8 @@ public void processElement( FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName, FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName, FEATURE_TAG_KEY + ":" + featureName, - INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName() + INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName(), + METRICS_NAMESPACE_KEY + ":" + getMetricsNamespace(), }; // stats can return non finite values when there is no element diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteInflightMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteInflightMetricsTransform.java new file mode 100644 index 0000000000..602045e46d --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteInflightMetricsTransform.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.transform.metrics; + +import com.google.auto.value.AutoValue; +import feast.ingestion.options.ImportOptions; +import feast.proto.types.FeatureRowProto.FeatureRow; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.Duration; + +@AutoValue +public abstract class WriteInflightMetricsTransform + extends PTransform, PDone> { + + public static final String METRIC_NAMESPACE = "Inflight"; + public static final String ELEMENTS_WRITTEN_METRIC = "elements_count"; + private static final Counter elements_count = + Metrics.counter(METRIC_NAMESPACE, ELEMENTS_WRITTEN_METRIC); + + public abstract String getStoreName(); + + public static WriteInflightMetricsTransform create(String storeName) { + return new AutoValue_WriteInflightMetricsTransform(storeName); + } + + @Override + public PDone expand(PCollection input) { + ImportOptions options = input.getPipeline().getOptions().as(ImportOptions.class); + + input.apply( + "IncrementInflightElementsCounter", + MapElements.into(TypeDescriptors.booleans()) + .via( + (FeatureRow row) -> { + elements_count.inc(); + return true; + })); + + switch (options.getMetricsExporterType()) { + case "statsd": + + // Fixed window is applied so the metric collector will not be overwhelmed with the metrics + // data. For validation, only summaries of the values are usually required vs the actual + // values. + PCollection>> rowsGroupedByRef = + input + .apply( + "FixedWindow", + Window.into( + FixedWindows.of( + Duration.standardSeconds( + options.getWindowSizeInSecForFeatureValueMetric())))) + .apply( + "ConvertToKV_FeatureSetRefToFeatureRow", + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement( + ProcessContext c, @Element FeatureRow featureRow) { + c.output(KV.of(featureRow.getFeatureSet(), featureRow)); + } + })) + .apply("GroupByFeatureSetRef", GroupByKey.create()); + + rowsGroupedByRef.apply( + "WriteInflightRowMetrics", + ParDo.of( + WriteRowMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .setMetricsNamespace(METRIC_NAMESPACE) + .build())); + + rowsGroupedByRef.apply( + "WriteInflightFeatureValueMetrics", + ParDo.of( + WriteFeatureValueMetricsDoFn.newBuilder() + .setStatsdHost(options.getStatsdHost()) + .setStatsdPort(options.getStatsdPort()) + .setStoreName(getStoreName()) + .setMetricsNamespace(METRIC_NAMESPACE) + .build())); + + return PDone.in(input.getPipeline()); + case "none": + default: + input.apply( + "Noop", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) {} + })); + return PDone.in(input.getPipeline()); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java index 3c7f04a98e..8650285445 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java @@ -44,9 +44,9 @@ public abstract class WriteRowMetricsDoFn extends DoFn