Skip to content

Commit

Permalink
Apply spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilingc committed Dec 27, 2019
1 parent 3e40a61 commit f6c7ca0
Show file tree
Hide file tree
Showing 28 changed files with 434 additions and 368 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
String topicName = streamProperties.getOptions().get("topic");
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
map.put(
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
AdminClient client = AdminClient.create(map);

NewTopic newTopic =
Expand Down
20 changes: 9 additions & 11 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;

/**
* JPA repository supplying FeatureSet objects keyed by id.
*/
/** JPA repository supplying FeatureSet objects keyed by id. */
public interface FeatureSetRepository extends JpaRepository<FeatureSet, String> {

long count();

// Find single feature set by project, name, and version
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(String name, String project,
Integer version);
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(
String name, String project, Integer version);

// Find single latest version of a feature set by project and name (LIKE)
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String name,
String project);
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(
String name, String project);

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
Expand All @@ -42,10 +40,10 @@ FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String
List<FeatureSet> findAllByProject_NameOrderByNameAscVersionAsc(String project_name);

// find all versions of feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(String name,
String project_name);
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(
String name, String project_name);

// find all versions of feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(String name,
String project_name);
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(
String name, String project_name);
}
1 change: 0 additions & 1 deletion core/src/main/java/feast/core/dao/ProjectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feast.core.dao;

import feast.core.model.Project;
Expand Down
66 changes: 23 additions & 43 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;

/**
* Implementation of the feast core GRPC service.
*/
/** Implementation of the feast core GRPC service. */
@Slf4j
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {
Expand Down Expand Up @@ -81,10 +79,8 @@ public void getFeatureSet(
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -97,10 +93,8 @@ public void listFeatureSets(
responseObserver.onCompleted();
} catch (RetrievalException | IllegalArgumentException e) {
log.error("Exception has occurred in ListFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -113,10 +107,8 @@ public void listStores(
responseObserver.onCompleted();
} catch (RetrievalException e) {
log.error("Exception has occurred in ListStores method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -132,16 +124,12 @@ public void applyFeatureSet(
"Unable to persist this feature set due to a constraint violation. Please ensure that"
+ " field names are unique within the project namespace: ",
e);
responseObserver.onError(Status.ALREADY_EXISTS
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.ALREADY_EXISTS.withDescription(e.getMessage()).withCause(e).asRuntimeException());
} catch (Exception e) {
log.error("Exception has occurred in ApplyFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -154,10 +142,8 @@ public void updateStore(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in UpdateStore method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -170,10 +156,8 @@ public void createProject(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the createProject method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -186,10 +170,8 @@ public void archiveProject(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the createProject method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -198,17 +180,15 @@ public void listProjects(
ListProjectsRequest request, StreamObserver<ListProjectsResponse> responseObserver) {
try {
List<Project> projects = accessManagementService.listProjects();
responseObserver.onNext(ListProjectsResponse.newBuilder()
.addAllProjects(projects.stream().map(Project::getName).collect(
Collectors.toList())).build());
responseObserver.onNext(
ListProjectsResponse.newBuilder()
.addAllProjects(projects.stream().map(Project::getName).collect(Collectors.toList()))
.build());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the listProjects method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

}
16 changes: 11 additions & 5 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public Job call() {
}
} else {
String jobId = createJobId(source.getId(), store.getName());
submittedJob =
executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store));
submittedJob = executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store));
}

Job job = null;
Expand All @@ -136,7 +135,10 @@ private Job startJob(
.map(
fsp ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder().setSpec(fsp.getSpec()).setMeta(fsp.getMeta()).build()))
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fsp.getSpec())
.setMeta(fsp.getMeta())
.build()))
.collect(Collectors.toList());
Job job =
new Job(
Expand Down Expand Up @@ -184,13 +186,17 @@ private Job startJob(
}

/** Update the given job */
private Job updateJob(Job job, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store store) {
private Job updateJob(
Job job, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store store) {
job.setFeatureSets(
featureSets.stream()
.map(
fs ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder().setSpec(fs.getSpec()).setMeta(fs.getMeta()).build()))
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fs.getSpec())
.setMeta(fs.getMeta())
.build()))
.collect(Collectors.toList()));
job.setStore(feast.core.model.Store.fromProto(store));
AuditLogger.log(
Expand Down
26 changes: 12 additions & 14 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ public Runner getRunnerType() {
@Override
public Job startJob(Job job) {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
try {
return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), false);
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
false);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
}
Expand All @@ -100,9 +102,7 @@ public Job startJob(Job job) {
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);
Expand Down Expand Up @@ -156,12 +156,7 @@ public JobStatus getJobStatus(Job job) {

try {
com.google.api.services.dataflow.model.Job dataflowJob =
dataflow
.projects()
.locations()
.jobs()
.get(projectId, location, job.getExtId())
.execute();
dataflow.projects().locations().jobs().get(projectId, location, job.getExtId()).execute();
return DataflowJobStateMapper.map(dataflowJob.getCurrentState());
} catch (Exception e) {
log.error(
Expand Down Expand Up @@ -208,7 +203,10 @@ private Job submitDataflowJob(
}

private ImportOptions getPipelineOptions(
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink, boolean update)
String jobName,
List<FeatureSetProto.FeatureSet> featureSets,
StoreProto.Store sink,
boolean update)
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ public Runner getRunnerType() {
public Job startJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
ImportOptions pipelineOptions = getPipelineOptions(featureSetProtos, job.getStore().toProto());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
jobs.add(directJob);
Expand Down
10 changes: 2 additions & 8 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@Column(name = "max_age")
private long maxAgeSeconds;


// Entity fields inside this feature set
@ElementCollection(fetch = FetchType.EAGER)
@CollectionTable(
name = "entities",
joinColumns = @JoinColumn(name = "feature_set_id")
)
@CollectionTable(name = "entities", joinColumns = @JoinColumn(name = "feature_set_id"))
@Fetch(FetchMode.SUBSELECT)
private Set<Field> entities;

Expand All @@ -91,9 +87,7 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@CollectionTable(
name = "features",
joinColumns = @JoinColumn(name = "feature_set_id"),
uniqueConstraints =
@UniqueConstraint(columnNames = {"name", "project", "version"})
)
uniqueConstraints = @UniqueConstraint(columnNames = {"name", "project", "version"}))
@Fetch(FetchMode.SUBSELECT)
private Set<Field> features;

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/feast/core/model/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public class Field {
@Column(name = "project")
private String project;

public Field() {
}
public Field() {}

public Field(String name, ValueType.Enum type) {
this.name = name;
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class Job extends AbstractTimestampEntity {
private Store store;

// FeatureSets populated by the job
@ManyToMany
private List<FeatureSet> featureSets;
@ManyToMany private List<FeatureSet> featureSets;

// Job Metrics
@OneToMany(mappedBy = "job", cascade = CascadeType.ALL)
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/feast/core/model/Project.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public class Project {
@Column(name = "archived", nullable = false)
private boolean archived;

@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER, orphanRemoval = true, mappedBy = "project")
@OneToMany(
cascade = CascadeType.ALL,
fetch = FetchType.EAGER,
orphanRemoval = true,
mappedBy = "project")
private Set<FeatureSet> featureSets;

public Project() {
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/java/feast/core/model/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ public List<Subscription> getSubscriptions() {
}

private static String convertSubscriptionToString(Subscription sub) {
if(sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()){
throw new IllegalArgumentException(String.format("Missing arguments in subscription string: %s", sub.toString()));
if (sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()) {
throw new IllegalArgumentException(
String.format("Missing arguments in subscription string: %s", sub.toString()));
}
return String.format("%s:%s:%s", sub.getProject(), sub.getName(), sub.getVersion());
}
Expand All @@ -129,6 +130,10 @@ private Subscription convertStringToSubscription(String sub) {
return Subscription.newBuilder().build();
}
String[] split = sub.split(":", 3);
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).setVersion(split[2]).build();
return Subscription.newBuilder()
.setProject(split[0])
.setName(split[1])
.setVersion(split[2])
.build();
}
}
}
Loading

0 comments on commit f6c7ca0

Please sign in to comment.