diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 2f68711bf2..a117754e84 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -47,7 +47,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; -/** In-memory cache of specs. */ +/** In-memory cache of specs hosted in Feast Core. */ public class CachedSpecService { private static final int MAX_SPEC_COUNT = 1000; @@ -76,7 +76,7 @@ public class CachedSpecService { public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) { this.coreService = coreService; - this.store = store; + this.store = coreService.registerStore(store); Map featureSets = getFeatureSetMap(); featureToFeatureSetMapping = diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java index 2f5cef342e..259aa3f3f0 100644 --- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CoreSpecService.java @@ -23,11 +23,12 @@ import feast.core.CoreServiceProto.ListFeatureSetsResponse; import feast.core.CoreServiceProto.UpdateStoreRequest; import feast.core.CoreServiceProto.UpdateStoreResponse; +import feast.core.StoreProto.Store; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.slf4j.Logger; -/** Client for spec retrieval from core. */ +/** Client for interfacing with specs in Feast Core. */ public class CoreSpecService { private static final Logger log = org.slf4j.LoggerFactory.getLogger(CoreSpecService.class); @@ -50,4 +51,24 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest ListFeatur public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) { return blockingStub.updateStore(updateStoreRequest); } + + /** + * Register the given store entry in Feast Core. If store already exists in Feast Core, updates + * the store entry in feast core. + * + * @param store entry to register/update in Feast Core. + * @return The register/updated store entry + */ + public Store registerStore(Store store) { + UpdateStoreRequest request = UpdateStoreRequest.newBuilder().setStore(store).build(); + try { + UpdateStoreResponse updateStoreResponse = this.updateStore(request); + if (!updateStoreResponse.getStore().equals(store)) { + throw new RuntimeException("Core store config not matching current store config"); + } + return updateStoreResponse.getStore(); + } catch (Exception e) { + throw new RuntimeException("Unable to update store configuration", e); + } + } } diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index f4f795ed32..144b967c9f 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -19,14 +19,14 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; import com.google.common.collect.Lists; import feast.core.CoreServiceProto.ListFeatureSetsRequest; import feast.core.CoreServiceProto.ListFeatureSetsResponse; -import feast.core.CoreServiceProto.UpdateStoreRequest; -import feast.core.CoreServiceProto.UpdateStoreResponse; import feast.core.FeatureSetProto; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; @@ -82,8 +82,7 @@ public void setUp() { .build()) .build(); - when(coreService.updateStore(UpdateStoreRequest.newBuilder().setStore(store).build())) - .thenReturn(UpdateStoreResponse.newBuilder().setStore(store).build()); + when(coreService.registerStore(store)).thenReturn(store); featureSetSpecs = new LinkedHashMap<>(); featureSetSpecs.put( @@ -143,6 +142,11 @@ public void setUp() { cachedSpecService = new CachedSpecService(coreService, store); } + @Test + public void shouldRegisterStoreWithCore() { + verify(coreService, times(1)).registerStore(cachedSpecService.getStore()); + } + @Test public void shouldPopulateAndReturnStore() { cachedSpecService.populateCache();