Skip to content

Commit

Permalink
Add job identifier to FeatureRow (#147)
Browse files Browse the repository at this point in the history
* Add column for job_id in BQ

* Add job id for feature row in warehouse store
  • Loading branch information
mansiib authored and feast-ci-bot committed Mar 3, 2019
1 parent 81f3ede commit 0a89bd6
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class BigQueryStorageManager implements StorageManager {
private static final String FIELD_ID = "id";
private static final String FIELD_EVENT_TIMESTAMP = "event_timestamp";
private static final String FIELD_CREATED_TIMESTAMP = "created_timestamp";
private static final String FIELD_JOB_ID = "job_id";

private String id;
private final BigQuery bigQuery;
Expand Down Expand Up @@ -103,6 +104,7 @@ public void registerNewFeature(FeatureSpec featureSpec) {
createField(FIELD_ID, Enum.STRING, ""),
createField(FIELD_EVENT_TIMESTAMP, Enum.TIMESTAMP, FIELD_EVENT_TIMESTAMP),
createField(FIELD_CREATED_TIMESTAMP, Enum.TIMESTAMP, FIELD_CREATED_TIMESTAMP),
createField(FIELD_JOB_ID, Enum.STRING, FIELD_JOB_ID),
createFeatureField(featureSpec));
TableDefinition tableDefinition =
StandardTableDefinition.newBuilder()
Expand Down Expand Up @@ -137,7 +139,9 @@ public void registerNewFeature(FeatureSpec featureSpec) {
f ->
!f.equals(FIELD_ID)
&& !f.equals(FIELD_CREATED_TIMESTAMP)
&& !f.equals(FIELD_EVENT_TIMESTAMP))
&& !f.equals(FIELD_EVENT_TIMESTAMP)
&& !f.equals(FIELD_JOB_ID))

.collect(Collectors.toList()));
AuditLogger.log(
Resource.STORAGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ public void shouldCreateNewDatasetAndTableAndViewIfNotExist() throws Interrupted
actualTable.getTableId().getTable(),
equalTo(String.format("%s_%s", entityName, granularity.toString().toLowerCase())));
List<Field> fields = actualTable.getDefinition().getSchema().getFields();
assertThat(fields.size(), equalTo(4));
assertThat(fields.size(), equalTo(5));
Field idField = fields.get(0);
assertThat(idField.getName(), equalTo("id"));
Field etsField = fields.get(1);
assertThat(etsField.getName(), equalTo("event_timestamp"));
Field ctsField = fields.get(2);
assertThat(ctsField.getName(), equalTo("created_timestamp"));
Field field = fields.get(3);
Field field = fields.get(4);
assertThat(field.getDescription(), equalTo(description));
Field jobIdField = fields.get(3);
assertThat(jobIdField.getName(), equalTo("job_id"));
assertThat(field.getType(), equalTo(LegacySQLTypeName.INTEGER));
assertThat(field.getName(), equalTo(featureName));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class FeatureRowToBigQueryTableRowDoFn extends DoFn<FeatureRowExtended, T
private static final String ENTITY_KEY_COLUMN = "id";
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
private static final String JOB_ID_COLUMN = "job_id";


private Specs specs;

Expand All @@ -59,6 +61,7 @@ public TableRow toTableRow(FeatureRowExtended featureRowExtended) {
ValueBigQueryBuilder.bigQueryObjectOf(
Value.newBuilder()
.setTimestampVal(DateUtil.toTimestamp(DateTime.now(DateTimeZone.UTC)))));
tableRow.set(JOB_ID_COLUMN, specs.getJobName());

for (Feature feature : featureRow.getFeaturesList()) {
Object featureValue = ValueBigQueryBuilder.bigQueryObjectOf(feature.getValue());
Expand Down

0 comments on commit 0a89bd6

Please sign in to comment.