Skip to content

Commit

Permalink
Enrich ApiJobStore to support filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Venisa Correia authored and Andrew Cholewa committed Sep 8, 2016
1 parent 873553f commit ec2c7d0
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Current

### Added:

- [Enrich the ApiJobStore interface] (https://github.com/yahoo/fili/pull/23)
* `ApiJobStore` Interface now supports filtering `JobRows` in the store
* Added support for filtering JobRows in `HashJobStore`
* Added `JobRowFilter` to hold filter information

- [Lookup Dimension Serializer]()
* Created `LookupDimensionToDimensionSpec` serializer for `LookupDimension`
* Created corresponding tests for `LookupDimensionToDimensionSpec` in `LookupDimensionToDimensionSpecSpec`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import rx.Observable;

import java.util.Set;

/**
* An ApiJobStore is responsible for storing the metadata about Bard jobs. Conceptually, the ApiJobStore is a table
* where each row is the metadata of a particular job, and the columns are the metadata stored with each job
Expand Down Expand Up @@ -51,4 +53,21 @@ public interface ApiJobStore {
* @return An Observable that emits a stream of all the JobRows in the store
*/
Observable<JobRow> getAllRows();

/**
* This method takes a Set of JobRowFilters, ANDS them by default, and returns a cold observable that emits a
* stream of JobRows which satisfy the given filter.
* <p>
* Any of the fields may be not filterable for every implementation of the {@code ApiJobStore} as the efficiency of
* filtering is dependent on the backing store. An IllegalArgumentException is thrown if filtering on any given
* field is not supported.
*
* @param jobRowFilters A Set of JobRowFilters where each JobRowFilter contains the JobField to be
* filtered on, the filter operation and the values to be compared to.
*
* @return An Observable that emits a stream of JobRows that satisfy the given filters
*
* @throws IllegalArgumentException if filtering on any field is not supported
*/
Observable<JobRow> getFilteredRows(Set<JobRowFilter> jobRowFilters) throws IllegalArgumentException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@
// Licensed under the terms of the Apache license. Please see LICENSE file distributed with this work for terms.
package com.yahoo.bard.webservice.async.jobs.stores;

import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_OPERATOR_INVALID;
import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_JOBFIELD_UNDEFINED;

import com.yahoo.bard.webservice.async.jobs.jobrows.JobField;
import com.yahoo.bard.webservice.async.jobs.jobrows.JobRow;
import com.yahoo.bard.webservice.web.FilterOperation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
* An ApiJobStore backed by an in-memory map. This is meant as a stub implementation for
Expand All @@ -17,6 +26,8 @@
*/
public class HashJobStore implements ApiJobStore {

private static final Logger LOG = LoggerFactory.getLogger(HashJobStore.class);

private final Map<String, JobRow> store;

/**
Expand Down Expand Up @@ -51,4 +62,66 @@ public Observable<JobRow> save(JobRow metadata) {
public Observable<JobRow> getAllRows() {
return Observable.from(store.values());
}

@Override
public Observable<JobRow> getFilteredRows(Set<JobRowFilter> jobRowFilters) throws IllegalArgumentException {
return getAllRows().filter(jobRow -> satisfiesFilters(jobRowFilters, jobRow));
}

/**
* This method checks if the given JobRow satisfies all the JobRowFilters and returns true if it does.
* If a JobField in any of the filters is not a part of the JobRow, this method throws an IllegalArgumentException.
*
* @param jobRowFilters A Set of JobRowFilters specifying the different conditions to be satisfied
* @param jobRow The JobRow which needs to be inspected
*
* @return true if the JobRow satisfies all the filters, false otherwise
*
* @throws IllegalArgumentException if a JobField in any of the filters is not a part the JobRow
*/
private boolean satisfiesFilters(Set<JobRowFilter> jobRowFilters, JobRow jobRow) throws IllegalArgumentException {
return jobRowFilters.stream().allMatch(filter -> satisfiesFilter(jobRow, filter));
}

/**
* This method checks if the given JobRow satisfies the given JobRowFilter and returns true if it does.
* If a JobField in the filter is not a part the JobRow, this method throws an IllegalArgumentException.
*
* @param jobRow The JobRow which needs to be inspected
* @param jobRowFilter A JobRowFilter specifying the filter condition
*
* @return true if the JobRow satisfies the filter, false otherwise
*
* @throws IllegalArgumentException if a JobField in the filter is not a part the JobRow
*/
private boolean satisfiesFilter(JobRow jobRow, JobRowFilter jobRowFilter) throws IllegalArgumentException {
JobField filterJobField = jobRowFilter.getJobField();
FilterOperation filterOperation = jobRowFilter.getOperation();
Set<String> filterValues = jobRowFilter.getValues();

if (!jobRow.containsKey(filterJobField)) {
Set<JobField> actualJobFields = jobRow.keySet();
LOG.debug(FILTER_JOBFIELD_UNDEFINED.logFormat(filterJobField, actualJobFields));
throw new IllegalArgumentException(
FILTER_JOBFIELD_UNDEFINED.format(filterJobField, actualJobFields)
);
}

String actualValue = jobRow.get(filterJobField);

switch (filterOperation) {
case notin:
return !filterValues.contains(actualValue);
case startswith:
return filterValues.stream().anyMatch(actualValue::startsWith);
case contains :
return filterValues.stream().anyMatch(actualValue::contains);
case in: // the fall-through is intentional because in is a synonym for eq
case eq:
return filterValues.contains(actualValue);
default:
LOG.debug(FILTER_OPERATOR_INVALID.logFormat(filterOperation));
throw new IllegalArgumentException(FILTER_OPERATOR_INVALID.format(filterOperation));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2016 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE file distributed with this work for terms.
package com.yahoo.bard.webservice.async.jobs.stores;

import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_ERROR;
import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_INVALID;
import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_JOBFIELD_UNDEFINED;
import static com.yahoo.bard.webservice.web.ErrorMessageFormat.FILTER_OPERATOR_INVALID;

import com.yahoo.bard.webservice.async.jobs.jobrows.DefaultJobField;
import com.yahoo.bard.webservice.async.jobs.jobrows.JobField;
import com.yahoo.bard.webservice.util.FilterTokenizer;
import com.yahoo.bard.webservice.web.BadFilterException;
import com.yahoo.bard.webservice.web.FilterOperation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.validation.constraints.NotNull;

/**
* Class containing filter information to filter JobRows in ApiJobStore.
*/
public class JobRowFilter {
private static final Logger LOG = LoggerFactory.getLogger(JobRowFilter.class);

private final JobField jobField;
private final FilterOperation operation;
private final Set<String> values;

/* url filter query pattern: (JobField name)-(operation)[(value or comma separated numeric values)]?
*
* e.g. userId-eq[Foo]
*
* JobField name: userId
* operation: eq
* values: Foo
*/
private static final Pattern QUERY_PATTERN = Pattern.compile("([^\\|]+)-([^\\[]+)\\[([^\\]]+)\\]?");

/**
* Parses the URL ApiJobStore filter query and generates the JobRowFilter object.
*
* @param filterQuery Expects a URL ApiJobStore filter query String in the format:
* <p>
* <code>(JobField name)-(operation)[?(value or comma separated values)]?</code>
*
* @throws BadFilterException when filter pattern is not matched or when any of its properties are not
* valid.
*/
public JobRowFilter(@NotNull String filterQuery) throws BadFilterException {
LOG.trace("filterQuery: {}", filterQuery);

Matcher tokenizedQuery = QUERY_PATTERN.matcher(filterQuery);

// if pattern match found, extract values else throw exception
if (!tokenizedQuery.matches()) {
LOG.debug(FILTER_INVALID.logFormat(filterQuery));
throw new BadFilterException(FILTER_INVALID.format(filterQuery));
}

jobField = extractJobField(tokenizedQuery);
operation = extractOperation(tokenizedQuery);
values = extractValues(tokenizedQuery, filterQuery);
}

/**
* Constructor for an JobRowFilter object whose data has already been parsed.
*
* @param jobField The JobField to perform the filtering on
* @param operation The operation to perform (eg: eq)
* @param values A Set of Strings to compare the JobField's value to.
*/
private JobRowFilter(JobField jobField, FilterOperation operation, Set<String> values) {
this.jobField = jobField;
this.operation = operation;
this.values = values;
}

public JobField getJobField() {
return jobField;
}

public FilterOperation getOperation() {
return operation;
}

public Set<String> getValues() {
return values;
}

/**
* Construct an JobRowFilter object using the same FilterOperation and values as the object on
* which this method is called and using the supplied JobField.
*
* @param jobField The JobField to perform the filtering on
*
* @return An instance of JobRowFilter created using the supplied JobField
*/
public JobRowFilter withJobField(JobField jobField) {
return new JobRowFilter(jobField, operation, values);
}

/**
* Construct an JobRowFilter object using the same JobField and values as the object on
* which this method is called and using the supplied FilterOperation.
*
* @param operation The operation to perform (eg: eq)
*
* @return An instance of JobRowFilter created using the supplied FilterOperation
*/
public JobRowFilter withOperation(FilterOperation operation) {
return new JobRowFilter(jobField, operation, values);
}

/**
* Construct an JobRowFilter object using the same JobField and FilterOperation as the object on
* which this method is called and using the supplied values.
*
* @param values A Set of Strings to compare the JobField's value to
*
* @return An instance of JobRowFilter created using the supplied values
*/
public JobRowFilter withValues(Set<String> values) {
return new JobRowFilter(jobField, operation, values);
}

/**
* Extracts the JobField to be examined from the tokenizedQuery.
*
* @param tokenizedQuery The tokenized filter expression.
*
* @return The JobField to be examined
* @throws BadFilterException is the JobField does not exist
*/
private JobField extractJobField(Matcher tokenizedQuery) throws BadFilterException {
String fieldName = tokenizedQuery.group(1);

return Arrays.stream(DefaultJobField.values())
.filter(field -> field.getName().equals(fieldName))
.findFirst()
.orElseThrow(() -> {
LOG.debug(FILTER_JOBFIELD_UNDEFINED.logFormat(fieldName, DefaultJobField.values()));
return new BadFilterException(FILTER_JOBFIELD_UNDEFINED.format(
fieldName, DefaultJobField.values()
));
});
}

/**
* Extracts the operation to be performed by the ApiJobStore filter query.
*
* @param tokenizedQuery The tokenized filter expression.
*
* @return The operation to be performed by the ApiJobStore filter query.
* @throws BadFilterException if the operation does not exist
*/
private FilterOperation extractOperation(Matcher tokenizedQuery) throws BadFilterException {
String operationName = tokenizedQuery.group(2);
try {
return FilterOperation.valueOf(operationName);
} catch (IllegalArgumentException ignored) {
LOG.debug(FILTER_OPERATOR_INVALID.logFormat(operationName));
throw new BadFilterException(FILTER_OPERATOR_INVALID.format(operationName));
}
}

/**
* Extracts the values to be used in the JobRowFilter query from the query.
*
* @param tokenizedQuery The tokenized filter expression..
* @param filterQuery The raw query. Used for logging.
*
* @return The set of values to be used in the JobRowFilter query.
* @throws BadFilterException If the fragment of the query that specifies the values is malformed.
*/
private Set<String> extractValues(Matcher tokenizedQuery, String filterQuery) throws BadFilterException {
try {
// replaceAll takes care of any leading ['s or trailing ]'s which might mess up the values set.
return new LinkedHashSet<>(
FilterTokenizer.split(
tokenizedQuery.group(3)
.replaceAll("\\[", "")
.replaceAll("\\]", "")
.trim()
)
);
} catch (IllegalArgumentException e) {
LOG.debug(FILTER_ERROR.logFormat(filterQuery, e.getMessage()), e);
throw new BadFilterException(FILTER_ERROR.format(filterQuery, e.getMessage()), e);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) { return true; }
if (!(o instanceof JobRowFilter)) { return false; }

JobRowFilter jobRowFilter = (JobRowFilter) o;

return
Objects.equals(jobField, jobRowFilter.jobField) &&
Objects.equals(operation, jobRowFilter.operation) &&
Objects.equals(values, jobRowFilter.values);
}

@Override
public int hashCode() {
return Objects.hash(jobField, operation, values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import rx.Observable;

import java.util.Set;

import javax.inject.Singleton;

/**
Expand All @@ -30,4 +32,17 @@ public Observable<JobRow> save(JobRow metadata) {
public Observable<JobRow> getAllRows() {
return Observable.empty();
}

/**
* This method ignores the filters and returns an empty Observable.
*
* @param jobRowFilter A Set of JobRowFilters where each JobRowFilter contains the JobField to be
* filtered on, the filter operation and the values to be compared to.
*
* @return An empty Observable.
*/
@Override
public Observable<JobRow> getFilteredRows(Set<JobRowFilter> jobRowFilter) {
return Observable.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ public enum ErrorMessageFormat implements MessageFormatter {

MISSING_JOB_ID("Bard experienced an internal error. Sorry.", "Missing id for job row %s"),

INVALID_ASYNC_AFTER( "Invalid 'asyncAfter' parameter: '%s'. 'asyncAfter' must be either 'never' or an integer number of milliseconds.");
INVALID_ASYNC_AFTER( "Invalid 'asyncAfter' parameter: '%s'. 'asyncAfter' must be either 'never' or an integer number of milliseconds."),

FILTER_JOBFIELD_UNDEFINED("Filter field '%s' does not exist. The possible fields to filter on are '%s'"),
;
// CHECKSTYLE:ON

private final String messageFormat;
Expand Down
Loading

0 comments on commit ec2c7d0

Please sign in to comment.