Refactor DatasourceMetadataService to fit composite table needs
garyluoex authored and michael-mclawhorn committed Mar 16, 2017
1 parent 741d316 commit a5f572e
Showing 13 changed files with 259 additions and 61 deletions.
14 changes: 11 additions & 3 deletions CHANGELOG.md
@@ -7,12 +7,15 @@ pull request if there was one.

Current
-------

### Added:
- [Refactor DatasourceMetadataService to fit composite table needs](https://github.com/yahoo/fili/pull/173)
* `DataSourceMetadataService` now also derives, from the segment data, a map of column name to available intervals, and provides `getAvailableIntervalsByTable` to retrieve it
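A minimal sketch (not part of the commit) of the new lookup path: it assumes a `PhysicalTable` and a coordinator-sourced `DataSourceMetadata` already in scope, uses `"wikiticker"` as a placeholder datasource name, and uses `TableName.of` as a stand-in for the String-to-Name static factory referenced later in this changelog:

```java
DataSourceMetadataService service = new DataSourceMetadataService();
service.update(physicalTable, metadata);  // segment metadata reported by the Druid coordinator

// Per-column availability for a single datasource, keyed by column name
Map<String, Set<Interval>> availableByColumn =
        service.getAvailableIntervalsByTable(TableName.of("wikiticker"));

// Time-bucketed segment info across a set of datasources
Set<SortedMap<DateTime, Map<String, SegmentInfo>>> segments =
        service.getTableSegments(Collections.singleton(TableName.of("wikiticker")));
```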

- [QueryPlanningConstraint and DataSourceConstraint](https://github.com/yahoo/fili/pull/169)
* Added `QueryPlanningConstraint` to replace the current Matcher and Resolver argument interfaces during query planning
* Added `DataSourceConstraint` to allow implementation of `PartitionedFactTable`'s availability in the near future


- [Major refactor for availability and schemas and tables](https://github.com/yahoo/fili/pull/165)
* `ImmutableAvailability` - provides immutable, typed replacement for maps of column availabilities
* New Table Implementations:
@@ -23,11 +26,10 @@ Current
* `PhysicalTableSchema` has base plus `ZonedTimeGrain`, name mappings
* `LogicalTableSchema` base with builder from table group
* `ResultSetSchema` base with transforming with-ers

* `ApiName`, `TableName`: Added static factory from String to Name

* `ErrorMessageFormat` for errors during `ResultSetMapper` cycle


- [Added default base class for all dimension types](https://github.com/yahoo/fili/pull/177)
* Added base classes `DefaultKeyValueStoreDimensionConfig`, `DefaultLookupDimensionConfig` and `DefaultRegisteredLookupDimensionConfig`
to create default dimensions.
@@ -46,10 +48,15 @@ Current
- [Support timeouts for Lucene search provider](https://github.com/yahoo/fili/pull/183)

### Changed:
- [Refactor DatasourceMetadataService to fit composite table needs](https://github.com/yahoo/fili/pull/173)
* `BasePhysicalTable` now stores the table name as a `TableName` instead of a `String`
* `SegmentInfo` now stores the dimensions and metrics from segment data, used to construct the column-to-available-interval map
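A rough sketch of the naming change (illustrative, not from the commit): the constructor arguments are placeholders, and `TableName.of` is assumed to be the String-to-Name factory added in this release (see the Added section):

```java
TableName name = TableName.of("wikiticker");  // assumed factory; placeholder datasource name

PhysicalTable table = new ConcretePhysicalTable(
        name,                    // now a TableName rather than a String
        timeGrain,               // a ZonedTimeGrain
        columns,                 // Iterable<Column>
        logicalToPhysicalNames   // Map<String, String>
);

TableName key = table.getTableName();  // used as the key in DataSourceMetadataService
String label = table.getName();        // String form, delegating to TableName::asName
```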

- [QueryPlanningConstraint and DataSourceConstraint](https://github.com/yahoo/fili/pull/169)
* `QueryPlanningConstraint` replaces the current `DataApiRequest` and `TemplateDruidQuery` arguments to Matchers and Resolvers during query planning
* Modified the `findMissingTimeGrainIntervals` method in `PartialDataHandler` to take a set of columns instead of `DataApiRequest` and `DruidAggregationQuery`


- [Major refactor for availability and schemas and tables](https://github.com/yahoo/fili/pull/165)
* `Schema` and `Table` became interfaces
* `Table` has-a `Schema`
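Schematically, the new relationship looks like the sketch below; only `Table`, `Schema`, `getName`, and `getSchema` are drawn from these changes, the rest is assumed for illustration:

```java
public interface Schema {
    Granularity getGranularity();  // assumed accessor, for illustration only
}

public interface Table {
    String getName();
    Schema getSchema();  // Table has-a Schema
}
```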
@@ -80,6 +87,7 @@ Current
* TestDruidWebService assumes unknown query types behave like GroupBy, TimeSeries, and TopN
* ResultSetResponseProcessor delegates to DruidResponseProcessor to build expected query schema,
allowing subclasses to override and extend the schema behavior

- [Add dimension fields to fullView table format](https://github.com/yahoo/fili/pull/155)

- [Make healthcheck filter reject message nicer](https://github.com/yahoo/fili/pull/153)
@@ -307,7 +307,7 @@ protected PhysicalTable buildPhysicalTable(
).collect(Collectors.toCollection(LinkedHashSet::new));

return new ConcretePhysicalTable(
definition.getName().asName(),
definition.getName(),
definition.getGrain(),
columns,
definition.getLogicalToPhysicalNames()
@@ -1,11 +1,21 @@
// Copyright 2016 Yahoo Inc.
// Copyright 2017 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

import com.yahoo.bard.webservice.data.config.names.TableName;
import com.yahoo.bard.webservice.table.PhysicalTable;
import com.yahoo.bard.webservice.util.SimplifiedIntervalList;

import com.google.common.collect.ImmutableMap;

import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.druid.timeline.DataSegment;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@@ -24,12 +34,15 @@
*/
@Singleton
public class DataSourceMetadataService {
private static final Logger LOG = LoggerFactory.getLogger(DataSourceMetadataService.class);

/**
* The container that holds the segment metadata for every table. It should support concurrent access.
*/
private final Map<PhysicalTable, AtomicReference<ConcurrentSkipListMap<DateTime, Map<String, SegmentInfo>>>>
allSegments;
private final Map<TableName, AtomicReference<ConcurrentSkipListMap<DateTime, Map<String, SegmentInfo>>>>
allSegmentsByTime;
private final Map<TableName, AtomicReference<ImmutableMap<String, Set<Interval>>>>
allSegmentsByColumn;

/**
* The collector that accumulates partitions of a segment.
@@ -40,7 +53,46 @@ public class DataSourceMetadataService {
* Creates a service to store segment metadata.
*/
public DataSourceMetadataService() {
this.allSegments = new ConcurrentHashMap<>();
this.allSegmentsByTime = new ConcurrentHashMap<>();
this.allSegmentsByColumn = new ConcurrentHashMap<>();
}

/**
* Get all the segments associated with the given Set of physical tables.
*
* @param physicalTableNames A Set of physical TableNames used by the DruidQuery
*
* @return A set of segments associated with the given tables
*/
public Set<SortedMap<DateTime, Map<String, SegmentInfo>>> getTableSegments(Set<TableName> physicalTableNames) {
return physicalTableNames.stream()
.map(allSegmentsByTime::get)
.filter(Objects::nonNull)
.map(AtomicReference::get)
.collect(Collectors.toSet());
}

/**
* Get a set of intervals available for each column in the table.
*
* @param physicalTableName The table to get the column and availability for
*
* @return a map of column name to a set of available intervals
*/
public Map<String, Set<Interval>> getAvailableIntervalsByTable(TableName physicalTableName) {
if (!allSegmentsByColumn.containsKey(physicalTableName)) {
LOG.error(
"Trying to access {} physical table datasource that is not available in metadata service",
physicalTableName.asName()
);
throw new IllegalStateException(
String.format(
"Trying to access %s physical table datasource that is not available in metadata service",
physicalTableName.asName()
)
);
}
return allSegmentsByColumn.get(physicalTableName).get();
}

/**
@@ -53,24 +105,23 @@ public DataSourceMetadataService() {
public void update(PhysicalTable table, DataSourceMetadata metadata) {
// Group all the segments by the starting date of their interval.
// Accumulate all the partitions of a segment in a map indexed by their identifier.
ConcurrentSkipListMap<DateTime, Map<String, SegmentInfo>> current = metadata.getSegments().stream()
.collect(
Collectors.groupingBy(
seg -> seg.getInterval().getStart(),
ConcurrentSkipListMap::new,
Collectors.mapping(SegmentInfo::new, COLLECTOR)
)
);
ConcurrentSkipListMap<DateTime, Map<String, SegmentInfo>> currentByTime = groupSegmentByTime(metadata);

// Group segment intervals by each column present in the segments
Map<String, Set<Interval>> currentByColumn = groupIntervalByColumn(metadata);

allSegments.computeIfAbsent(table, ignored -> new AtomicReference<>()).set(current);
allSegmentsByTime.computeIfAbsent(table.getTableName(), ignored -> new AtomicReference<>())
.set(currentByTime);
allSegmentsByColumn.computeIfAbsent(table.getTableName(), ignored -> new AtomicReference<>())
.set(ImmutableMap.copyOf(currentByColumn));
}

/**
* A collector to aggregate all partitions of a segment into a map.
*
* @return The collector
*/
private static Collector<SegmentInfo, ?, Map<String, SegmentInfo>> partitionsToMapCollector() {
protected static Collector<SegmentInfo, ?, Map<String, SegmentInfo>> partitionsToMapCollector() {
// Supplier: a linked hash map.
// Accumulator: adds a segment info to the result map keyed by its identifier
// Combiner: combine two partial result maps
@@ -85,17 +136,50 @@ public void update(PhysicalTable table, DataSourceMetadata metadata) {
}

/**
* Get all the segments associated with the given Set of physical tables.
* Group segment metadata by the start date of their intervals.
*
* @param physicalTables A Set of physicalTables used by the DruidQuery
* @param metadata Metadata containing the druid segments information
*
* @return A set of segments associated with the given tables
* @return A map of segment start date to a map of segment id to segment info
*/
public Set<SortedMap<DateTime, Map<String, SegmentInfo>>> getTableSegments(Set<PhysicalTable> physicalTables) {
return physicalTables.stream()
.map(allSegments::get)
.filter(Objects::nonNull)
.map(AtomicReference::get)
.collect(Collectors.toSet());
protected static ConcurrentSkipListMap<DateTime, Map<String, SegmentInfo>> groupSegmentByTime(
DataSourceMetadata metadata
) {
return metadata.getSegments().stream()
.collect(
Collectors.groupingBy(
seg -> seg.getInterval().getStart(),
ConcurrentSkipListMap::new,
Collectors.mapping(SegmentInfo::new, COLLECTOR)
)
);
}

/**
* Group intervals in segment metadata by column.
*
* @param metadata Metadata containing the druid segments information
*
* @return A map of column name to the set of available intervals
*/
protected static Map<String, Set<Interval>> groupIntervalByColumn(DataSourceMetadata metadata) {
Map<String, Set<Interval>> currentByColumn = new LinkedHashMap<>();

// Accumulate all intervals by column name
for (DataSegment segment : metadata.getSegments()) {
SegmentInfo segmentInfo = new SegmentInfo(segment);
for (String column : segmentInfo.getColumnNames()) {
currentByColumn.computeIfAbsent(column, ignored -> new HashSet<>()).add(segmentInfo.getInterval());
}
}

// Simplify interval sets using SimplifiedIntervalList
return currentByColumn.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> new HashSet<>(new SimplifiedIntervalList(entry.getValue()))
)
);
}
}
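As a sketch of the simplification step in `groupIntervalByColumn` above (dates are arbitrary placeholders), `SimplifiedIntervalList` collapses overlapping and abutting intervals before the per-column map is stored:

```java
Set<Interval> raw = new HashSet<>(Arrays.asList(
        Interval.parse("2017-01-01/2017-01-02"),
        Interval.parse("2017-01-02/2017-01-03")  // abuts the first interval
));

// The simplified set holds the single merged interval 2017-01-01/2017-01-03
Set<Interval> simplified = new HashSet<>(new SimplifiedIntervalList(raw));
```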
@@ -1,4 +1,4 @@
// Copyright 2016 Yahoo Inc.
// Copyright 2017 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

@@ -10,14 +10,19 @@
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* A class that holds the useful information of a partition of a druid segment in bard.
*/
public class SegmentInfo {
private final String dataSource;
private final Interval interval;
private final List<String> dimensions;
private final List<String> metrics;
private final String version;
private final NumberedShardSpec shardSpec;
private final long size;
@@ -31,6 +36,8 @@ public class SegmentInfo {
public SegmentInfo(DataSegment segment) {
this.dataSource = segment.getDataSource();
this.interval = segment.getInterval();
this.dimensions = segment.getDimensions();
this.metrics = segment.getMetrics();
this.version = segment.getVersion();
ShardSpec spec = segment.getShardSpec();
this.shardSpec = spec instanceof NumberedShardSpec ?
@@ -58,6 +65,36 @@ public Interval getInterval() {
return interval;
}

/**
* Getter for the dimensions in this segment.
*
* @return The list of dimension names
*/
public List<String> getDimensions() {
return dimensions;
}

/**
* Getter for the metrics in this segment.
*
* @return The list of metric names
*/
public List<String> getMetrics() {
return metrics;
}

/**
* Getter for list of dimension and metric names.
*
* @return The list of dimension and metric names
*/
public List<String> getColumnNames() {
return Stream.concat(
getDimensions().stream(),
getMetrics().stream()
).collect(Collectors.toList());
}
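    // Illustrative example, not part of the commit: a segment with dimensions
    // [page, user] and metrics [added, deleted] yields the column names
    // [page, user, added, deleted], which feed the column-to-interval map in
    // DataSourceMetadataService.groupIntervalByColumn.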

/**
* Getter for the version of this segment partition.
*
@@ -1,4 +1,4 @@
// Copyright 2016 Yahoo Inc.
// Copyright 2017 Yahoo Inc.
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.metadata;

Expand Down Expand Up @@ -92,7 +92,11 @@ public Optional<Long> getSegmentSetId(DruidAggregationQuery<?> query) {

//get all table segments
Set<SortedMap<DateTime, Map<String, SegmentInfo>>> tableSegments =
dataSourceMetadataService.getTableSegments(physicalTables);
dataSourceMetadataService.getTableSegments(
physicalTables.stream()
.map(PhysicalTable::getTableName)
.collect(Collectors.toSet())
);

// Check if we have no tables with segments
if (tableSegments.isEmpty()) {
@@ -2,6 +2,7 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.table;

import com.yahoo.bard.webservice.data.config.names.TableName;
import com.yahoo.bard.webservice.data.dimension.DimensionDictionary;
import com.yahoo.bard.webservice.data.time.ZonedTimeGrain;
import com.yahoo.bard.webservice.metadata.SegmentMetadata;
@@ -23,7 +24,7 @@
public abstract class BasePhysicalTable implements PhysicalTable {
private static final Logger LOG = LoggerFactory.getLogger(BasePhysicalTable.class);

private final String name;
private final TableName name;
private final PhysicalTableSchema schema;
private volatile Availability availability;

@@ -37,7 +38,7 @@ public abstract class BasePhysicalTable implements PhysicalTable {
* @param availability The availability of columns in this table
*/
public BasePhysicalTable(
@NotNull String name,
@NotNull TableName name,
@NotNull ZonedTimeGrain timeGrain,
@NotNull Iterable<Column> columns,
@NotNull Map<String, String> logicalToPhysicalColumnNames,
@@ -63,10 +64,15 @@ public DateTime getTableAlignment() {
}

@Override
public String getName() {
public TableName getTableName() {
return name;
}

@Override
public String getName() {
return name.asName();
}

@Override
public PhysicalTableSchema getSchema() {
return schema;
@@ -94,7 +100,7 @@ public String getPhysicalColumnName(String logicalName) {
*/
public void resetColumns(SegmentMetadata segmentMetadata, DimensionDictionary dimensionDictionary) {
setAvailability(new ImmutableAvailability(
name,
name.asName(),
segmentMetadata.getDimensionIntervals(),
segmentMetadata.getMetricIntervals(),
dimensionDictionary