Skip to content

Commit

Permalink
Modified according to will and rick's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
garyluoex committed Mar 24, 2017
1 parent d3bff62 commit a36562f
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 170 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Current
### Added:
- [CompositePhsyicalTable Core Components Refactor](https://github.com/yahoo/fili/pull/179)
* Added `ConcretePhysicalTable` and `ConcreteAvailability` to model table in druid datasource and its availabillity in the new table availability structure
* Added class variable for `DataSourceMetadataService` and `ConfigurationLoader` for easy access
* Added class variable for `DataSourceMetadataService` and `ConfigurationLoader` into `AbstractBinderFactory` for application to access

- [Refactor DatasourceMetaDataService to fit composite table needs](https://github.com/yahoo/fili/pull/173)
* `DataSourceMetadataService` also stores interval data from segment data as intervals by column name map and provides method `getAvailableIntervalsByTable` to retrieve it
Expand Down Expand Up @@ -147,7 +147,6 @@ Current
* Removed `permissive_column_availability_enabled` feature flag support and corresponding functionality in `PartialDataHandler`, permissive availability will be a table configuration
* Removed `getIntersectSubintervalsForColumns` and `getUnionSubintervalsForColumns` from `PartialDataHandler` since the logic is pushed into `Availability` now
* Removed `getIntervalsByColumnName`, `resetColumns` and `hasLogicalMapping` methods in `PhysicalTable` since no longer needed with the availability structure
* Removed already deprecated `buildTableGroup` method in `BaseTableLoader`
* Removed `getAvailability` method on `PartialDataHandler` since the logic is pushed into `Availability`
* Removed `SegmentMetadataLoader` and corresponding tests class which is deprecated in both fili and the corresponding endpoint in druid, use `DataSourceMetadataLoader` instead
* Removed `SegmentMetadataLoaderHealthCheck` and `SegmentMetadataLoaderHealthCheckSpec` classes since `SegmentMetadataLoader` is not longer available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ public abstract class AbstractBinderFactory implements BinderFactory {
SYSTEM_CONFIG.getPackageVariableName("loader_scheduler_thread_pool_size"),
LOADER_SCHEDULER_THREAD_POOL_SIZE_DEFAULT
);

public static final String DEPRECATED_PERMISSIVE_AVAILABILITY_FLAG = SYSTEM_CONFIG.getPackageVariableName(
"permissive_column_availability_enabled");

public static final String SYSTEM_CONFIG_TIMEZONE_KEY = "timezone";

private ObjectMappersSuite objectMappers;
Expand All @@ -174,7 +178,6 @@ public abstract class AbstractBinderFactory implements BinderFactory {

private final TaskScheduler loaderScheduler = new TaskScheduler(LOADER_SCHEDULER_THREAD_POOL_SIZE);


/**
* Constructor.
*/
Expand Down Expand Up @@ -226,7 +229,7 @@ protected void configure() {
FieldConverterSupplier.sketchConverter = initializeSketchConverter();

//Initialize the metrics filter helper
FieldConverterSupplier.metricsFilterSetBuilder = initializeMetricsFilterSetBuilder();
FieldConverterSupplier.metricsFilterSetBuilder = initializeMetricsFilterSetBuilder();

// Build the datasource metadata service containing the data segments
bind(getDataSourceMetadataService()).to(DataSourceMetadataService.class);
Expand All @@ -245,23 +248,28 @@ protected void configure() {
// Bind the request mappers
Map<String, RequestMapper> requestMappers = getRequestMappers(loader.getDictionaries());
bind(requestMappers.getOrDefault(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE,
new DimensionApiRequestMapper(loader.getDictionaries())))
.named(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new DimensionApiRequestMapper(loader.getDictionaries())
)).named(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(TablesApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(TablesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(TablesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(DataApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(DataApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(DataApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(JobsApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(JobsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(JobsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

// Setup end points and back end services
setupHealthChecks(healthCheckRegistry, loader.getDimensionDictionary());
Expand Down Expand Up @@ -293,6 +301,7 @@ protected void configure() {

setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);
}

bind(querySigningService).to(QuerySigningService.class);

bind(buildJobRowBuilder()).to(JobRowBuilder.class);
Expand All @@ -317,6 +326,11 @@ protected void configure() {
);
setupDruidDimensionsLoader(healthCheckRegistry, druidDimensionsLoader);
}
if (SYSTEM_CONFIG.getBooleanProperty(DEPRECATED_PERMISSIVE_AVAILABILITY_FLAG, false)) {
LOG.warn(
"Permissive column availability feature flag is no longer supported, please use " +
"PermissiveConcretePhysicalTable to enable permissive column availability.");
}
// Call post-binding hook to allow for additional binding
afterBinding(this);
}
Expand Down Expand Up @@ -822,7 +836,7 @@ protected DimensionLoader getDimensionLoader() {
/**
* The Builder to be used to serialize a JobRow into the the job to be returned to the user.
*
* @return A DefaultJobPayloadBuilder
* @return A DefaultJobPayloadBuilder
*/
protected JobPayloadBuilder buildJobPayloadBuilder() {
return new DefaultJobPayloadBuilder();
Expand Down Expand Up @@ -995,13 +1009,13 @@ protected void afterBinding(AbstractBinder abstractBinder) {
protected void scheduleLoader(Loader<?> loader) {
loader.setFuture(
loader.isPeriodic ?
loaderScheduler.scheduleAtFixedRate(
loader,
loader.getDefinedDelay(),
loader.getDefinedPeriod(),
TimeUnit.MILLISECONDS
) :
loaderScheduler.schedule(loader, loader.getDefinedDelay(), TimeUnit.MILLISECONDS)
loaderScheduler.scheduleAtFixedRate(
loader,
loader.getDefinedDelay(),
loader.getDefinedPeriod(),
TimeUnit.MILLISECONDS
) :
loaderScheduler.schedule(loader, loader.getDefinedDelay(), TimeUnit.MILLISECONDS)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ public class PartialDataHandler {
* present for a given combination of request metrics and dimensions (pulled from the API request and generated
* druid query) at the specified granularity.
*
* @param constraints Constraint containing all the column names the request depends on
* @param constraint Constraint containing all the column names the request depends on
* @param physicalTables the tables whose column availabilities are checked
* @param requestedIntervals The intervals that may not be fully satisfied
* @param granularity The granularity at which to find missing intervals
*
* @return subintervals of the requested intervals with incomplete data
*/
public SimplifiedIntervalList findMissingTimeGrainIntervals(
DataSourceConstraint constraints,
DataSourceConstraint constraint,
Set<PhysicalTable> physicalTables,
@NotNull SimplifiedIntervalList requestedIntervals,
Granularity granularity
) {
SimplifiedIntervalList availableIntervals = physicalTables.stream()
.map(table -> table.getAvailableIntervals(constraints))
.map(table -> table.getAvailableIntervals(constraint))
.flatMap(SimplifiedIntervalList::stream)
.collect(SimplifiedIntervalList.getCollector());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.yahoo.bard.webservice.table.LogicalTable;
import com.yahoo.bard.webservice.table.LogicalTableDictionary;
import com.yahoo.bard.webservice.table.PhysicalTable;
import com.yahoo.bard.webservice.table.Schema;
import com.yahoo.bard.webservice.table.TableGroup;
import com.yahoo.bard.webservice.table.TableIdentifier;

Expand Down Expand Up @@ -84,7 +83,7 @@ protected BaseTableLoader(DateTimeZone defaultTimeZone, DataSourceMetadataServic
*
* @return A table group binding all the tables for a logical table view together.
*/
public TableGroup buildTableGroup(
public TableGroup buildDimensionSpanningTableGroup(
Set<ApiMetricName> apiMetrics,
Set<FieldName> druidMetrics,
Set<PhysicalTableDefinition> tableDefinitions,
Expand All @@ -93,18 +92,15 @@ public TableGroup buildTableGroup(
// Load a physical table for each of the table definitions
LinkedHashSet<PhysicalTable> physicalTables = new LinkedHashSet<>();
for (PhysicalTableDefinition def : tableDefinitions) {
PhysicalTable table;
table = loadPhysicalTable(def, druidMetrics, dictionaries);
PhysicalTable table = loadPhysicalTable(def, druidMetrics, dictionaries);
physicalTables.add(table);
}

//Derive the logical dimensions by taking the union of all the physical dimensions
Set<Dimension> dimensions = physicalTables.stream()
.map(PhysicalTable::getSchema)
.map(Schema::getColumns)
.map(schema -> schema.getColumns(DimensionColumn.class))
.flatMap(Set::stream)
.filter(column -> column instanceof DimensionColumn)
.map(column -> (DimensionColumn) column)
.map(DimensionColumn::getDimension)
.collect(Collectors.toCollection(LinkedHashSet::new));
return new TableGroup(physicalTables, apiMetrics, dimensions);
Expand Down Expand Up @@ -237,6 +233,32 @@ public LogicalTable buildLogicalTable(
);
}

/**
* Builds a table group.
* <p>
* Builds and loads the physical tables for the physical table definitions as well.
*
* @param logicalTableName The logical table for the table group
* @param apiMetrics The set of metric names surfaced to the api
* @param druidMetrics Names of druid datasource metric columns
* @param tableDefinitions A list of config objects for physical tables
* @param dictionaries The container for all the data dictionaries
*
* @return A table group binding all the tables for a logical table view together.
*
* @deprecated logicalTableName is not used in TableGroup, use buildDimensionSpanningTableGroup instead
*/
@Deprecated
public TableGroup buildTableGroup(
String logicalTableName,
Set<ApiMetricName> apiMetrics,
Set<FieldName> druidMetrics,
Set<PhysicalTableDefinition> tableDefinitions,
ResourceDictionaries dictionaries
) {
return buildDimensionSpanningTableGroup(apiMetrics, druidMetrics, tableDefinitions, dictionaries);
}

/**
* Load a new physical table into the dictionary and return the loaded physical table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public Map<Column, List<Interval>> getAllAvailableIntervals() {
}

@Override
public SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraints) {
return getAvailability().getAvailableIntervals(constraints);
public SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraint) {
return getAvailability().getAvailableIntervals(constraint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ public interface PhysicalTable extends Table {
/**
* Get available intervals satisfying the given constraints.
*
* @param constraints Data constraint containing columns and api filters
* @param constraint Data constraint containing columns and api filters
*
* @return tableEntries a simplified interval list of available interval
*/
SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraints);
SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraint);

/**
* Get the columns from the schema for this physical table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public interface Availability {
/**
* Fetch a set of intervals given a set of column name in DataSourceConstraint.
*
* @param constraints Data constraint containing columns and api filters
* @param constraint Data constraint containing columns and api filters
*
* @return A simplified list of intervals associated with all column in constraint, empty if column is missing
*/
SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraints);
SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public Map<Column, List<Interval>> getAllAvailableIntervals() {
}

@Override
public SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraints) {
public SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraint) {

Set<String> requestColumns = constraints.getAllColumnNames().stream()
Set<String> requestColumns = constraint.getAllColumnNames().stream()
.filter(cachedColumnNames::contains)
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ public class AggregatableDimensionsMatcher implements PhysicalTableMatcher {

public static final ErrorMessageFormat MESSAGE_FORMAT = NO_TABLE_FOR_NON_AGGREGATABLE;

private final QueryPlanningConstraint requestConstraints;
private final QueryPlanningConstraint requestConstraint;

/**
* Constructor saves metrics, dimensions, coarsest time grain, and logical table name (for logging).
*
* @param requestConstraints Contains the request constraints extracted from DataApiRequest and TemplateDruidQuery
* @param requestConstraint Contains the request constraints extracted from DataApiRequest and TemplateDruidQuery
*/
public AggregatableDimensionsMatcher(QueryPlanningConstraint requestConstraints) {
this.requestConstraints = requestConstraints;
public AggregatableDimensionsMatcher(QueryPlanningConstraint requestConstraint) {
this.requestConstraint = requestConstraint;
}

@Override
public boolean test(PhysicalTable table) {
Set<String> columnNames = requestConstraints.getAllColumnNames();
Set<String> columnNames = requestConstraint.getAllColumnNames();

// If table contains non-agg dimensions, query must contain all these non-agg dimensions to use this table.
return table.getDimensions().stream()
Expand All @@ -50,12 +50,12 @@ public boolean test(PhysicalTable table) {

@Override
public NoMatchFoundException noneFoundException() {
Set<String> aggDimensions = requestConstraints.getRequestDimensions().stream()
Set<String> aggDimensions = requestConstraint.getRequestDimensions().stream()
.filter(Dimension::isAggregatable)
.map(Dimension::getApiName)
.collect(Collectors.toSet());

Set<String> nonAggDimensions = requestConstraints.getRequestDimensions().stream()
Set<String> nonAggDimensions = requestConstraint.getRequestDimensions().stream()
.filter(StreamUtils.not(Dimension::isAggregatable))
.map(Dimension::getApiName)
.collect(Collectors.toSet());
Expand Down
Loading

0 comments on commit a36562f

Please sign in to comment.