Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Commit

Permalink
Remove legacy source input, move distribution type out of request input
Browse files Browse the repository at this point in the history
  • Loading branch information
lmuhlha committed Jan 27, 2021
1 parent 687ca05 commit 7956c01
Show file tree
Hide file tree
Showing 20 changed files with 114 additions and 109 deletions.
2 changes: 0 additions & 2 deletions heroic-component/src/main/java/com/spotify/heroic/Query.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ package com.spotify.heroic
import com.spotify.heroic.aggregation.Aggregation
import com.spotify.heroic.common.FeatureSet
import com.spotify.heroic.filter.Filter
import com.spotify.heroic.metric.MetricType
import java.util.*

data class Query(
val aggregation: Optional<Aggregation>,
val source: Optional<MetricType>,
val range: Optional<QueryDateRange>,
val filter: Optional<Filter>,
val options: Optional<QueryOptions>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.spotify.heroic.filter.Filter;
import com.spotify.heroic.filter.MatchKeyFilter;
import com.spotify.heroic.filter.MatchTagFilter;
import com.spotify.heroic.metric.MetricType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -40,7 +39,6 @@
import org.slf4j.LoggerFactory;

public class QueryBuilder {
private Optional<MetricType> source = Optional.empty();
private Optional<Map<String, String>> tags = Optional.empty();
private Optional<String> key = Optional.empty();
private Optional<Filter> filter = Optional.empty();
Expand Down Expand Up @@ -106,11 +104,6 @@ public QueryBuilder aggregation(final Optional<Aggregation> aggregation) {
return this;
}

public QueryBuilder source(Optional<MetricType> source) {
this.source = source;
return this;
}

public QueryBuilder options(final Optional<QueryOptions> options) {
checkNotNull(options, "options");
this.options = pickOptional(this.options, options);
Expand Down Expand Up @@ -146,7 +139,7 @@ public QueryBuilder features(final Optional<FeatureSet> features) {
}

public Query build() {
return new Query(aggregation, source, range, legacyFilter(), options, features);
return new Query(aggregation, range, legacyFilter(), options, features);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ data class FetchData(
}

data class Request(
val type: MetricType,
val series: Series,
val range: DateRange,
val options: QueryOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ static Summary create(
public abstract static class Request {
@JsonCreator
public static Request create(
@JsonProperty("source") MetricType source,
@JsonProperty("filter") Filter filter,
@JsonProperty("range") DateRange range,
@JsonProperty("aggregation") AggregationInstance aggregation,
Expand All @@ -167,11 +166,10 @@ public static Request create(
@JsonProperty("features") Features features
) {
return new AutoValue_FullQuery_Request(
source, filter, range, aggregation, options, context, features);
filter, range, aggregation, options, context, features);
}

@JsonProperty
public abstract MetricType source();

@JsonProperty
public abstract Filter filter();
@JsonProperty
Expand All @@ -186,12 +184,11 @@ public static Request create(
public abstract Features features();

public Summary summarize() {
return Summary.create(source(), filter(), range(), aggregation(), options());
return Summary.create(filter(), range(), aggregation(), options());
}

public void hashTo(final ObjectHasher hasher) {
hasher.putObject(getClass(), () -> {
hasher.putField("source", source(), hasher.enumValue());
hasher.putField("filter", filter(), hasher.with(Filter::hashTo));
hasher.putField("range", range(), hasher.with(DateRange::hashTo));
hasher.putField("aggregation", aggregation(),
Expand All @@ -205,18 +202,15 @@ public void hashTo(final ObjectHasher hasher) {
public abstract static class Summary {
@JsonCreator
public static Summary create(
@JsonProperty("source") MetricType source,
@JsonProperty("filter") Filter filter,
@JsonProperty("range") DateRange range,
@JsonProperty("aggregation") AggregationInstance aggregation,
@JsonProperty("options") QueryOptions options
) {
return new AutoValue_FullQuery_Request_Summary(
source, filter, range, aggregation, options);
filter, range, aggregation, options);
}

@JsonProperty
abstract MetricType source();
@JsonProperty
abstract Filter filter();
@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ AsyncFuture<FetchData.Result> fetch(
FetchData.Request request,
FetchQuotaWatcher watcher,
Consumer<MetricReadResult> metricsConsumer,
Span parentSpan
Span parentSpan,
MetricType metricType

);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ public abstract class QueryMetrics {
public static QueryMetrics create(
Optional<String> query,
Optional<Aggregation> aggregation,
Optional<String> source,
Optional<QueryDateRange> range,
Optional<Filter> filter,
Optional<QueryOptions> options,
Optional<JsonNode> clientContext
) {
return legacyCreate(query, aggregation, source, range, filter, options, clientContext,
return legacyCreate(query, aggregation, range, filter, options, clientContext,
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), false);
}
Expand All @@ -60,7 +59,6 @@ public static QueryMetrics create(
public static QueryMetrics legacyCreate(
@JsonProperty("query") Optional<String> query,
@JsonProperty("aggregation") Optional<Aggregation> aggregation,
@JsonProperty("source") Optional<String> source,
@JsonProperty("range") Optional<QueryDateRange> range,
@JsonProperty("filter") Optional<Filter> filter,
@JsonProperty("options") Optional<QueryOptions> options,
Expand All @@ -75,18 +73,16 @@ public static QueryMetrics legacyCreate(

final Optional<Aggregation> legitAggregation = firstPresent(aggregation,
aggregators.filter(c -> !c.isEmpty()).map(Chain::fromList));
final Optional<MetricType> sourceMetric = source.flatMap(MetricType::fromIdentifier);

return new AutoValue_QueryMetrics(query, legitAggregation, sourceMetric, range, filter,
return new AutoValue_QueryMetrics(query, legitAggregation, range, filter,
options, clientContext, key, tags, features);
}

@JsonProperty("query")
public abstract Optional<String> query();
@JsonProperty("aggregation")
public abstract Optional<Aggregation> aggregation();
@JsonProperty("source")
public abstract Optional<MetricType> source();

@JsonProperty("range")
public abstract Optional<QueryDateRange> range();
@JsonProperty("filter")
Expand All @@ -111,7 +107,6 @@ public QueryBuilder toQueryBuilder(final Function<String, QueryBuilder> stringTo
.filter(filter())
.range(range())
.aggregation(aggregation())
.source(source())
.options(options())
.clientContext(clientContext());

Expand Down
1 change: 1 addition & 0 deletions heroic-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dependencies {
// Default usage tracking module. Normally different module dependencies are handled in
// heroic-dist, but defaults need to be accessible in heroic-core.
implementation project(':heroic-usage-tracking-google-analytics')
implementation project(':heroic-aggregation-simple')

testImplementation project(':heroic-test')
testImplementation project(path: ':heroic-component', configuration: 'testRuntime')
Expand Down
68 changes: 38 additions & 30 deletions heroic-core/src/main/java/com/spotify/heroic/CoreQueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.spotify.heroic.aggregation.BucketStrategy;
import com.spotify.heroic.aggregation.DistributedAggregationCombiner;
import com.spotify.heroic.aggregation.Empty;
import com.spotify.heroic.aggregation.GroupInstance;
import com.spotify.heroic.aggregation.TDigestAggregationCombiner;
import com.spotify.heroic.aggregation.simple.TdigestInstance;
import com.spotify.heroic.cache.QueryCache;
import com.spotify.heroic.cluster.ClusterManager;
import com.spotify.heroic.cluster.ClusterNode;
Expand Down Expand Up @@ -100,6 +102,7 @@
import org.slf4j.LoggerFactory;

public class CoreQueryManager implements QueryManager {

private static final Logger log = LoggerFactory.getLogger(CoreQueryManager.class);
public static final long SHIFT_TOLERANCE = TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS);
public static final QueryTrace.Identifier QUERY_SHARD =
Expand Down Expand Up @@ -171,8 +174,6 @@ public QueryBuilder newQueryFromString(final String queryString) {
return expressions.get(0).eval(scope).visit(new Expression.Visitor<QueryBuilder>() {
@Override
public QueryBuilder visitQuery(final QueryExpression e) {
final Optional<MetricType> source = e.getSource();

final Optional<QueryDateRange> range =
e.getRange().map(expr -> expr.visit(new Expression.Visitor<QueryDateRange>() {
@Override
Expand Down Expand Up @@ -202,7 +203,6 @@ public Aggregation visitString(final StringExpression e) {
final Optional<Filter> filter = e.getFilter();

return new QueryBuilder()
.source(source)
.range(range)
.aggregation(aggregation)
.filter(filter);
Expand All @@ -211,6 +211,7 @@ public Aggregation visitString(final StringExpression e) {
}

public class Group implements QueryManager.Group {

private final List<ClusterShard> shards;

public Group(List<ClusterShard> shards) {
Expand All @@ -237,12 +238,12 @@ public AsyncFuture<QueryResult> query(

final List<AsyncFuture<QueryResultPart>> futures = new ArrayList<>();

final MetricType source = q.getSource().orElse(MetricType.POINT);

final Aggregation aggregation = q.getAggregation().orElse(Empty.INSTANCE);

final DateRange rawRange = buildRange(q);

MetricType metricType = MetricType.POINT;

final Filter filter = q.getFilter().orElseGet(TrueFilter::get);

final AggregationContext context =
Expand All @@ -252,12 +253,15 @@ public AsyncFuture<QueryResult> query(

final AggregationInstance aggregationInstance;

final AggregationCombiner combiner;

final Features features = requestFeatures(q, queryContext);

boolean isDistributed = features.hasFeature(Feature.DISTRIBUTED_AGGREGATIONS);

if (isDistributed) {
aggregationInstance = root.distributed();

} else {
aggregationInstance = root;
}
Expand All @@ -278,20 +282,23 @@ public AsyncFuture<QueryResult> query(
.orElseGet(() -> features.withFeature(Feature.END_BUCKET, () -> BucketStrategy.END,
() -> BucketStrategy.START));

final AggregationCombiner combiner;

if (aggregationInstance instanceof GroupInstance) {
if (((GroupInstance) aggregationInstance).getEach() instanceof TdigestInstance) {
metricType = MetricType.DISTRIBUTION_POINTS;
}
}

if (isDistributed) {
combiner = (source.equals(MetricType.POINT)) ?
DistributedAggregationCombiner.create(root, range, bucketStrategy) :
TDigestAggregationCombiner.create(root, range, bucketStrategy);
combiner = (metricType.equals(MetricType.POINT)) ?
DistributedAggregationCombiner.create(root, range, bucketStrategy) :
TDigestAggregationCombiner.create(root, range, bucketStrategy);
} else {
combiner = (source.equals(MetricType.DISTRIBUTION_POINTS)) ?
AggregationCombiner.TDIGEST_DEFAULT : AggregationCombiner.DEFAULT;
combiner = (metricType.equals(MetricType.DISTRIBUTION_POINTS)) ?
AggregationCombiner.TDIGEST_DEFAULT : AggregationCombiner.DEFAULT;
}

final FullQuery.Request request =
FullQuery.Request.create(source, filter, range, aggregationInstance, options,
FullQuery.Request.create(filter, range, aggregationInstance, options,
queryContext, features);

queryLogger.logOutgoingRequestToShards(queryContext, request);
Expand Down Expand Up @@ -334,21 +341,20 @@ public AsyncFuture<QueryResult> query(
QueryResult.collectParts(QUERY, range, combiner, limit));
});

return query
.directTransform(result -> {
reportCompletedQuery(result, fullQueryWatch);
if (result.getErrors().size() > 0) {
queryManagerSpan.addAnnotation(result.getErrors().toString());
queryManagerSpan.putAttribute("error", booleanAttributeValue(true));
}
queryManagerSpan.putAttribute("preAggregationSampleSize",
longAttributeValue(result.getPreAggregationSampleSize()));

return query
.directTransform(result -> {
reportCompletedQuery(result, fullQueryWatch);
if (result.getErrors().size() > 0) {
queryManagerSpan.addAnnotation(result.getErrors().toString());
queryManagerSpan.putAttribute("error", booleanAttributeValue(true));
}
queryManagerSpan.putAttribute("preAggregationSampleSize",
longAttributeValue(result.getPreAggregationSampleSize()));

return result;
})
.onDone(reporter.reportQuery())
.onDone(new EndSpanFutureReporter(queryManagerSpan));
return result;
})
.onDone(reporter.reportQuery())
.onDone(new EndSpanFutureReporter(queryManagerSpan));
}

private void reportCompletedQuery(
Expand Down Expand Up @@ -500,7 +506,7 @@ private Features requestFeatures(final Query q, final QueryContext context) {
hasWarnedSlicedDataFetch = true;
log.warn(
"The mandatory feature 'com.spotify.heroic.sliced_data_fetch' can't be " +
"disabled!");
"disabled!");
}
}

Expand Down Expand Up @@ -560,6 +566,7 @@ private Duration cadenceFromRange(final DateRange range) {
* too close or after 'now'. This is useful to avoid querying non-complete buckets.
*
* @param rawRange Original range.
*
* @return A possibly shifted range.
*/
DateRange buildShiftedRange(DateRange rawRange, long cadence, long now) {
Expand All @@ -584,10 +591,11 @@ DateRange buildShiftedRange(DateRange rawRange, long cadence, long now) {
* Calculate a tolerance shift period that corresponds to the given difference that needs to be
* applied to the range to honor the tolerance shift period.
*
* @param diff The time difference to apply.
* @param diff The time difference to apply.
* @param cadence The cadence period.
*
* @return The number of milliseconds that the query should be shifted to get within 'now' and
* maintain the given cadence.
* maintain the given cadence.
*/
private long toleranceShiftPeriod(final long diff, final long cadence) {
// raw query, only shift so that we are within now.
Expand Down
Loading

0 comments on commit 7956c01

Please sign in to comment.