Rename BQ stats package, add docstrings
zhilingc committed Apr 15, 2020
1 parent 8538198 commit e20b954
Showing 10 changed files with 129 additions and 35 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/feast/core/model/Entity.java
@@ -23,6 +23,9 @@
import lombok.Getter;
import lombok.Setter;

/**
 * Feast entity object. Contains the name and type, as well as domain metadata about the entity.
*/
@Getter
@Setter
@javax.persistence.Entity
3 changes: 3 additions & 0 deletions core/src/main/java/feast/core/model/Feature.java
@@ -25,6 +25,9 @@
import lombok.Getter;
import lombok.Setter;

/**
 * Feature belonging to a feature set. Contains the name and type, as well as domain metadata about the feature.
*/
@Getter
@Setter
@Entity
84 changes: 73 additions & 11 deletions core/src/main/java/feast/core/service/StatsService.java
@@ -37,7 +37,7 @@
import feast.core.model.FieldId;
import feast.storage.api.statistics.FeatureSetStatistics;
import feast.storage.api.statistics.StatisticsRetriever;
import feast.storage.connectors.bigquery.stats.BigQueryStatisticsRetriever;
import feast.storage.connectors.bigquery.statistics.BigQueryStatisticsRetriever;
import java.io.IOException;
import java.time.Instant;
import java.util.*;
@@ -49,6 +49,7 @@
import org.tensorflow.metadata.v0.*;
import org.tensorflow.metadata.v0.FeatureNameStatistics.Builder;

/** Facilitates the retrieval of feature set statistics from historical stores. */
@Slf4j
@Service
public class StatsService {
@@ -67,6 +68,26 @@ public StatsService(
this.featureStatisticsRepository = featureStatisticsRepository;
}

  /**
   * Get {@link DatasetFeatureStatistics} for the requested feature set over the provided dataset
   * ids or date range, retrieved from the given store. The {@link DatasetFeatureStatistics}
   * contains one {@link FeatureNameStatistics} for each feature requested. Retrieved results are
   * cached indefinitely; to force Feast to recompute the statistics, set forceRefresh to true.
   *
   * <p>Only one of datasetIds or startDate/endDate should be provided. If both are provided, the
   * former takes precedence.
   *
   * <p>If multiple datasetIds are provided, or the date range spans multiple days, statistics are
   * retrieved for each unit (dataset id or day) and the results aggregated across those units. In
   * that case, statistics that cannot be aggregated are dropped; this includes all histograms and
   * quantiles, unique values, and top value counts.
   *
   * @param request {@link GetFeatureStatisticsRequest} containing the feature set name, the subset
   *     of features, dataset ids or a date range, and the store to retrieve the data from.
   * @return {@link GetFeatureStatisticsResponse} containing {@link DatasetFeatureStatistics} with
   *     the requested feature statistics.
   * @throws IOException if statistics could not be retrieved from the store
   */
@Transactional
public GetFeatureStatisticsResponse getFeatureStatistics(GetFeatureStatisticsRequest request)
throws IOException {
@@ -86,7 +107,11 @@ public GetFeatureStatisticsResponse getFeatureStatistics(GetFeatureStatisticsReq
while (timestamp < request.getEndDate().getSeconds()) {
List<FeatureNameStatistics> featureNameStatistics =
getFeatureNameStatisticsByDate(
statisticsRetriever, featureSetSpec, features, timestamp);
statisticsRetriever,
featureSetSpec,
features,
timestamp,
request.getForceRefresh());
featureNameStatisticsList.add(featureNameStatistics);
timestamp += 86400; // advance by a day
}
@@ -95,24 +120,32 @@ public GetFeatureStatisticsResponse getFeatureStatistics(GetFeatureStatisticsReq
for (String datasetId : request.getDatasetIdsList()) {
List<FeatureNameStatistics> featureNameStatistics =
getFeatureNameStatisticsByDataset(
statisticsRetriever, featureSetSpec, features, datasetId);
statisticsRetriever,
featureSetSpec,
features,
datasetId,
request.getForceRefresh());
featureNameStatisticsList.add(featureNameStatistics);
}
}
List<FeatureNameStatistics> featureNameStatistics = mergeStatistics(featureNameStatisticsList);
long totalCount = getTotalCount(featureNameStatistics.get(0));
return GetFeatureStatisticsResponse.newBuilder()
.setDatasetFeatureStatisticsList(
DatasetFeatureStatisticsList.newBuilder()
.addDatasets(
DatasetFeatureStatistics.newBuilder().addAllFeatures(featureNameStatistics)))
DatasetFeatureStatistics.newBuilder()
.setNumExamples(totalCount)
.addAllFeatures(featureNameStatistics)))
.build();
}

private List<FeatureNameStatistics> getFeatureNameStatisticsByDataset(
StatisticsRetriever statisticsRetriever,
FeatureSetSpec featureSetSpec,
List<String> features,
String datasetId)
String datasetId,
boolean forceRefresh)
throws IOException {
List<FeatureNameStatistics> featureNameStatistics = new ArrayList<>();
List<String> featuresMissingStats = new ArrayList<>();
@@ -124,9 +157,12 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDataset(
featureSetSpec.getName(),
featureSetSpec.getVersion(),
featureName));
Optional<FeatureStatistics> cachedFeatureStatistics =
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDatasetId(
feature, datasetId);
Optional<FeatureStatistics> cachedFeatureStatistics = Optional.empty();
if (!forceRefresh) {
cachedFeatureStatistics =
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDatasetId(
feature, datasetId);
}
if (cachedFeatureStatistics.isPresent()) {
featureNameStatistics.add(cachedFeatureStatistics.get().toProto());
} else {
@@ -154,7 +190,8 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDate(
StatisticsRetriever statisticsRetriever,
FeatureSetSpec featureSetSpec,
List<String> features,
long timestamp)
long timestamp,
boolean forceRefresh)
throws IOException {
Date date = Date.from(Instant.ofEpochSecond(timestamp));
List<FeatureNameStatistics> featureNameStatistics = new ArrayList<>();
@@ -167,8 +204,11 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDate(
featureSetSpec.getName(),
featureSetSpec.getVersion(),
featureName));
Optional<FeatureStatistics> cachedFeatureStatistics =
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDate(feature, date);
Optional<FeatureStatistics> cachedFeatureStatistics = Optional.empty();
if (!forceRefresh) {
cachedFeatureStatistics =
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDate(feature, date);
}
if (cachedFeatureStatistics.isPresent()) {
featureNameStatistics.add(cachedFeatureStatistics.get().toProto());
} else {
@@ -433,4 +473,26 @@ private FeatureNameStatistics mergeByteStatistics(

return mergedFeatureNameStatistics.setBytesStats(mergedBytesStatistics).build();
}

private long getTotalCount(FeatureNameStatistics featureNameStatistics) {
CommonStatistics commonStats;
switch (featureNameStatistics.getType()) {
case STRUCT:
commonStats = featureNameStatistics.getStructStats().getCommonStats();
break;
case STRING:
commonStats = featureNameStatistics.getStringStats().getCommonStats();
break;
case BYTES:
commonStats = featureNameStatistics.getBytesStats().getCommonStats();
break;
case FLOAT:
case INT:
commonStats = featureNameStatistics.getNumStats().getCommonStats();
break;
default:
throw new RuntimeException("Unable to extract dataset size; Invalid type provided");
}
return commonStats.getNumNonMissing() + commonStats.getNumMissing();
}
}
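A minimal usage sketch for the getFeatureStatistics endpoint documented above, not taken from the diff: the addDatasetIds and setForceRefresh builder calls follow from the getters used in the service (getDatasetIdsList, getForceRefresh), while the import path of the generated proto classes and the omitted fields (feature set name, features, store, date range) are assumptions.

// Sketch only: request builder methods are inferred from the getters used in StatsService;
// the proto class locations are assumed, and the feature set, feature and store fields
// described in the Javadoc are omitted because they are not shown in this diff.
import feast.core.CoreServiceProto.GetFeatureStatisticsRequest;
import feast.core.CoreServiceProto.GetFeatureStatisticsResponse;
import feast.core.service.StatsService;
import java.io.IOException;

public class StatsServiceUsageSketch {
  public GetFeatureStatisticsResponse statsForDatasets(StatsService statsService)
      throws IOException {
    GetFeatureStatisticsRequest request =
        GetFeatureStatisticsRequest.newBuilder()
            .addDatasetIds("dataset_a") // hypothetical dataset ids
            .addDatasetIds("dataset_b")
            .setForceRefresh(true) // bypass cached FeatureStatistics and recompute from the store
            .build();
    return statsService.getFeatureStatistics(request);
  }
}

With forceRefresh set, both lookup paths above skip the FeatureStatisticsRepository cache and always query the statistics retriever.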
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import static feast.storage.connectors.bigquery.stats.StatsUtil.toFeatureNameStatistics;
import static feast.storage.connectors.bigquery.statistics.StatsUtil.toFeatureNameStatistics;

import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.BigQuery;
@@ -99,37 +99,32 @@ private FeatureSetStatistics getFeatureSetStatistics(
featureSetSpec = featureSetSpecBuilder.build();

try {
      // Generate the SQL for the non-histogram statistics and retrieve the results
String getFeatureSetStatsQuery =
StatsQueryTemplater.createGetFeatureSetStatsQuery(
featureSetStatisticsQueryInfo, projectId(), datasetId());
String getFeatureSetHistQuery =
StatsQueryTemplater.createGetFeatureSetHistQuery(
featureSetStatisticsQueryInfo, projectId(), datasetId());
QueryJobConfiguration queryJobConfiguration =
QueryJobConfiguration.newBuilder(getFeatureSetStatsQuery).build();
TableResult basicStats = bigquery().query(queryJobConfiguration);

      // Generate the SQL for the histogram statistics and retrieve the results
String getFeatureSetHistQuery =
StatsQueryTemplater.createGetFeatureSetHistQuery(
featureSetStatisticsQueryInfo, projectId(), datasetId());
queryJobConfiguration = QueryJobConfiguration.newBuilder(getFeatureSetHistQuery).build();
TableResult hist = bigquery().query(queryJobConfiguration);

Map<String, FieldValueList> basicStatsValues =
Streams.stream(basicStats.getValues())
.collect(
Collectors.toMap(
fieldValueList -> fieldValueList.get(0).getStringValue(),
fieldValueList -> fieldValueList));
Map<String, FieldValueList> histValues =
Streams.stream(hist.getValues())
.collect(
Collectors.toMap(
fieldValueList -> fieldValueList.get(0).getStringValue(),
fieldValueList -> fieldValueList));
// Convert to map of feature_name:row containing the statistics
Map<String, FieldValueList> basicStatsValues = getTableResultByFeatureName(basicStats);
Map<String, FieldValueList> histValues = getTableResultByFeatureName(hist);

int totalCountIndex = basicStats.getSchema().getFields().getIndex("total_count");
FeatureSetStatistics.Builder featureSetStatisticsBuilder =
FeatureSetStatistics.newBuilder()
.setNumExamples(
basicStatsValues.get(features.get(0)).get(totalCountIndex).getLongValue());

// Convert BQ rows to FeatureNameStatistics
for (FeatureSpec featureSpec : featureSetSpec.getFeaturesList()) {
FeatureNameStatistics featureNameStatistics =
toFeatureNameStatistics(
@@ -149,4 +144,12 @@ private FeatureSetStatistics getFeatureSetStatistics(
e);
}
}

  // Index each row of the query result by the feature name in its first column.
  private Map<String, FieldValueList> getTableResultByFeatureName(TableResult tableResult) {
    return Streams.stream(tableResult.getValues())
        .collect(
            Collectors.toMap(
                fieldValueList -> fieldValueList.get(0).getStringValue(),
                fieldValueList -> fieldValueList));
  }
}
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import com.google.protobuf.Timestamp;
import feast.core.FeatureSetProto.FeatureSpec;
@@ -25,6 +25,10 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* Value class for Feature Sets containing information necessary to template stats-retrieving
* queries.
*/
public class FeatureSetStatisticsQueryInfo {
private final String project;
private final String name;
@@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType.Enum;

/**
* Value class for Features containing information necessary to template stats-retrieving queries.
*/
public class FeatureStatisticsQueryInfo {
private final String name;
private final String type;
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import com.mitchellbosecke.pebble.PebbleEngine;
import com.mitchellbosecke.pebble.template.PebbleTemplate;
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FieldValue;
@@ -54,6 +54,22 @@ public class StatsUtil {
TFDV_TYPE_MAP.put(Enum.DOUBLE_LIST, Type.STRUCT);
}

/**
* Convert BQ-retrieved statistics to the corresponding TFDV {@link FeatureNameStatistics}
* specific to the feature type.
*
* @param featureSpec {@link FeatureSpec} of the feature
* @param basicStatsSchema BigQuery {@link Schema} of the retrieved statistics row for the
* non-histogram statistics. Used to retrieve the column names corresponding to each value in
* the row.
* @param basicStatsValues BigQuery {@link FieldValueList} containing a single row of
* non-histogram statistics retrieved from BigQuery
* @param histSchema BigQuery {@link Schema} of the retrieved statistics row for the histogram
* statistics. Used to retrieve the column names corresponding to each value in the row.
* @param histValues BigQuery {@link FieldValueList} containing a single row of histogram
* statistics retrieved from BigQuery
* @return {@link FeatureNameStatistics}
*/
public static FeatureNameStatistics toFeatureNameStatistics(
FeatureSpec featureSpec,
Schema basicStatsSchema,
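The remainder of this signature and its call site in the retriever are collapsed in this view, so here is a minimal sketch of how the converter described above is invoked for a single feature. It assumes toFeatureNameStatistics declares no checked exceptions (its Javadoc lists none); the wrapper class and method are illustrative only.

// Sketch only: basicStats and hist are the TableResults of the non-histogram and histogram
// queries; basicStatsValues and histValues are the per-feature row maps built by
// getTableResultByFeatureName in the retriever shown earlier in this commit.
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.storage.connectors.bigquery.statistics.StatsUtil;
import java.util.Map;
import org.tensorflow.metadata.v0.FeatureNameStatistics;

class StatsUtilUsageSketch {
  static FeatureNameStatistics convert(
      FeatureSpec featureSpec,
      TableResult basicStats,
      Map<String, FieldValueList> basicStatsValues,
      TableResult hist,
      Map<String, FieldValueList> histValues) {
    return StatsUtil.toFeatureNameStatistics(
        featureSpec, // FeatureSpec of the feature being converted
        basicStats.getSchema(), // schema of the non-histogram statistics row
        basicStatsValues.get(featureSpec.getName()), // non-histogram row for this feature
        hist.getSchema(), // schema of the histogram statistics row
        histValues.get(featureSpec.getName())); // histogram row for this feature
  }
}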
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import com.google.cloud.bigquery.BigQueryOptions;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.storage.connectors.bigquery.stats;
package feast.storage.connectors.bigquery.statistics;

import static feast.storage.connectors.bigquery.stats.StatsUtil.toFeatureNameStatistics;
import static feast.storage.connectors.bigquery.statistics.StatsUtil.toFeatureNameStatistics;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

Expand Down
