diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 1671a08f28..45de359ac7 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -52,8 +52,8 @@ public Source getDefaultSource(FeastProperties feastProperties) { String topicName = streamProperties.getOptions().get("topic"); Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, - DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG); + map.put( + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG); AdminClient client = AdminClient.create(map); NewTopic newTopic = diff --git a/core/src/main/java/feast/core/dao/FeatureSetRepository.java b/core/src/main/java/feast/core/dao/FeatureSetRepository.java index 900f112190..3eba210888 100644 --- a/core/src/main/java/feast/core/dao/FeatureSetRepository.java +++ b/core/src/main/java/feast/core/dao/FeatureSetRepository.java @@ -20,20 +20,18 @@ import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; -/** - * JPA repository supplying FeatureSet objects keyed by id. - */ +/** JPA repository supplying FeatureSet objects keyed by id. */ public interface FeatureSetRepository extends JpaRepository { long count(); // Find single feature set by project, name, and version - FeatureSet findFeatureSetByNameAndProject_NameAndVersion(String name, String project, - Integer version); + FeatureSet findFeatureSetByNameAndProject_NameAndVersion( + String name, String project, Integer version); // Find single latest version of a feature set by project and name (LIKE) - FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String name, - String project); + FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( + String name, String project); // find all feature sets and order by name and version List findAllByOrderByNameAscVersionAsc(); @@ -42,10 +40,10 @@ FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String List findAllByProject_NameOrderByNameAscVersionAsc(String project_name); // find all versions of feature sets matching the given name pattern with a specific project. - List findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(String name, - String project_name); + List findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + String name, String project_name); // find all versions of feature sets matching the given name pattern and project pattern - List findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(String name, - String project_name); + List findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( + String name, String project_name); } diff --git a/core/src/main/java/feast/core/dao/ProjectRepository.java b/core/src/main/java/feast/core/dao/ProjectRepository.java index 5549bf71a1..5adb7d44c2 100644 --- a/core/src/main/java/feast/core/dao/ProjectRepository.java +++ b/core/src/main/java/feast/core/dao/ProjectRepository.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package feast.core.dao; import feast.core.model.Project; diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 2137aa314f..b8d0670d0d 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -49,9 +49,7 @@ import org.lognet.springboot.grpc.GRpcService; import org.springframework.beans.factory.annotation.Autowired; -/** - * Implementation of the feast core GRPC service. - */ +/** Implementation of the feast core GRPC service. */ @Slf4j @GRpcService(interceptors = {MonitoringInterceptor.class}) public class CoreServiceImpl extends CoreServiceImplBase { @@ -81,10 +79,8 @@ public void getFeatureSet( responseObserver.onCompleted(); } catch (RetrievalException | StatusRuntimeException e) { log.error("Exception has occurred in GetFeatureSet method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -97,10 +93,8 @@ public void listFeatureSets( responseObserver.onCompleted(); } catch (RetrievalException | IllegalArgumentException e) { log.error("Exception has occurred in ListFeatureSet method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -113,10 +107,8 @@ public void listStores( responseObserver.onCompleted(); } catch (RetrievalException e) { log.error("Exception has occurred in ListStores method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -132,16 +124,12 @@ public void applyFeatureSet( "Unable to persist this feature set due to a constraint violation. Please ensure that" + " field names are unique within the project namespace: ", e); - responseObserver.onError(Status.ALREADY_EXISTS - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.ALREADY_EXISTS.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } catch (Exception e) { log.error("Exception has occurred in ApplyFeatureSet method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -154,10 +142,8 @@ public void updateStore( responseObserver.onCompleted(); } catch (Exception e) { log.error("Exception has occurred in UpdateStore method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -170,10 +156,8 @@ public void createProject( responseObserver.onCompleted(); } catch (Exception e) { log.error("Exception has occurred in the createProject method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -186,10 +170,8 @@ public void archiveProject( responseObserver.onCompleted(); } catch (Exception e) { log.error("Exception has occurred in the createProject method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } @@ -198,17 +180,15 @@ public void listProjects( ListProjectsRequest request, StreamObserver responseObserver) { try { List projects = accessManagementService.listProjects(); - responseObserver.onNext(ListProjectsResponse.newBuilder() - .addAllProjects(projects.stream().map(Project::getName).collect( - Collectors.toList())).build()); + responseObserver.onNext( + ListProjectsResponse.newBuilder() + .addAllProjects(projects.stream().map(Project::getName).collect(Collectors.toList())) + .build()); responseObserver.onCompleted(); } catch (Exception e) { log.error("Exception has occurred in the listProjects method: ", e); - responseObserver.onError(Status.INTERNAL - .withDescription(e.getMessage()) - .withCause(e) - .asRuntimeException()); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } - } diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index c42d75b76b..57b2dfee4f 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -110,8 +110,7 @@ public Job call() { } } else { String jobId = createJobId(source.getId(), store.getName()); - submittedJob = - executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store)); + submittedJob = executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store)); } Job job = null; @@ -136,7 +135,10 @@ private Job startJob( .map( fsp -> FeatureSet.fromProto( - FeatureSetProto.FeatureSet.newBuilder().setSpec(fsp.getSpec()).setMeta(fsp.getMeta()).build())) + FeatureSetProto.FeatureSet.newBuilder() + .setSpec(fsp.getSpec()) + .setMeta(fsp.getMeta()) + .build())) .collect(Collectors.toList()); Job job = new Job( @@ -184,13 +186,17 @@ private Job startJob( } /** Update the given job */ - private Job updateJob(Job job, List featureSets, StoreProto.Store store) { + private Job updateJob( + Job job, List featureSets, StoreProto.Store store) { job.setFeatureSets( featureSets.stream() .map( fs -> FeatureSet.fromProto( - FeatureSetProto.FeatureSet.newBuilder().setSpec(fs.getSpec()).setMeta(fs.getMeta()).build())) + FeatureSetProto.FeatureSet.newBuilder() + .setSpec(fs.getSpec()) + .setMeta(fs.getMeta()) + .build())) .collect(Collectors.toList())); job.setStore(feast.core.model.Store.fromProto(store)); AuditLogger.log( diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 703fecece8..2de46ae1f2 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -79,12 +79,14 @@ public Runner getRunnerType() { @Override public Job startJob(Job job) { List featureSetProtos = - job.getFeatureSets().stream() - .map(FeatureSet::toProto) - .collect(Collectors.toList()); + job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList()); try { return submitDataflowJob( - job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), false); + job.getId(), + featureSetProtos, + job.getSource().toProto(), + job.getStore().toProto(), + false); } catch (InvalidProtocolBufferException e) { throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e); } @@ -100,9 +102,7 @@ public Job startJob(Job job) { public Job updateJob(Job job) { try { List featureSetProtos = - job.getFeatureSets().stream() - .map(FeatureSet::toProto) - .collect(Collectors.toList()); + job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList()); return submitDataflowJob( job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true); @@ -156,12 +156,7 @@ public JobStatus getJobStatus(Job job) { try { com.google.api.services.dataflow.model.Job dataflowJob = - dataflow - .projects() - .locations() - .jobs() - .get(projectId, location, job.getExtId()) - .execute(); + dataflow.projects().locations().jobs().get(projectId, location, job.getExtId()).execute(); return DataflowJobStateMapper.map(dataflowJob.getCurrentState()); } catch (Exception e) { log.error( @@ -208,7 +203,10 @@ private Job submitDataflowJob( } private ImportOptions getPipelineOptions( - String jobName, List featureSets, StoreProto.Store sink, boolean update) + String jobName, + List featureSets, + StoreProto.Store sink, + boolean update) throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index ff13e22455..fdf3aad9bc 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -76,10 +76,9 @@ public Runner getRunnerType() { public Job startJob(Job job) { try { List featureSetProtos = - job.getFeatureSets().stream() - .map(FeatureSet::toProto) - .collect(Collectors.toList()); - ImportOptions pipelineOptions = getPipelineOptions(featureSetProtos, job.getStore().toProto()); + job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList()); + ImportOptions pipelineOptions = + getPipelineOptions(featureSetProtos, job.getStore().toProto()); PipelineResult pipelineResult = runPipeline(pipelineOptions); DirectJob directJob = new DirectJob(job.getId(), pipelineResult); jobs.add(directJob); diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index e42ccf2ade..e468705020 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -76,13 +76,9 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable entities; @@ -91,9 +87,7 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable features; diff --git a/core/src/main/java/feast/core/model/Field.java b/core/src/main/java/feast/core/model/Field.java index 9307e1273e..7573fcbf5e 100644 --- a/core/src/main/java/feast/core/model/Field.java +++ b/core/src/main/java/feast/core/model/Field.java @@ -44,8 +44,7 @@ public class Field { @Column(name = "project") private String project; - public Field() { - } + public Field() {} public Field(String name, ValueType.Enum type) { this.name = name; diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index ab0db6f3f9..bbd661309d 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -63,8 +63,7 @@ public class Job extends AbstractTimestampEntity { private Store store; // FeatureSets populated by the job - @ManyToMany - private List featureSets; + @ManyToMany private List featureSets; // Job Metrics @OneToMany(mappedBy = "job", cascade = CascadeType.ALL) diff --git a/core/src/main/java/feast/core/model/Project.java b/core/src/main/java/feast/core/model/Project.java index a4a1997cf6..d6e6149394 100644 --- a/core/src/main/java/feast/core/model/Project.java +++ b/core/src/main/java/feast/core/model/Project.java @@ -44,7 +44,11 @@ public class Project { @Column(name = "archived", nullable = false) private boolean archived; - @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER, orphanRemoval = true, mappedBy = "project") + @OneToMany( + cascade = CascadeType.ALL, + fetch = FetchType.EAGER, + orphanRemoval = true, + mappedBy = "project") private Set featureSets; public Project() { diff --git a/core/src/main/java/feast/core/model/Store.java b/core/src/main/java/feast/core/model/Store.java index 4fa70b655a..9dc44bdc73 100644 --- a/core/src/main/java/feast/core/model/Store.java +++ b/core/src/main/java/feast/core/model/Store.java @@ -118,8 +118,9 @@ public List getSubscriptions() { } private static String convertSubscriptionToString(Subscription sub) { - if(sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()){ - throw new IllegalArgumentException(String.format("Missing arguments in subscription string: %s", sub.toString())); + if (sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()) { + throw new IllegalArgumentException( + String.format("Missing arguments in subscription string: %s", sub.toString())); } return String.format("%s:%s:%s", sub.getProject(), sub.getName(), sub.getVersion()); } @@ -129,6 +130,10 @@ private Subscription convertStringToSubscription(String sub) { return Subscription.newBuilder().build(); } String[] split = sub.split(":", 3); - return Subscription.newBuilder().setProject(split[0]).setName(split[1]).setVersion(split[2]).build(); + return Subscription.newBuilder() + .setProject(split[0]) + .setName(split[1]) + .setVersion(split[2]) + .build(); } -} \ No newline at end of file +} diff --git a/core/src/main/java/feast/core/service/AccessManagementService.java b/core/src/main/java/feast/core/service/AccessManagementService.java index e17ba156f5..6f627df33d 100644 --- a/core/src/main/java/feast/core/service/AccessManagementService.java +++ b/core/src/main/java/feast/core/service/AccessManagementService.java @@ -25,7 +25,6 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - @Slf4j @Service public class AccessManagementService { @@ -33,8 +32,7 @@ public class AccessManagementService { private ProjectRepository projectRepository; @Autowired - public AccessManagementService( - ProjectRepository projectRepository) { + public AccessManagementService(ProjectRepository projectRepository) { this.projectRepository = projectRepository; } @@ -45,7 +43,7 @@ public AccessManagementService( */ @Transactional public void createProject(String name) { - if(projectRepository.existsById(name)){ + if (projectRepository.existsById(name)) { throw new IllegalArgumentException(String.format("Project already exists: %s", name)); } Project project = new Project(name); diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index b1212ae6e6..23ad041b81 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -95,14 +95,15 @@ public void Poll() { Set featureSets = new HashSet<>(); for (Subscription subscription : store.getSubscriptionsList()) { featureSets.addAll( - new ArrayList<>(specService - .listFeatureSets( - ListFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName(subscription.getName()) - .setFeatureSetVersion(subscription.getVersion()) - .setProject(subscription.getProject()) - .build()) - .getFeatureSetsList())); + new ArrayList<>( + specService + .listFeatureSets( + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(subscription.getName()) + .setFeatureSetVersion(subscription.getVersion()) + .setProject(subscription.getProject()) + .build()) + .getFeatureSetsList())); } if (!featureSets.isEmpty()) { featureSets.stream() diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 17fcd10918..2d977fcd3f 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -106,13 +106,12 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) { // Filter the list based on version if (request.getVersion() == 0) { featureSet = - featureSetRepository - .findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(request.getName(), - request.getProject()); + featureSetRepository.findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( + request.getName(), request.getProject()); if (featureSet == null) { - throw new RetrievalException(String.format( - "Feature set with name \"%s\" could not be found.", request.getName())); + throw new RetrievalException( + String.format("Feature set with name \"%s\" could not be found.", request.getName())); } } else { featureSet = @@ -120,9 +119,10 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) { request.getName(), request.getProject(), request.getVersion()); if (featureSet == null) { - throw new RetrievalException(String.format( - "Feature set with name \"%s\" and version \"%s\" could " + "not be found.", - request.getName(), request.getVersion())); + throw new RetrievalException( + String.format( + "Feature set with name \"%s\" and version \"%s\" could " + "not be found.", + request.getName(), request.getVersion())); } } @@ -130,24 +130,22 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) { return GetFeatureSetResponse.newBuilder().setFeatureSet(featureSet.toProto()).build(); } - /** * Return a list of feature sets matching the feature set name, version, and project provided in * the filter. All fields are requried. Use '*' for all three arguments in order to return all * feature sets and versions in all projects. * - *

Project name can be explicitly provided, or an asterisk can be provided to match all + *

Project name can be explicitly provided, or an asterisk can be provided to match all * projects. It is not possible to provide a combination of asterisks/wildcards and text. * - *

The feature set name in the filter accepts an asterisk as a wildcard. All matching - * feature sets will be returned. Regex is not supported. Explicitly defining a feature set name - * is not possible if a project name is not set explicitly + *

The feature set name in the filter accepts an asterisk as a wildcard. All matching feature + * sets will be returned. Regex is not supported. Explicitly defining a feature set name is not + * possible if a project name is not set explicitly * - *

The version field can be one of - * - '*' - This will match all versions - * - 'latest' - This will match the latest feature set version - * - '' - This will match a specific feature set version. This property can only be set - * if both the feature set name and project name are explicitly set. + *

The version field can be one of - '*' - This will match all versions - 'latest' - This will + * match the latest feature set version - '' - This will match a specific feature set + * version. This property can only be set if both the feature set name and project name are + * explicitly set. * * @param filter filter containing the desired featureSet name and version filter * @return ListFeatureSetsResponse with list of featureSets found matching the filter @@ -159,81 +157,73 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil if (project.isEmpty() || name.isEmpty() || version.isEmpty()) { throw new IllegalArgumentException( - String - .format( - "Invalid listFeatureSetRequest, missing arguments. Must provide project, feature set name, and version.", - filter.toString())); + String.format( + "Invalid listFeatureSetRequest, missing arguments. Must provide project, feature set name, and version.", + filter.toString())); } checkValidCharactersAllowAsterisk(name, "featureSetName"); checkValidCharactersAllowAsterisk(project, "projectName"); - List featureSets = new ArrayList() { - }; + List featureSets = new ArrayList() {}; if (project.equals("*")) { // Matching all projects if (name.equals("*") && version.equals("*")) { - featureSets = featureSetRepository - .findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( - name.replace('*', '%'), - project.replace('*', '%')); + featureSets = + featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( + name.replace('*', '%'), project.replace('*', '%')); } else { throw new IllegalArgumentException( - String - .format( - "Invalid listFeatureSetRequest. Version and feature set name must be set to " - + "\"*\" if the project name and feature set name aren't set explicitly: \n%s", - filter.toString())); + String.format( + "Invalid listFeatureSetRequest. Version and feature set name must be set to " + + "\"*\" if the project name and feature set name aren't set explicitly: \n%s", + filter.toString())); } } else if (!project.contains("*")) { // Matching a specific project if (name.contains("*") && version.equals("*")) { // Find all feature sets matching a pattern and versions in a specific project - featureSets = featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - name.replace('*', '%'), - project); + featureSets = + featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + name.replace('*', '%'), project); } else if (!name.contains("*") && version.equals("*")) { // Find all versions of a specific feature set in a specific project - featureSets = featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - name, - project); + featureSets = + featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + name, project); } else if (version.equals("latest")) { - // Find the latest version of a feature set matching a specific pattern in a specific project - FeatureSet latestFeatureSet = featureSetRepository - .findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( - name.replace('*', '%'), - project); + // Find the latest version of a feature set matching a specific pattern in a specific + // project + FeatureSet latestFeatureSet = + featureSetRepository.findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( + name.replace('*', '%'), project); featureSets.add(latestFeatureSet); } else if (!name.contains("*") && StringUtils.isNumeric(version)) { // Find a specific version of a feature set matching a specific name in a specific project - FeatureSet specificFeatureSet = featureSetRepository - .findFeatureSetByNameAndProject_NameAndVersion(name, project, - Integer.parseInt(version)); + FeatureSet specificFeatureSet = + featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion( + name, project, Integer.parseInt(version)); featureSets.add(specificFeatureSet); } else { throw new IllegalArgumentException( - String - .format( - "Invalid listFeatureSetRequest. Version must be set to \"*\" if the project " - + "name and feature set name aren't set explicitly: \n%s", - filter.toString())); + String.format( + "Invalid listFeatureSetRequest. Version must be set to \"*\" if the project " + + "name and feature set name aren't set explicitly: \n%s", + filter.toString())); } } else { throw new IllegalArgumentException( - String - .format( - "Invalid listFeatureSetRequest. Project name cannot be a pattern. It may only be" - + "a specific project name or an asterisk: \n%s", - filter.toString())); + String.format( + "Invalid listFeatureSetRequest. Project name cannot be a pattern. It may only be" + + "a specific project name or an asterisk: \n%s", + filter.toString())); } ListFeatureSetsResponse.Builder response = ListFeatureSetsResponse.newBuilder(); @@ -284,9 +274,9 @@ public ListStoresResponse listStores(ListStoresRequest.Filter filter) { * Creates or updates a feature set in the repository. If there is a change in the feature set * schema, then the feature set version will be incremented. * - *

This function is idempotent. If no changes are detected in the incoming featureSet's - * schema, this method will update the incoming featureSet spec with the latest version stored in - * the repository, and return that. + *

This function is idempotent. If no changes are detected in the incoming featureSet's schema, + * this method will update the incoming featureSet spec with the latest version stored in the + * repository, and return that. * * @param newFeatureSet Feature set that will be created or updated. */ @@ -297,28 +287,33 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea // Ensure that the project already exists String project_name = newFeatureSet.getSpec().getProject(); - Project project = projectRepository.findById(newFeatureSet.getSpec().getProject()) - .orElseThrow(() -> new IllegalArgumentException(String - .format("Project name does not exist. Please create a project first: %s", project_name - ))); + Project project = + projectRepository + .findById(newFeatureSet.getSpec().getProject()) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + "Project name does not exist. Please create a project first: %s", + project_name))); // Ensure that the project is not archived if (project.isArchived()) { - throw new IllegalArgumentException(String - .format("Project is archived: %s", project_name - )); + throw new IllegalArgumentException(String.format("Project is archived: %s", project_name)); } // Retrieve all existing FeatureSet objects List existingFeatureSets = - featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - newFeatureSet.getSpec().getName(), project_name); + featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + newFeatureSet.getSpec().getName(), project_name); if (existingFeatureSets.size() == 0) { // Create new feature set since it doesn't exist - newFeatureSet = newFeatureSet.toBuilder() - .setSpec(newFeatureSet.getSpec().toBuilder().setVersion(1)).build(); + newFeatureSet = + newFeatureSet + .toBuilder() + .setSpec(newFeatureSet.getSpec().toBuilder().setVersion(1)) + .build(); } else { // Retrieve the latest feature set if the name does exist existingFeatureSets = Ordering.natural().reverse().sortedCopy(existingFeatureSets); @@ -333,9 +328,11 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetProto.FeatureSet newFea .build(); } // TODO: There is a race condition here with incrementing the version - newFeatureSet = newFeatureSet.toBuilder() - .setSpec(newFeatureSet.getSpec().toBuilder().setVersion(latest.getVersion() + 1)) - .build(); + newFeatureSet = + newFeatureSet + .toBuilder() + .setSpec(newFeatureSet.getSpec().toBuilder().setVersion(latest.getVersion() + 1)) + .build(); } // Build a new FeatureSet object which includes the new properties @@ -371,8 +368,7 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) // Ensure that all fields in a subscription contain values if ((sub.getVersion().isEmpty() || sub.getName().isEmpty()) || sub.getProject().isEmpty()) { throw new IllegalArgumentException( - String - .format("Missing parameter in subscription: %s", sub)); + String.format("Missing parameter in subscription: %s", sub)); } } Store existingStore = storeRepository.findById(newStoreProto.getName()).orElse(null); @@ -392,5 +388,4 @@ public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) .setStore(updateStoreRequest.getStore()) .build(); } - } diff --git a/core/src/main/java/feast/core/validators/FeatureSetValidator.java b/core/src/main/java/feast/core/validators/FeatureSetValidator.java index 7fc7a476bd..213e3898d5 100644 --- a/core/src/main/java/feast/core/validators/FeatureSetValidator.java +++ b/core/src/main/java/feast/core/validators/FeatureSetValidator.java @@ -28,16 +28,17 @@ public class FeatureSetValidator { public static void validateSpec(FeatureSet featureSet) { - if(featureSet.getSpec().getProject().isEmpty()){ + if (featureSet.getSpec().getProject().isEmpty()) { throw new IllegalArgumentException("Project name must be provided"); } - if(featureSet.getSpec().getName().isEmpty()){ + if (featureSet.getSpec().getName().isEmpty()) { throw new IllegalArgumentException("Feature set name must be provided"); } checkValidCharacters(featureSet.getSpec().getProject(), "project"); checkValidCharacters(featureSet.getSpec().getName(), "name"); - checkUniqueColumns(featureSet.getSpec().getEntitiesList(), featureSet.getSpec().getFeaturesList()); + checkUniqueColumns( + featureSet.getSpec().getEntitiesList(), featureSet.getSpec().getFeaturesList()); for (EntitySpec entitySpec : featureSet.getSpec().getEntitiesList()) { checkValidCharacters(entitySpec.getName(), "entities::name"); } diff --git a/core/src/test/java/feast/core/CoreApplicationTest.java b/core/src/test/java/feast/core/CoreApplicationTest.java index 6c4daafcf6..7a35fc4369 100644 --- a/core/src/test/java/feast/core/CoreApplicationTest.java +++ b/core/src/test/java/feast/core/CoreApplicationTest.java @@ -14,8 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package feast.core; -public class CoreApplicationTest { -} +public class CoreApplicationTest {} diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index ad6881eca7..19ce0858b2 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -61,7 +61,8 @@ public void setUp() { .setName("test") .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().build()) - .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) .build(); source = @@ -80,13 +81,21 @@ public void shouldUpdateJobIfPresent() { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet1").setVersion(1)) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet1") + .setVersion(1)) .setMeta(FeatureSetMeta.newBuilder()) .build(); FeatureSetProto.FeatureSet featureSet2 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet2").setVersion(1)) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet2") + .setVersion(1)) .setMeta(FeatureSetMeta.newBuilder()) .build(); Job originalJob = @@ -137,18 +146,17 @@ public void shouldCreateJobIfNotPresent() { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet1").setVersion(1)) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet1") + .setVersion(1)) .setMeta(FeatureSetMeta.newBuilder()) .build(); JobUpdateTask jobUpdateTask = spy( new JobUpdateTask( - Arrays.asList(featureSet1), - source, - store, - Optional.empty(), - jobManager, - 100L)); + Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); Job expectedInput = @@ -183,7 +191,11 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet1").setVersion(1)) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet1") + .setVersion(1)) .setMeta(FeatureSetMeta.newBuilder()) .build(); Job originalJob = @@ -197,12 +209,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { JobStatus.RUNNING); JobUpdateTask jobUpdateTask = new JobUpdateTask( - Arrays.asList(featureSet1), - source, - store, - Optional.of(originalJob), - jobManager, - 100L); + Arrays.asList(featureSet1), source, store, Optional.of(originalJob), jobManager, 100L); when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); Job expected = @@ -224,18 +231,17 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet1").setVersion(1)) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet1") + .setVersion(1)) .setMeta(FeatureSetMeta.newBuilder()) .build(); JobUpdateTask jobUpdateTask = spy( new JobUpdateTask( - Arrays.asList(featureSet1), - source, - store, - Optional.empty(), - jobManager, - 100L)); + Arrays.asList(featureSet1), source, store, Optional.empty(), jobManager, 100L)); doReturn("job").when(jobUpdateTask).createJobId("KAFKA/servers:9092/topic", "test"); Job expectedInput = @@ -271,8 +277,13 @@ public void shouldTimeout() { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("featureSet1").setVersion(1)) - .setMeta(FeatureSetMeta.newBuilder()).build(); + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("featureSet1") + .setVersion(1)) + .setMeta(FeatureSetMeta.newBuilder()) + .build(); JobUpdateTask jobUpdateTask = spy( diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 76ba2040c9..c263515ed0 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -67,11 +67,9 @@ public class DataflowJobManagerTest { - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); - @Mock - private Dataflow dataflow; + @Mock private Dataflow dataflow; private Map defaults; private DataflowJobManager dfJobManager; @@ -95,7 +93,8 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { .setName("SERVING") .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build()) - .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) .build(); SourceProto.Source source = @@ -111,9 +110,12 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { FeatureSetProto.FeatureSet featureSet = FeatureSetProto.FeatureSet.newBuilder() .setMeta(FeatureSetMeta.newBuilder()) - .setSpec(FeatureSetSpec.newBuilder().setSource(source).setName("featureSet") - .setVersion(1) - .setMaxAge(Duration.newBuilder().build())) + .setSpec( + FeatureSetSpec.newBuilder() + .setSource(source) + .setName("featureSet") + .setVersion(1) + .setMaxAge(Duration.newBuilder().build())) .build(); Printer printer = JsonFormat.printer(); @@ -146,9 +148,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { Runner.DATAFLOW.getName(), Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList( - FeatureSet.fromProto( - featureSet)), + Lists.newArrayList(FeatureSet.fromProto(featureSet)), JobStatus.PENDING); Job actual = dfJobManager.startJob(job); @@ -196,8 +196,12 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { FeatureSetProto.FeatureSet featureSet = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setName("featureSet").setVersion(1).setSource(source) - .build()).build(); + FeatureSetSpec.newBuilder() + .setName("featureSet") + .setVersion(1) + .setSource(source) + .build()) + .build(); dfJobManager = Mockito.spy(dfJobManager); @@ -213,8 +217,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { Runner.DATAFLOW.getName(), Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList( - FeatureSet.fromProto(featureSet)), + Lists.newArrayList(FeatureSet.fromProto(featureSet)), JobStatus.PENDING); expectedException.expect(JobExecutionException.class); diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 53983a775a..2dd87cfc6e 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -86,7 +86,8 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { .setName("SERVING") .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379).build()) - .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) .build(); SourceProto.Source source = @@ -101,7 +102,13 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { FeatureSetProto.FeatureSet featureSet = FeatureSetProto.FeatureSet.newBuilder() - .setSpec(FeatureSetSpec.newBuilder().setName("featureSet").setVersion(1).setMaxAge(Duration.newBuilder()).setSource(source).build()) + .setSpec( + FeatureSetSpec.newBuilder() + .setName("featureSet") + .setVersion(1) + .setMaxAge(Duration.newBuilder()) + .setSource(source) + .build()) .build(); Printer printer = JsonFormat.printer(); @@ -132,8 +139,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { Runner.DIRECT.getName(), Source.fromProto(source), Store.fromProto(store), - Lists.newArrayList( - FeatureSet.fromProto(featureSet)), + Lists.newArrayList(FeatureSet.fromProto(featureSet)), JobStatus.PENDING); Job actual = drJobManager.startJob(job); verify(drJobManager, times(1)).runPipeline(pipelineOptionsCaptor.capture()); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 937c0f5eba..775cb028b0 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -60,8 +60,7 @@ public class JobCoordinatorServiceTest { @Rule public final ExpectedException exception = ExpectedException.none(); - @Mock - JobRepository jobRepository; + @Mock JobRepository jobRepository; @Mock JobManager jobManager; @Mock SpecService specService; @Mock FeatureSetRepository featureSetRepository; @@ -92,12 +91,17 @@ public void shouldDoNothingIfNoMatchingFeatureSetsFound() throws InvalidProtocol .setName("test") .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().build()) - .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) + .addSubscriptions( + Subscription.newBuilder().setProject("*").setName("*").setVersion("*").build()) .build(); when(specService.listStores(any())) .thenReturn(ListStoresResponse.newBuilder().addStore(store).build()); when(specService.listFeatureSets( - Filter.newBuilder().setProject("*").setFeatureSetName("*").setFeatureSetVersion("*").build())) + Filter.newBuilder() + .setProject("*") + .setFeatureSetName("*") + .setFeatureSetVersion("*") + .build())) .thenReturn(ListFeatureSetsResponse.newBuilder().build()); JobCoordinatorService jcs = new JobCoordinatorService( @@ -114,7 +118,11 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().build()) .addSubscriptions( - Subscription.newBuilder().setProject("project1").setName("features").setVersion("*").build()) + Subscription.newBuilder() + .setProject("project1") + .setName("features") + .setVersion("*") + .build()) .build(); Source source = Source.newBuilder() @@ -129,17 +137,22 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("features").setVersion(1)) - .setMeta( - FeatureSetMeta.newBuilder() - ) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("features") + .setVersion(1)) + .setMeta(FeatureSetMeta.newBuilder()) .build(); FeatureSetProto.FeatureSet featureSet2 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source).setProject("project1").setName("features").setVersion(2)) - .setMeta( - FeatureSetMeta.newBuilder()) + FeatureSetSpec.newBuilder() + .setSource(source) + .setProject("project1") + .setName("features") + .setVersion(2)) + .setMeta(FeatureSetMeta.newBuilder()) .build(); String extId = "ext"; ArgumentCaptor jobArgCaptor = ArgumentCaptor.forClass(Job.class); @@ -165,7 +178,11 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep JobStatus.RUNNING); when(specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("features").setFeatureSetVersion("*").build())) + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("features") + .setFeatureSetVersion("*") + .build())) .thenReturn( ListFeatureSetsResponse.newBuilder() .addFeatureSets(featureSet1) @@ -194,7 +211,11 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().build()) .addSubscriptions( - Subscription.newBuilder().setProject("project1").setName("features").setVersion("*").build()) + Subscription.newBuilder() + .setProject("project1") + .setName("features") + .setVersion("*") + .build()) .build(); Source source1 = Source.newBuilder() @@ -218,16 +239,22 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { FeatureSetProto.FeatureSet featureSet1 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source1).setProject("project1").setName("features").setVersion(1)) - .setMeta( - FeatureSetMeta.newBuilder()) + FeatureSetSpec.newBuilder() + .setSource(source1) + .setProject("project1") + .setName("features") + .setVersion(1)) + .setMeta(FeatureSetMeta.newBuilder()) .build(); FeatureSetProto.FeatureSet featureSet2 = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - FeatureSetSpec.newBuilder().setSource(source2).setProject("project1").setName("features").setVersion(2)) - .setMeta( - FeatureSetMeta.newBuilder()) + FeatureSetSpec.newBuilder() + .setSource(source2) + .setProject("project1") + .setName("features") + .setVersion(2)) + .setMeta(FeatureSetMeta.newBuilder()) .build(); Job expectedInput1 = @@ -272,7 +299,11 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { ArgumentCaptor jobArgCaptor = ArgumentCaptor.forClass(Job.class); when(specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("features").setFeatureSetVersion("*").build())) + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("features") + .setFeatureSetVersion("*") + .build())) .thenReturn( ListFeatureSetsResponse.newBuilder() .addFeatureSets(featureSet1) diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index 6a51521271..edd99aa494 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -74,17 +74,13 @@ public class SpecServiceTest { - @Mock - private FeatureSetRepository featureSetRepository; + @Mock private FeatureSetRepository featureSetRepository; - @Mock - private StoreRepository storeRepository; + @Mock private StoreRepository storeRepository; - @Mock - private ProjectRepository projectRepository; + @Mock private ProjectRepository projectRepository; - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; private List featureSets; @@ -125,37 +121,32 @@ public void setUp() { featureSets = Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1, featureSet3v1); when(featureSetRepository.findAll()).thenReturn(featureSets); - when(featureSetRepository.findAllByOrderByNameAscVersionAsc()) - .thenReturn(featureSets); + when(featureSetRepository.findAllByOrderByNameAscVersionAsc()).thenReturn(featureSets); - when(featureSetRepository - .findFeatureSetByNameAndProject_NameAndVersion("f1", - "project1", 1)).thenReturn(featureSets.get(0)); - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc("f1", - "project1")).thenReturn(featureSets.subList(0, 3)); - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc("f3", - "project1")).thenReturn(featureSets.subList(4, 5)); - when(featureSetRepository - .findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc("f1", "project1")) + when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion("f1", "project1", 1)) + .thenReturn(featureSets.get(0)); + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "f1", "project1")) + .thenReturn(featureSets.subList(0, 3)); + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "f3", "project1")) + .thenReturn(featureSets.subList(4, 5)); + when(featureSetRepository.findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( + "f1", "project1")) .thenReturn(featureSet1v3); - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc("f1", "project1")) + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "f1", "project1")) .thenReturn(featureSets.subList(0, 3)); - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc("asd", - "project1")).thenReturn(Lists.newArrayList()); - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc("f%", - "project1")) + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "asd", "project1")) + .thenReturn(Lists.newArrayList()); + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "f%", "project1")) .thenReturn(featureSets); - when(featureSetRepository - .findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc("%", - "%")) + when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( + "%", "%")) .thenReturn(featureSets); - when(projectRepository.findAllByArchivedIsFalse()) .thenReturn(Collections.singletonList(new Project("project1"))); when(projectRepository.findById("project1")).thenReturn(Optional.of(new Project("project1"))); @@ -167,15 +158,18 @@ public void setUp() { when(storeRepository.findById("SERVING")).thenReturn(Optional.of(store1)); when(storeRepository.findById("NOTFOUND")).thenReturn(Optional.empty()); - specService = new SpecService(featureSetRepository, storeRepository, projectRepository, - defaultSource); + specService = + new SpecService(featureSetRepository, storeRepository, projectRepository, defaultSource); } @Test public void shouldGetAllFeatureSetsIfOnlyWildcardsProvided() { ListFeatureSetsResponse actual = specService.listFeatureSets( - Filter.newBuilder().setFeatureSetName("*").setProject("*").setFeatureSetVersion("*") + Filter.newBuilder() + .setFeatureSetName("*") + .setProject("*") + .setFeatureSetVersion("*") .build()); List list = new ArrayList<>(); for (FeatureSet featureSet : featureSets) { @@ -200,7 +194,11 @@ public void listFeatureSetShouldFailIfFeatureSetProvidedWithoutProject() { public void shouldGetAllFeatureSetsMatchingNameIfWildcardVersionProvided() { ListFeatureSetsResponse actual = specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f1").setFeatureSetVersion("*").build()); + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("f1") + .setFeatureSetVersion("*") + .build()); List expectedFeatureSets = featureSets.stream().filter(fs -> fs.getName().equals("f1")).collect(Collectors.toList()); List list = new ArrayList<>(); @@ -217,8 +215,11 @@ public void shouldGetAllFeatureSetsMatchingNameIfWildcardVersionProvided() { public void shouldGetAllFeatureSetsMatchingNameWithWildcardSearch() { ListFeatureSetsResponse actual = specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f*") - .setFeatureSetVersion("*").build()); + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("f*") + .setFeatureSetVersion("*") + .build()); List expectedFeatureSets = featureSets.stream() .filter(fs -> fs.getName().startsWith("f")) @@ -237,8 +238,11 @@ public void shouldGetAllFeatureSetsMatchingNameWithWildcardSearch() { public void shouldGetAllFeatureSetsMatchingVersionIfNoComparator() { ListFeatureSetsResponse actual = specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f1") - .setFeatureSetVersion("1").build()); + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("f1") + .setFeatureSetVersion("1") + .build()); List expectedFeatureSets = featureSets.stream() .filter(fs -> fs.getName().equals("f1")) @@ -258,8 +262,11 @@ public void shouldGetAllFeatureSetsMatchingVersionIfNoComparator() { public void shouldThrowExceptionIfGetAllFeatureSetsGivenVersionWithComparator() { expectedException.expect(IllegalArgumentException.class); specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f1") - .setFeatureSetVersion(">1").build()); + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("f1") + .setFeatureSetVersion(">1") + .build()); } @Test @@ -277,7 +284,10 @@ public void shouldGetSpecificFeatureSetGivenSpecificVersionFilter() { .thenReturn(featureSets.get(1)); GetFeatureSetResponse actual = specService.getFeatureSet( - GetFeatureSetRequest.newBuilder().setProject("project1").setName("f1").setVersion(2) + GetFeatureSetRequest.newBuilder() + .setProject("project1") + .setName("f1") + .setVersion(2) .build()); FeatureSet expected = featureSets.get(1); assertThat(actual.getFeatureSet(), equalTo(expected.toProto())); @@ -296,7 +306,10 @@ public void shouldThrowExceptionGivenMissingFeatureSet() { expectedException.expectMessage( "Feature set with name \"f1000\" and version \"2\" could not be found."); specService.getFeatureSet( - GetFeatureSetRequest.newBuilder().setName("f1000").setProject("project1").setVersion(2) + GetFeatureSetRequest.newBuilder() + .setName("f1000") + .setProject("project1") + .setVersion(2) .build()); } @@ -309,8 +322,11 @@ public void shouldThrowRetrievalExceptionGivenInvalidFeatureSetVersionComparator + "feature_set_version: \">1\"\n" + "project: \"project1\""); specService.listFeatureSets( - Filter.newBuilder().setProject("project1").setFeatureSetName("f1") - .setFeatureSetVersion(">1").build()); + Filter.newBuilder() + .setProject("project1") + .setFeatureSetName("f1") + .setFeatureSetVersion(">1") + .build()); } @Test @@ -351,8 +367,7 @@ public void applyFeatureSetShouldReturnFeatureSetWithLatestVersionIfFeatureSetHa ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(incomingFeatureSetSpec).build()); + FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSetSpec).build()); verify(featureSetRepository, times(0)).save(ArgumentMatchers.any(FeatureSet.class)); assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.NO_CHANGE)); @@ -361,9 +376,9 @@ public void applyFeatureSetShouldReturnFeatureSetWithLatestVersionIfFeatureSetHa @Test public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() { - when(featureSetRepository - .findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f2", "project1")).thenReturn(Lists.newArrayList()); + when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( + "f2", "project1")) + .thenReturn(Lists.newArrayList()); FeatureSetProto.FeatureSet incomingFeatureSet = newDummyFeatureSet("f2", 1, "project1").toProto(); @@ -373,46 +388,64 @@ public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() { ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec(incomingFeatureSet.getSpec()).build()); + FeatureSetProto.FeatureSet.newBuilder().setSpec(incomingFeatureSet.getSpec()).build()); verify(projectRepository).saveAndFlush(ArgumentMatchers.any(Project.class)); FeatureSetProto.FeatureSet expected = FeatureSetProto.FeatureSet.newBuilder() .setSpec( - incomingFeatureSetSpec.toBuilder().setVersion(1).setSource(defaultSource.toProto()) + incomingFeatureSetSpec + .toBuilder() + .setVersion(1) + .setSource(defaultSource.toProto()) .build()) .build(); assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); assertThat(applyFeatureSetResponse.getFeatureSet().getSpec(), equalTo(expected.getSpec())); - assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), + assertThat( + applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(expected.getSpec().getVersion())); } @Test public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists() { - FeatureSetProto.FeatureSet incomingFeatureSet = - featureSets - .get(2) - .toProto(); - incomingFeatureSet = incomingFeatureSet.toBuilder() - .setMeta(incomingFeatureSet.getMeta()).setSpec( - incomingFeatureSet.getSpec().toBuilder().clearVersion().addFeatures( - FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)).build()) - .build(); + FeatureSetProto.FeatureSet incomingFeatureSet = featureSets.get(2).toProto(); + incomingFeatureSet = + incomingFeatureSet + .toBuilder() + .setMeta(incomingFeatureSet.getMeta()) + .setSpec( + incomingFeatureSet + .getSpec() + .toBuilder() + .clearVersion() + .addFeatures( + FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) + .build()) + .build(); - FeatureSetProto.FeatureSet expected = incomingFeatureSet.toBuilder() - .setMeta(incomingFeatureSet.getMeta().toBuilder().build()).setSpec( - incomingFeatureSet.getSpec().toBuilder().setVersion(4) - .setSource(defaultSource.toProto()).build()).build(); + FeatureSetProto.FeatureSet expected = + incomingFeatureSet + .toBuilder() + .setMeta(incomingFeatureSet.getMeta().toBuilder().build()) + .setSpec( + incomingFeatureSet + .getSpec() + .toBuilder() + .setVersion(4) + .setSource(defaultSource.toProto()) + .build()) + .build(); ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(incomingFeatureSet); verify(projectRepository).saveAndFlush(ArgumentMatchers.any(Project.class)); assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); - assertEquals(FeatureSet.fromProto(applyFeatureSetResponse.getFeatureSet()), + assertEquals( + FeatureSet.fromProto(applyFeatureSetResponse.getFeatureSet()), FeatureSet.fromProto(expected)); - assertThat(applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), + assertThat( + applyFeatureSetResponse.getFeatureSet().getSpec().getVersion(), equalTo(expected.getSpec().getVersion())); } @@ -424,14 +457,14 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() { Field f3e1 = new Field("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = (new FeatureSet( - "f3", - "project1", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + "f3", + "project1", + 5, + 100L, + Arrays.asList(f3e1), + Arrays.asList(f3f2, f3f1), + defaultSource, + FeatureSetStatus.STATUS_READY)) .toProto(); ApplyFeatureSetResponse applyFeatureSetResponse = diff --git a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java index 5419115e2e..9163c5b2d6 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/SpecUtil.java @@ -38,9 +38,7 @@ public static String getFeatureSetReference(FeatureSet featureSet) { return String.format("%s/%s:%d", spec.getProject(), spec.getName(), spec.getVersion()); } - /** - * Get only feature set specs that matches the subscription - */ + /** Get only feature set specs that matches the subscription */ public static List getSubscribedFeatureSets( List subscriptions, List featureSets) { List subscribed = new ArrayList<>(); @@ -53,15 +51,16 @@ public static List getSubscribedFeatureSets( } // If all wildcards, subscribe to everything - if (sub.getProject().equals("*") || sub.getName().equals("*") || sub.getVersion() - .equals("*")) { + if (sub.getProject().equals("*") + || sub.getName().equals("*") + || sub.getVersion().equals("*")) { subscribed.add(featureSet); break; } // If all wildcards, subscribe to everything - if (sub.getProject().equals("*") && (!sub.getName().equals("*") || !sub.getVersion() - .equals("*"))) { + if (sub.getProject().equals("*") + && (!sub.getName().equals("*") || !sub.getVersion().equals("*"))) { throw new IllegalArgumentException( String.format( "Subscription cannot have feature set name and/or version set if project is not defined: %s", diff --git a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java index dc67bd86b4..7af98fb8f0 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java +++ b/ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java @@ -208,7 +208,7 @@ public static void setupBigQuery( String tableName = String.format( "%s_%s_v%d", - featureSetSpec.getProject(), featureSetSpec.getName(), featureSetSpec.getVersion()) + featureSetSpec.getProject(), featureSetSpec.getName(), featureSetSpec.getVersion()) .replaceAll("-", "_"); TableId tableId = TableId.of(bigqueryProjectId, datasetId.getDataset(), tableName); diff --git a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java index 87af809120..d129c15661 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java @@ -20,7 +20,6 @@ import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSet; -import feast.core.FeatureSetProto.FeatureSetMeta; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; import feast.ingestion.values.FailedElement; diff --git a/sdk/java/src/main/java/com/gojek/feast/FeastClient.java b/sdk/java/src/main/java/com/gojek/feast/FeastClient.java index 91ba572efc..01157306d5 100644 --- a/sdk/java/src/main/java/com/gojek/feast/FeastClient.java +++ b/sdk/java/src/main/java/com/gojek/feast/FeastClient.java @@ -61,8 +61,8 @@ public GetFeastServingInfoResponse getFeastServingInfo() { * *

See {@link #getOnlineFeatures(List, List, str)} * - * @param features list of string feature references to retrieve, feature reference follows this format - * [project]/[name]:[version] + * @param features list of string feature references to retrieve, feature reference follows this + * format [project]/[name]:[version] * @param rows list of {@link Row} to select the entities to retrieve the features for * @param defaultProject {@link String} Default project to find features in if not provided in * feature reference. @@ -75,8 +75,8 @@ public List getOnlineFeatures(List features, List rows, String /** * Get online features from Feast. * - *

Example of retrieving online features for the driver project, with features - * driver_id and driver_name, both version 1 + *

Example of retrieving online features for the driver project, with features driver_id and + * driver_name, both version 1 * *

{@code
    * FeastClient client = FeastClient.create("localhost", 6566);
@@ -97,8 +97,12 @@ public List getOnlineFeatures(List features, List rows, String
    * @return list of {@link Row} containing features
    */
   public List getOnlineFeatures(
-      List featureRefStrings, List rows, String defaultProject, boolean omitEntitiesInResponse) {
-    List features = RequestUtil.createFeatureRefs(featureRefStrings, defaultProject);
+      List featureRefStrings,
+      List rows,
+      String defaultProject,
+      boolean omitEntitiesInResponse) {
+    List features =
+        RequestUtil.createFeatureRefs(featureRefStrings, defaultProject);
     List entityRows =
         rows.stream()
             .map(
diff --git a/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java b/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java
index 530af05e76..075c570c4e 100644
--- a/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java
+++ b/sdk/java/src/main/java/com/gojek/feast/RequestUtil.java
@@ -23,8 +23,8 @@
 @SuppressWarnings("WeakerAccess")
 public class RequestUtil {
 
-  public static List createFeatureRefs(List featureRefStrings,
-      String defaultProject) {
+  public static List createFeatureRefs(
+      List featureRefStrings, String defaultProject) {
     if (featureRefStrings == null) {
       throw new IllegalArgumentException("featureRefs cannot be null");
     }
@@ -78,11 +78,14 @@ public static List createFeatureRefs(List featureRefSt
       }
 
       featureRefs.add(
-          FeatureReference.newBuilder().setName(name).setProject(project).setVersion(version)
+          FeatureReference.newBuilder()
+              .setName(name)
+              .setProject(project)
+              .setVersion(version)
               .build());
     }
 
-  ;  return featureRefs;
-
+    ;
+    return featureRefs;
   }
 }
diff --git a/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java b/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java
index 47505518ae..1c58e9435c 100644
--- a/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java
+++ b/sdk/java/src/test/java/com/gojek/feast/RequestUtilTest.java
@@ -41,9 +41,8 @@ private static Stream provideValidFeatureIds() {
                 FeatureReference.newBuilder()
                     .setProject("driver_project")
                     .setName("driver_id")
-                    .setVersion(1).build()
-            )
-        ),
+                    .setVersion(1)
+                    .build())),
         Arguments.of(
             Arrays.asList("driver_project/driver_id:1", "driver_project/driver_name:1"),
             Arrays.asList(
@@ -56,10 +55,12 @@ private static Stream provideValidFeatureIds() {
                     .setProject("driver_project")
                     .setName("driver_name")
                     .setVersion(1)
-                    .build())
-        ),
+                    .build())),
         Arguments.of(
-            Arrays.asList("driver_project/driver_id:1", "driver_project/driver_name:1", "booking_project/driver_name:1"),
+            Arrays.asList(
+                "driver_project/driver_id:1",
+                "driver_project/driver_name:1",
+                "booking_project/driver_name:1"),
             Arrays.asList(
                 FeatureReference.newBuilder()
                     .setProject("driver_project")
@@ -103,12 +104,14 @@ private static Stream provideInvalidFeatureRefs() {
   @ParameterizedTest
   @MethodSource("provideInvalidFeatureRefs")
   void createFeatureSets_ShouldThrowExceptionForInvalidFeatureRefs(List input) {
-    assertThrows(IllegalArgumentException.class, () -> RequestUtil.createFeatureRefs(input, "my-project"));
+    assertThrows(
+        IllegalArgumentException.class, () -> RequestUtil.createFeatureRefs(input, "my-project"));
   }
 
   @ParameterizedTest
   @NullSource
   void createFeatureSets_ShouldThrowExceptionForNullFeatureRefs(List input) {
-    assertThrows(IllegalArgumentException.class, () -> RequestUtil.createFeatureRefs(input, "my-project"));
+    assertThrows(
+        IllegalArgumentException.class, () -> RequestUtil.createFeatureRefs(input, "my-project"));
   }
 }