Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect BigQuery schema creation from FeatureSetSpec #340

Merged
merged 3 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,19 @@
* need to manage any schemas. This class will not be used in that case.
*/
public class StoreUtil {

private static final Map<ValueType.Enum, StandardSQLTypeName> VALUE_TYPE_TO_STANDARD_SQL_TYPE =
new HashMap<>();
private static final Logger log = org.slf4j.LoggerFactory.getLogger(StoreUtil.class);

// Column description for reserved fields
public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION =
"Event time for the FeatureRow";
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
"Processing time of the FeatureRow ingestion in Feast\"";
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
"Feast import job ID for the FeatureRow";

// Refer to protos/feast/core/Store.proto for the mapping definition.
static {
VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(Enum.BYTES, StandardSQLTypeName.BYTES);
Expand Down Expand Up @@ -110,15 +119,15 @@ public static void setupStore(Store store, FeatureSetSpec featureSetSpec) {
}

@SuppressWarnings("DuplicatedCode")
private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
public static TableDefinition createBigQueryTableDefinition(FeatureSetSpec featureSetSpec) {
List<Field> fields = new ArrayList<>();
log.info("Table will have the following fields:");

for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
Builder builder =
Field.newBuilder(
entitySpec.getName(), VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(entitySpec.getValueType()));
if (entitySpec.getValueTypeValue() >= 7 && entitySpec.getValueTypeValue() <= 17) {
if (entitySpec.getValueType().name().toLowerCase().endsWith("_list")) {
builder.setMode(Mode.REPEATED);
}
Field field = builder.build();
Expand All @@ -130,7 +139,7 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
Field.newBuilder(
featureSpec.getName(),
VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(featureSpec.getValueType()));
if (featureSpec.getValueTypeValue() >= 7 && featureSpec.getValueTypeValue() <= 17) {
if (featureSpec.getValueType().name().toLowerCase().endsWith("_list")) {
builder.setMode(Mode.REPEATED);
}
Field field = builder.build();
Expand All @@ -143,13 +152,12 @@ private static TableDefinition createBigQueryTableDefinition(FeatureSetSpec feat
reservedFieldNameToPairOfStandardSQLTypeAndDescription =
ImmutableMap.of(
"event_timestamp",
Pair.of(StandardSQLTypeName.TIMESTAMP, "Event time for the FeatureRow"),
Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION),
"created_timestamp",
Pair.of(
StandardSQLTypeName.TIMESTAMP,
"Processing time of the FeatureRow ingestion in Feast"),
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"job_id",
Pair.of(StandardSQLTypeName.STRING, "Feast import job ID for the FeatureRow"));
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
Field field =
Expand Down
178 changes: 177 additions & 1 deletion ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,39 @@
*/
package feast.ingestion.util;

import static feast.types.ValueProto.ValueType.Enum.BOOL;
import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST;
import static feast.types.ValueProto.ValueType.Enum.BYTES;
import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST;
import static feast.types.ValueProto.ValueType.Enum.DOUBLE;
import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST;
import static feast.types.ValueProto.ValueType.Enum.FLOAT;
import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST;
import static feast.types.ValueProto.ValueType.Enum.INT32;
import static feast.types.ValueProto.ValueType.Enum.INT32_LIST;
import static feast.types.ValueProto.ValueType.Enum.INT64;
import static feast.types.ValueProto.ValueType.Enum.INT64_LIST;
import static feast.types.ValueProto.ValueType.Enum.STRING;
import static feast.types.ValueProto.ValueType.Enum.STRING_LIST;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.ingestion.utils.StoreUtil;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class StoreUtilTest {

@Test
public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
public void setupBigQuery_shouldCreateTable_givenValidFeatureSetSpec() {
FeatureSetSpec featureSetSpec =
FeatureSetSpec.newBuilder()
.setName("feature_set_1")
Expand All @@ -41,4 +60,161 @@ public void setupBigQuery_shouldCreateTable_givenFeatureSetSpec() {
BigQuery mockedBigquery = Mockito.mock(BigQuery.class);
StoreUtil.setupBigQuery(featureSetSpec, "project-1", "dataset_1", mockedBigquery);
}

@Test
public void createBigQueryTableDefinition_shouldCreateCorrectSchema_givenValidFeatureSetSpec() {
FeatureSetSpec input =
FeatureSetSpec.newBuilder()
.addAllEntities(
Arrays.asList(
EntitySpec.newBuilder().setName("bytes_entity").setValueType(BYTES).build(),
EntitySpec.newBuilder().setName("string_entity").setValueType(STRING).build(),
EntitySpec.newBuilder().setName("int32_entity").setValueType(INT32).build(),
EntitySpec.newBuilder().setName("int64_entity").setValueType(INT64).build(),
EntitySpec.newBuilder().setName("double_entity").setValueType(DOUBLE).build(),
EntitySpec.newBuilder().setName("float_entity").setValueType(FLOAT).build(),
EntitySpec.newBuilder().setName("bool_entity").setValueType(BOOL).build(),
EntitySpec.newBuilder()
.setName("bytes_list_entity")
.setValueType(BYTES_LIST)
.build(),
EntitySpec.newBuilder()
.setName("string_list_entity")
.setValueType(STRING_LIST)
.build(),
EntitySpec.newBuilder()
.setName("int32_list_entity")
.setValueType(INT32_LIST)
.build(),
EntitySpec.newBuilder()
.setName("int64_list_entity")
.setValueType(INT64_LIST)
.build(),
EntitySpec.newBuilder()
.setName("double_list_entity")
.setValueType(DOUBLE_LIST)
.build(),
EntitySpec.newBuilder()
.setName("float_list_entity")
.setValueType(FLOAT_LIST)
.build(),
EntitySpec.newBuilder()
.setName("bool_list_entity")
.setValueType(BOOL_LIST)
.build()))
.addAllFeatures(
Arrays.asList(
FeatureSpec.newBuilder().setName("bytes_feature").setValueType(BYTES).build(),
FeatureSpec.newBuilder().setName("string_feature").setValueType(STRING).build(),
FeatureSpec.newBuilder().setName("int32_feature").setValueType(INT32).build(),
FeatureSpec.newBuilder().setName("int64_feature").setValueType(INT64).build(),
FeatureSpec.newBuilder().setName("double_feature").setValueType(DOUBLE).build(),
FeatureSpec.newBuilder().setName("float_feature").setValueType(FLOAT).build(),
FeatureSpec.newBuilder().setName("bool_feature").setValueType(BOOL).build(),
FeatureSpec.newBuilder()
.setName("bytes_list_feature")
.setValueType(BYTES_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("string_list_feature")
.setValueType(STRING_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("int32_list_feature")
.setValueType(INT32_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("int64_list_feature")
.setValueType(INT64_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("double_list_feature")
.setValueType(DOUBLE_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("float_list_feature")
.setValueType(FLOAT_LIST)
.build(),
FeatureSpec.newBuilder()
.setName("bool_list_feature")
.setValueType(BOOL_LIST)
.build()))
.build();

Schema actual = StoreUtil.createBigQueryTableDefinition(input).getSchema();

Schema expected =
Schema.of(
Arrays.asList(
// Fields from entity
Field.newBuilder("bytes_entity", StandardSQLTypeName.BYTES).build(),
Field.newBuilder("string_entity", StandardSQLTypeName.STRING).build(),
Field.newBuilder("int32_entity", StandardSQLTypeName.INT64).build(),
Field.newBuilder("int64_entity", StandardSQLTypeName.INT64).build(),
Field.newBuilder("double_entity", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("float_entity", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("bool_entity", StandardSQLTypeName.BOOL).build(),
Field.newBuilder("bytes_list_entity", StandardSQLTypeName.BYTES)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("string_list_entity", StandardSQLTypeName.STRING)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int32_list_entity", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int64_list_entity", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("double_list_entity", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("float_list_entity", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("bool_list_entity", StandardSQLTypeName.BOOL)
.setMode(Mode.REPEATED)
.build(),
// Fields from feature
Field.newBuilder("bytes_feature", StandardSQLTypeName.BYTES).build(),
Field.newBuilder("string_feature", StandardSQLTypeName.STRING).build(),
Field.newBuilder("int32_feature", StandardSQLTypeName.INT64).build(),
Field.newBuilder("int64_feature", StandardSQLTypeName.INT64).build(),
Field.newBuilder("double_feature", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("float_feature", StandardSQLTypeName.FLOAT64).build(),
Field.newBuilder("bool_feature", StandardSQLTypeName.BOOL).build(),
Field.newBuilder("bytes_list_feature", StandardSQLTypeName.BYTES)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("string_list_feature", StandardSQLTypeName.STRING)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int32_list_feature", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("int64_list_feature", StandardSQLTypeName.INT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("double_list_feature", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("float_list_feature", StandardSQLTypeName.FLOAT64)
.setMode(Mode.REPEATED)
.build(),
Field.newBuilder("bool_list_feature", StandardSQLTypeName.BOOL)
.setMode(Mode.REPEATED)
.build(),
// Reserved fields
Field.newBuilder("event_timestamp", StandardSQLTypeName.TIMESTAMP)
.setDescription(StoreUtil.BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION)
.build(),
Field.newBuilder("created_timestamp", StandardSQLTypeName.TIMESTAMP)
.setDescription(StoreUtil.BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION)
.build(),
Field.newBuilder("job_id", StandardSQLTypeName.STRING)
.setDescription(StoreUtil.BIGQUERY_JOB_ID_FIELD_DESCRIPTION)
.build()));

Assert.assertEquals(expected, actual);
}
}