groupBy v2: Ignore timestamp completely when granularity = all, except for the final merge. (#3740)

* GroupByBenchmark: Add serde, spilling, all-gran benchmarks.

Also use more iterations.

* groupBy v2: Ignore timestamp completely when granularity = all, except for the final merge.

Specifically:

- Remove timestamp from RowBasedKey when not needed
- Set timestamp to null in MapBasedRows that are not part of the final merge.
gianm authored and fjy committed Dec 7, 2016
1 parent f995b14 commit b1bac9f
Showing 6 changed files with 322 additions and 136 deletions.
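
The idea in miniature: when granularity = all, every row maps to a single time bucket, so a grouping key that carries the bucket timestamp costs space and comparison work without ever distinguishing two groups. A self-contained sketch of that reasoning, with hypothetical names (the real patch changes RowBasedKey and friends, not these classes):

    import java.util.Arrays;

    // Hypothetical sketch: grouping keys with and without the bucket timestamp.
    // Under granularity = all the timestamp slot holds the same value for every
    // key, so equality, hashing, and comparison over the dimensions alone
    // produce exactly the same groups with a smaller key.
    class GroupKeySketch
    {
      static Object[] withTimestamp(long bucketTime, Object[] dims)
      {
        final Object[] key = new Object[dims.length + 1];
        key[0] = bucketTime; // constant across all rows when granularity = all
        System.arraycopy(dims, 0, key, 1, dims.length);
        return key;
      }

      static Object[] withoutTimestamp(Object[] dims)
      {
        return Arrays.copyOf(dims, dims.length); // same grouping power, smaller key
      }
    }

The timestamp still matters exactly once per query: the final merge must emit rows stamped with the query's single bucket time, which is why MapBasedRows outside the final merge can carry a null timestamp.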
GroupByBenchmark.java
@@ -24,11 +24,11 @@
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
-
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import io.druid.benchmark.datagen.BenchmarkSchemas;
Expand All @@ -39,6 +39,7 @@
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
@@ -105,14 +106,14 @@
 
 @State(Scope.Benchmark)
 @Fork(jvmArgsPrepend = "-server", value = 1)
-@Warmup(iterations = 10)
-@Measurement(iterations = 25)
+@Warmup(iterations = 15)
+@Measurement(iterations = 30)
 public class GroupByBenchmark
 {
   @Param({"4"})
   private int numSegments;
 
-  @Param({"4"})
+  @Param({"2", "4"})
   private int numProcessingThreads;
 
   @Param({"-1"})
@@ -127,6 +128,9 @@ public class GroupByBenchmark
   @Param({"v1", "v2"})
   private String defaultStrategy;
 
+  @Param({"all", "day"})
+  private String queryGranularity;
+
   private static final Logger log = new Logger(GroupByBenchmark.class);
   private static final int RNG_SEED = 9999;
   private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -137,7 +141,7 @@ public class GroupByBenchmark
   private IncrementalIndex anIncrementalIndex;
   private List<QueryableIndex> queryableIndexes;
 
-  private QueryRunnerFactory factory;
+  private QueryRunnerFactory<Row, GroupByQuery> factory;
 
   private BenchmarkSchemaInfo schemaInfo;
   private GroupByQuery query;
@@ -190,7 +194,7 @@ private void setupQueries()
         .setAggregatorSpecs(
             queryAggs
         )
-        .setGranularity(QueryGranularities.DAY)
+        .setGranularity(QueryGranularity.fromString(queryGranularity))
         .build();
 
     basicQueries.put("A", queryA);
@@ -335,7 +339,7 @@ public int getBufferGrouperInitialBuckets()
       @Override
       public long getMaxOnDiskStorage()
       {
-        return 0L;
+        return 1_000_000_000L;
       }
     };
     config.setSingleThreaded(false);
@@ -475,29 +479,68 @@ public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
     }
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
   {
-    List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
-    QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
-      String segmentName = "qIndex" + i;
-      QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
-          factory,
-          segmentName,
-          new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
-      );
-      singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
-    }
-
-    QueryRunner theRunner = toolChest.postMergeQueryDecoration(
-        new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
-            toolChest
-        )
-    );
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(executorService, makeMultiRunners())
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
+    List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
+
+    for (Row result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole) throws Exception
+  {
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(executorService, makeMultiRunners())
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    final GroupByQuery spillingQuery = query.withOverriddenContext(
+        ImmutableMap.<String, Object>of("bufferGrouperMaxSize", 4000)
+    );
+    Sequence<Row> queryResult = theRunner.run(spillingQuery, Maps.<String, Object>newHashMap());
+    List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
+
+    for (Row result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexWithSerde(Blackhole blackhole) throws Exception
+  {
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            new SerializingQueryRunner<>(
+                new DefaultObjectMapper(new SmileFactory()),
+                Row.class,
+                toolChest.mergeResults(
+                    factory.mergeRunners(executorService, makeMultiRunners())
+                )
+            )
+        ),
+        (QueryToolChest) toolChest
+    );
 
     Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
     List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
@@ -507,4 +550,19 @@ public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
       blackhole.consume(result);
     }
   }
+
+  private List<QueryRunner<Row>> makeMultiRunners()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
+          factory,
+          segmentName,
+          new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
+      );
+      runners.add(factory.getToolchest().preMergeQueryDecoration(runner));
+    }
+    return runners;
+  }
 }
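
A note on the spilling benchmark above: the two config changes work together. Overriding bufferGrouperMaxSize down to 4,000 entries makes the in-memory grouper fill almost immediately, and raising getMaxOnDiskStorage from 0 to 1_000_000_000 bytes lets that overflow spill to disk instead of failing the query, so the benchmark actually exercises the spill-and-merge path. The override itself, as a standalone sketch (assuming a built GroupByQuery named query and the benchmark's imports):

    // Deliberately tiny in-memory grouper: aggregation overflows onto the
    // disk-spill path, which the raised on-disk limit now permits.
    final GroupByQuery spillingQuery = query.withOverriddenContext(
        ImmutableMap.<String, Object>of("bufferGrouperMaxSize", 4000)
    );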
New file: SerializingQueryRunner.java (package io.druid.benchmark.query)
@@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import java.util.Map;

public class SerializingQueryRunner<T> implements QueryRunner<T>
{
private final ObjectMapper smileMapper;
private final QueryRunner<T> baseRunner;
private final Class<T> clazz;

public SerializingQueryRunner(
ObjectMapper smileMapper,
Class<T> clazz,
QueryRunner<T> baseRunner
)
{
this.smileMapper = smileMapper;
this.clazz = clazz;
this.baseRunner = baseRunner;
}

@Override
public Sequence<T> run(
final Query<T> query,
final Map<String, Object> responseContext
)
{
return Sequences.map(
baseRunner.run(query, responseContext),
new Function<T, T>()
{
@Override
public T apply(T input)
{
try {
return smileMapper.readValue(smileMapper.writeValueAsBytes(input), clazz);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
}
}
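
As wired into queryMultiQueryableIndexWithSerde above, this runner round-trips every merged row through Smile bytes, charging the benchmark for the serialization cost a broker-style merge would pay when rows arrive over the wire. A minimal usage sketch (baseRunner stands in for whatever runner produces the rows):

    // Wrap any QueryRunner<Row> so each result is written to Smile and read
    // back before it reaches the downstream merge.
    QueryRunner<Row> serdeRunner = new SerializingQueryRunner<>(
        new DefaultObjectMapper(new SmileFactory()),
        Row.class,
        baseRunner
    );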
18 changes: 16 additions & 2 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -32,6 +32,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 import io.druid.data.input.Row;
+import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
@@ -285,7 +286,18 @@ public Ordering<Row> getRowOrdering(final boolean granular)
 
     final Comparator<Row> timeComparator = getTimeComparator(granular);
 
-    if (sortByDimsFirst) {
+    if (timeComparator == null) {
+      return Ordering.from(
+          new Comparator<Row>()
+          {
+            @Override
+            public int compare(Row lhs, Row rhs)
+            {
+              return compareDims(dimensions, lhs, rhs);
+            }
+          }
+      );
+    } else if (sortByDimsFirst) {
       return Ordering.from(
           new Comparator<Row>()
           {
@@ -323,7 +335,9 @@ public int compare(Row lhs, Row rhs)
 
   private Comparator<Row> getTimeComparator(boolean granular)
   {
-    if (granular) {
+    if (QueryGranularities.ALL.equals(granularity)) {
+      return null;
+    } else if (granular) {
       return new Comparator<Row>()
       {
         @Override
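
The observable effect of the two changes, illustrated with hypothetical rows (MapBasedRow is Druid's map-backed Row; ordering is what getRowOrdering returns for a granularity = all query):

    // Timestamps and dimension values pull in opposite directions.
    final Row lateA = new MapBasedRow(2000L, ImmutableMap.<String, Object>of("dim", "a"));
    final Row earlyB = new MapBasedRow(1000L, ImmutableMap.<String, Object>of("dim", "b"));

    // getTimeComparator returns null, so the ordering never consults the
    // timestamps: "a" sorts before "b" even though its row is later in time.
    // ordering.compare(lateA, earlyB) < 0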
BufferGrouper.java (io.druid.query.groupby.epinephelinae)
@@ -19,7 +19,6 @@
 
 package io.druid.query.groupby.epinephelinae;
 
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
@@ -135,12 +134,13 @@ public boolean aggregate(KeyType key, int keyHash)
       return false;
     }
 
-    Preconditions.checkArgument(
-        keyBuffer.remaining() == keySize,
-        "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
-        keyBuffer.remaining(),
-        keySize
-    );
+    if (keyBuffer.remaining() != keySize) {
+      throw new IAE(
+          "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
+          keyBuffer.remaining(),
+          keySize
+      );
+    }
 
     int bucket = findBucket(
         tableBuffer,
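
The Preconditions.checkArgument call was presumably replaced because aggregate() is a hot path: the varargs overload allocates its argument array and boxes keyBuffer.remaining() and keySize on every invocation, even when the check passes, while the explicit branch pays the message-formatting cost only on failure. The error message itself is unchanged.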