ability to not rollup at index time, make pre aggregation an option #3020

Merged: 5 commits, Aug 2, 2016

Changes from 2 commits

@@ -81,6 +81,9 @@ public class IncrementalIndexReadBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
private static final int RNG_SEED = 9999;
private IncrementalIndex incIndex;
@@ -124,6 +127,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
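
For context on the @Param pattern these benchmark classes share: JMH injects each listed value into the annotated field, so every benchmark method in the class runs once with rollup=true and once with rollup=false. A minimal standalone sketch of the mechanism follows; the class name and benchmark body are illustrative, not part of this patch.

```java
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class RollupParamSketch
{
  // JMH converts each listed string to the field's type (boolean here)
  // and re-runs the benchmark matrix once per value.
  @Param({"true", "false"})
  private boolean rollup;

  @Benchmark
  public boolean baseline()
  {
    // Stands in for building an IncrementalIndex with .withRollup(rollup).
    return rollup;
  }
}
```
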
@@ -63,6 +63,9 @@ public class IndexIngestionBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexIngestionBenchmark.class);
private static final int RNG_SEED = 9999;

@@ -107,6 +110,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
@@ -75,6 +75,9 @@ public class IndexMergeBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMerger INDEX_MERGER;
@@ -155,6 +158,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,
@@ -174,7 +178,7 @@ public void merge(Blackhole blackhole) throws Exception
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();

File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());

blackhole.consume(mergedFile);

@@ -192,7 +196,7 @@ public void mergeV9(Blackhole blackhole) throws Exception
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
tmpFile.deleteOnExit();

File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());
File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(indexesToMerge, rollup, schemaInfo.getAggsArray(), tmpFile, new IndexSpec());

blackhole.consume(mergedFile);

@@ -72,6 +72,9 @@ public class IndexPersistBenchmark
@Param({"basic"})
private String schema;

@Param({"true", "false"})
private boolean rollup;

private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;

@@ -156,6 +159,7 @@ private IncrementalIndex makeIncIndex()
.withQueryGranularity(QueryGranularities.NONE)
.withMetrics(schemaInfo.getAggsArray())
.withDimensionsSpec(new DimensionsSpec(null, null, null))
.withRollup(rollup)
.build(),
true,
false,

2 changes: 2 additions & 0 deletions docs/content/ingestion/index.md
@@ -186,6 +186,7 @@ This spec is used to generate segments with uniform intervals.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | Whether to roll up data at ingestion time, pre-aggregating rows that share a query-granularity timestamp bucket and identical dimension values (see the example below the table). | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
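
For example, a uniform granularitySpec that disables rollup might look like the sketch below (the interval is illustrative). With rollup enabled, rows that collide on the tuple (queryGranularity(timestamp), dimensions) are pre-aggregated into a single stored row; with `rollup: false`, each ingested row is stored individually:

```json
{
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "NONE",
  "rollup": false,
  "intervals": ["2016-01-01/2016-02-01"]
}
```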

### Arbitrary Granularity Spec
@@ -196,6 +197,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to create evenly sized segments).
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | Whether to roll up data at ingestion time, as described above. | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |

# IO Config

6 changes: 5 additions & 1 deletion docs/content/ingestion/tasks.md
@@ -159,14 +159,18 @@ Append tasks append a list of segments together into a single segment (one after the other).

### Merge Task

Merge tasks merge a list of segments together. Any common timestamps are merged. The grammar is:
Merge tasks merge a list of segments together. Any common timestamps are merged.
If rollup is disabled as part of ingestion, common timestamps are not merged and rows are reorderded by their timestamp.

Contributor: "reorderded" should be "reordered"

The grammar is:

```json
{
"type": "merge",
"id": <task_id>,
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"segments": <JSON list of DataSegment objects to merge>
}
```
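
A filled-in sketch of such a task follows; the id, dataSource, and aggregator are placeholder values, and `segments` would carry the actual DataSegment descriptors rather than an empty list:

```json
{
  "type": "merge",
  "id": "merge_sample_datasource_task",
  "dataSource": "sample_datasource",
  "aggregations": [
    { "type": "longSum", "name": "added", "fieldName": "added" }
  ],
  "rollup": false,
  "segments": []
}
```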

6 changes: 6 additions & 0 deletions docs/content/querying/segmentmetadataquery.md
@@ -11,6 +11,7 @@ Segment metadata queries return per-segment information about:
* Interval the segment covers
* Column type of all the columns in the segment
* Estimated total segment byte size as if it were stored in a flat format
* Whether the segment is rolled up
* Segment id

@@ -143,6 +144,11 @@ null if the aggregators are unknown or unmergeable (if merging is enabled).

* The form of the result is a map of column name to aggregator.

#### rollup

* `rollup` in the result is true, false, or null.

Contributor: what does null mean?

Contributor (author): There is a case where one segment was rolled up but a later segment was ingested with a new spec that disables rollup; for merge=true, segmentMetadataQuery should handle this case and return null. Otherwise, rollup is always true or false.

* When merging is enabled, if some segments are rolled up and others are not, the result is null.
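
For instance, a segment metadata query that requests only rollup analysis over merged segments might look like the following sketch (the dataSource and interval are illustrative):

```json
{
  "queryType": "segmentMetadata",
  "dataSource": "sample_datasource",
  "intervals": ["2016-01-01/2016-02-01"],
  "analysisTypes": ["rollup"],
  "merge": true
}
```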

### lenientAggregatorMerge

Conflicts between aggregator metadata across segments can occur if some segments have unknown aggregators, or if
@@ -142,6 +142,7 @@ public boolean run()
new UniformGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
intervals
)
);
@@ -226,6 +226,7 @@ private static IncrementalIndex makeIncrementalIndex(
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();

OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
@@ -514,13 +515,14 @@ protected File mergeQueryableIndex(
ProgressIndicator progressIndicator
) throws IOException
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator
);
}
}
@@ -112,6 +112,7 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
new UniformGranularitySpec(
segmentGranularity,
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup(),
Lists.newArrayList(bucketsToRun)
)
);
@@ -188,7 +188,7 @@ public void finishJob()
}

fileToUpload = new File(tmpSegmentDir, "merged");
theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
}

// Map merged segment so we can extract dimensions
@@ -44,6 +44,7 @@ public class MergeTask extends MergeTaskBase
{
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final Boolean rollup;
private final IndexSpec indexSpec;

@JsonCreator
@@ -52,12 +53,14 @@ public MergeTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("rollup") Boolean rollup,
@JsonProperty("indexSpec") IndexSpec indexSpec,

Contributor: The documentation for the mergeTask needs to be updated.

Contributor (author): Already updated in tasks.md.

@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments, context);
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}

@@ -82,6 +85,7 @@ public QueryableIndex apply(@Nullable File input)
}
}
),
rollup,
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
outDir,
indexSpec
@@ -176,6 +176,7 @@ public void testMergeTaskSerde() throws Exception
"foo",
segments,
aggregators,
true,
indexSpec,
null
);
@@ -355,6 +355,14 @@ public static SegmentAnalysis mergeAnalyses(
mergedId = "merged";
}

final Boolean rollup;

if (arg1.isRollup() != null && arg2.isRollup() != null && arg1.isRollup().equals(arg2.isRollup())) {
rollup = arg1.isRollup();
} else {
rollup = null;

Contributor: What does a null rollup mean? I feel like whenever we can't make a decision, rollup should be set to true, with a comment that Druid rolls up data by default.

Contributor (author): There is a case where one segment was rolled up but a later segment was ingested with a new spec that disables rollup; for merge=true, segmentMetadataQuery should handle this case and return null. Otherwise, rollup is always true or false.

}

return new SegmentAnalysis(
mergedId,
newIntervals,
@@ -363,7 +371,8 @@ public static SegmentAnalysis mergeAnalyses(
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
);
}

@@ -378,7 +387,8 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.getNumRows(),
analysis.getAggregators(),
analysis.getTimestampSpec(),
analysis.getQueryGranularity()
analysis.getQueryGranularity(),
analysis.isRollup()
);
}
}
@@ -148,6 +148,19 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
queryGranularity = null;
}

Boolean rollup = null;
if (query.hasRollup()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
rollup = metadata != null ? metadata.isRollup() : null;
if (rollup == null) {
// in this case, the segment was built before the no-rollup feature existed,
// thus it was built with rollup
rollup = Boolean.TRUE;
}
}

return Sequences.simple(
Arrays.asList(
new SegmentAnalysis(
@@ -158,7 +171,8 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
numRows,
aggregators,
timestampSpec,
queryGranularity
queryGranularity,
rollup
)
)
);
@@ -40,6 +40,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
private final Map<String, AggregatorFactory> aggregators;
private final TimestampSpec timestampSpec;
private final QueryGranularity queryGranularity;
private final Boolean rollup;

@JsonCreator
public SegmentAnalysis(
@@ -50,7 +51,8 @@ public SegmentAnalysis(
@JsonProperty("numRows") long numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("rollup") Boolean rollup
)
{
this.id = id;
Expand All @@ -61,6 +63,7 @@ public SegmentAnalysis(
this.aggregators = aggregators;
this.timestampSpec = timestampSpec;
this.queryGranularity = queryGranularity;
this.rollup = rollup;
}

@JsonProperty
@@ -105,6 +108,12 @@ public QueryGranularity getQueryGranularity()
return queryGranularity;
}

@JsonProperty
public Boolean isRollup()
{
return rollup;
}

@JsonProperty
public Map<String, AggregatorFactory> getAggregators()
{
@@ -123,6 +132,7 @@ public String toString()
", aggregators=" + aggregators +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
", rollup=" + rollup +
'}';
}

@@ -141,6 +151,7 @@ public boolean equals(Object o)
SegmentAnalysis that = (SegmentAnalysis) o;
return size == that.size &&
numRows == that.numRows &&
Objects.equals(rollup, that.rollup) &&
Objects.equals(id, that.id) &&
Objects.equals(interval, that.interval) &&
Objects.equals(columns, that.columns) &&
@@ -156,7 +167,7 @@
@Override
public int hashCode()
{
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity, rollup);
}

@Override
@@ -58,7 +58,8 @@ public enum AnalysisType
AGGREGATORS,
MINMAX,
TIMESTAMPSPEC,
QUERYGRANULARITY;
QUERYGRANULARITY,
ROLLUP;

@JsonValue
@Override
@@ -199,6 +200,11 @@ public boolean hasQueryGranularity()
return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
}

public boolean hasRollup()
{
return analysisTypes.contains(AnalysisType.ROLLUP);
}

public boolean hasMinMax()
{
return analysisTypes.contains(AnalysisType.MINMAX);