Skip to content
This repository has been archived by the owner on Mar 27, 2021. It is now read-only.

Commit

Permalink
[all] Read data from MetricBackend continously
Browse files Browse the repository at this point in the history
When reading data from MetricBackend, read with a consumer in a streamed
fashion. This limits the amount of memory retained during the operation.

This feature is disabled by default. Enable by specifying the
'com.spotify.heroic.sliced_data_fetch' feature flag, either per-query or
in the configuration file, like so:

features:
  - com.spotify.heroic.sliced_data_fetch
  • Loading branch information
jo-ri authored and gabrielgerhardsson committed Mar 10, 2017
1 parent 9b83aca commit bc53307
Show file tree
Hide file tree
Showing 18 changed files with 565 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,15 @@ public enum Feature {
* This will assert that there are data outside of the range queried for, which is a useful
* feature when using a dashboarding system.
*/
SHIFT_RANGE("com.spotify.heroic.shift_range");
SHIFT_RANGE("com.spotify.heroic.shift_range"),

/**
* Enable feature to cause data to be fetched in slices.
* <p>
* This will cause data to be fetched and consumed by the aggregation framework in
* pieces avoiding having to load all data into memory before starting to consume it.
*/
SLICED_DATA_FETCH("com.spotify.heroic.sliced_data_fetch");

private final String id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,46 +27,69 @@
import com.spotify.heroic.common.DateRange;
import com.spotify.heroic.common.Series;
import eu.toolchain.async.Collector;
import lombok.Data;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Data;

@Data
public class FetchData {
private final QueryTrace trace;
private final List<RequestError> errors;
private final Result result;
private final List<Long> times;
private final List<MetricCollection> groups;

public static Result errorResult(final QueryTrace trace, final RequestError error) {
return new Result(trace, ImmutableList.of(error));
}

public static Result result(final QueryTrace trace) {
return new Result(trace, ImmutableList.of());
}

@Deprecated
public static FetchData error(final QueryTrace trace, final RequestError error) {
return new FetchData(trace, ImmutableList.of(error), ImmutableList.of(),
ImmutableList.of());
return new FetchData(errorResult(trace, error), ImmutableList.of(), ImmutableList.of());
}

@Deprecated
public static FetchData of(
final QueryTrace trace, final List<Long> times, final List<MetricCollection> groups
) {
return new FetchData(trace, ImmutableList.of(), times, groups);
return new FetchData(new Result(trace, ImmutableList.of()), times, groups);
}

public static Collector<FetchData, FetchData> collect(
public static Collector<Result, Result> collectResult(
final QueryTrace.Identifier what
) {
final QueryTrace.NamedWatch w = QueryTrace.watch(what);

return results -> {
final ImmutableList.Builder<Long> times = ImmutableList.builder();
final Map<MetricType, ImmutableList.Builder<Metric>> fetchGroups = new HashMap<>();
final ImmutableList.Builder<QueryTrace> traces = ImmutableList.builder();
final ImmutableList.Builder<RequestError> errors = ImmutableList.builder();

for (final FetchData fetch : results) {
for (final Result result : results) {
traces.add(result.trace);
errors.addAll(result.errors);
}
return new Result(w.end(traces.build()), errors.build());
};
}

@Deprecated
public static Collector<FetchData, FetchData> collect(
final QueryTrace.Identifier what
) {
final Collector<Result, Result> resultCollector = collectResult(what);

return fetchDataCollection -> {
final ImmutableList.Builder<Long> times = ImmutableList.builder();
final Map<MetricType, ImmutableList.Builder<Metric>> fetchGroups = new HashMap<>();
final ImmutableList.Builder<Result> results = ImmutableList.builder();

for (final FetchData fetch : fetchDataCollection) {
times.addAll(fetch.times);
traces.add(fetch.trace);
errors.addAll(fetch.errors);
results.add(fetch.result);

for (final MetricCollection g : fetch.groups) {
ImmutableList.Builder<Metric> data = fetchGroups.get(g.getType());
Expand All @@ -87,7 +110,7 @@ public static Collector<FetchData, FetchData> collect(
Ordering.from(Metric.comparator()).immutableSortedCopy(e.getValue().build())))
.collect(Collectors.toList());

return new FetchData(w.end(traces.build()), errors.build(), times.build(), groups);
return new FetchData(resultCollector.collect(results.build()), times.build(), groups);
};
}

Expand All @@ -98,4 +121,10 @@ public static class Request {
private final DateRange range;
private final QueryOptions options;
}

@Data
public static class Result {
private final QueryTrace trace;
private final List<RequestError> errors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.spotify.heroic.aggregation.AggregationInstance;
import com.spotify.heroic.cluster.ClusterShard;
import com.spotify.heroic.common.DateRange;
import com.spotify.heroic.common.Features;
import com.spotify.heroic.common.Statistics;
import com.spotify.heroic.filter.Filter;
import com.spotify.heroic.querylogging.QueryContext;
Expand Down Expand Up @@ -116,6 +117,7 @@ public static class Request {
private final AggregationInstance aggregation;
private final QueryOptions options;
private final QueryContext context;
private final Features features;

public Summary summarize() {
return new Summary(source, filter, range, aggregation, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import com.spotify.heroic.common.Grouped;
import com.spotify.heroic.common.Initializing;
import com.spotify.heroic.common.Statistics;

import eu.toolchain.async.AsyncFuture;

import java.util.List;
import java.util.function.Consumer;

public interface MetricBackend extends Initializing, Grouped, Collected {
Statistics getStatistics();
Expand Down Expand Up @@ -59,8 +61,22 @@ public interface MetricBackend extends Initializing, Grouped, Collected {
* @param watcher The watcher implementation to use when fetching metrics.
* @return A future containing the fetched data wrapped in a {@link FetchData} structure.
*/
@Deprecated
AsyncFuture<FetchData> fetch(FetchData.Request request, FetchQuotaWatcher watcher);

/**
* Query for data points that is part of the specified list of rows and range.
*
* @param request Fetch request to use.
* @param watcher The watcher implementation to use when fetching metrics.
* @param metricsConsumer The consumer that receives the fetched data
* @return A future containing the fetch result.
*/
AsyncFuture<FetchData.Result> fetch(
FetchData.Request request, FetchQuotaWatcher watcher,
Consumer<MetricCollection> metricsConsumer
);

/**
* List all series directly from the database.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,4 @@ public interface MetricBackendGroup extends MetricBackend {
*/
AsyncFuture<FullQuery> query(FullQuery.Request request);

/**
* Fetch metrics with a default (no-op) quota watcher. This method allows for the fetching of an
* indefinite amount of metrics.
*
* @see MetricBackend#fetch
*/
AsyncFuture<FetchData> fetch(FetchData.Request request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public AsyncFuture<QueryResult> query(final Query q, final QueryContext queryCon

final FullQuery.Request request =
new FullQuery.Request(source, filter, range, aggregationInstance, options,
queryContext);
queryContext, features);

queryLogger.logOutgoingRequestToShards(queryContext, request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.spotify.heroic.aggregation.RetainQuotaWatcher;
import com.spotify.heroic.async.AsyncObservable;
import com.spotify.heroic.common.DateRange;
import com.spotify.heroic.common.Feature;
import com.spotify.heroic.common.GroupSet;
import com.spotify.heroic.common.Groups;
import com.spotify.heroic.common.OptionalLimit;
Expand All @@ -52,12 +53,6 @@
import eu.toolchain.async.AsyncFuture;
import eu.toolchain.async.LazyTransform;
import eu.toolchain.async.StreamCollector;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -68,6 +63,10 @@
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;

@Slf4j
@ToString(of = {})
Expand Down Expand Up @@ -167,6 +166,8 @@ public AsyncFuture<FullQuery> query(final FullQuery.Request request) {

queryLogger.logIncomingRequestAtNode(queryContext, request);

final boolean slicedFetch = request.getFeatures().hasFeature(Feature.SLICED_DATA_FETCH);

final QuotaWatcher watcher = new QuotaWatcher(
options.getDataLimit().orElse(dataLimit).asLong().orElse(Long.MAX_VALUE), options
.getAggregationLimit()
Expand Down Expand Up @@ -217,20 +218,6 @@ public AsyncFuture<FullQuery> query(final FullQuery.Request request) {
Statistics.empty(), ResultLimits.of(ResultLimit.AGGREGATION)));
}

final List<Callable<AsyncFuture<Pair<Series, FetchData>>>> fetches =
new ArrayList<>();

/* setup fetches */
accept(b -> {
for (final Series s : result.getSeries()) {
/* Setup fetches from metric backend
* The result is a stream of Pair<Series, FetchData> */
fetches.add(() -> b
.fetch(new FetchData.Request(source, s, range, options), watcher)
.directTransform(d -> Pair.of(s, d)));
}
});

/* setup collector */

final ResultCollector collector;
Expand All @@ -244,8 +231,8 @@ public AsyncFuture<FullQuery> query(final FullQuery.Request request) {
new ConcurrentLinkedQueue<>();

@Override
public void resolved(Pair<Series, FetchData> result) throws Exception {
traces.add(result.getRight().getTrace());
public void resolved(final FetchData.Result result) throws Exception {
traces.add(result.getTrace());
super.resolved(result);
}

Expand All @@ -266,6 +253,28 @@ public QueryTrace buildTrace() {
};
}

final List<Callable<AsyncFuture<FetchData.Result>>> fetches = new ArrayList<>();

/* setup fetches */
accept(b -> {
for (final Series s : result.getSeries()) {
if (slicedFetch) {
fetches.add(
() -> b.fetch(new FetchData.Request(source, s, range, options),
watcher, mc -> collector.acceptMetricsCollection(s, mc)));
} else {
fetches.add(() -> b
.fetch(new FetchData.Request(source, s, range, options), watcher)
.directTransform(d -> {
d.getGroups().forEach(group -> {
collector.acceptMetricsCollection(s, group);
});
return d.getResult();
}));
}
}
});

return async.eventuallyCollect(fetches, collector, fetchParallelism);
};

Expand Down Expand Up @@ -300,8 +309,13 @@ public AsyncFuture<FetchData> fetch(
}

@Override
public AsyncFuture<FetchData> fetch(final FetchData.Request request) {
return fetch(request, FetchQuotaWatcher.NO_QUOTA);
public AsyncFuture<FetchData.Result> fetch(
final FetchData.Request request, final FetchQuotaWatcher watcher,
final Consumer<MetricCollection> metricsConsumer
) {
final List<AsyncFuture<FetchData.Result>> callbacks =
map(b -> b.fetch(request, watcher, metricsConsumer));
return async.collect(callbacks, FetchData.collectResult(FETCH));
}

@Override
Expand Down Expand Up @@ -400,7 +414,7 @@ private <T> List<T> map(final Function<MetricBackend, T> op) {

@RequiredArgsConstructor
private abstract static class ResultCollector
implements StreamCollector<Pair<Series, FetchData>, FullQuery> {
implements StreamCollector<FetchData.Result, FullQuery> {
final ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<RequestError> requestErrors = new ConcurrentLinkedQueue<>();

Expand All @@ -413,19 +427,13 @@ private abstract static class ResultCollector
final boolean failOnLimits;

@Override
public void resolved(final Pair<Series, FetchData> result) throws Exception {
final FetchData f = result.getRight();
requestErrors.addAll(f.getErrors());
try {
for (final MetricCollection g : f.getGroups()) {
g.updateAggregation(session, result.getLeft().getTags(),
ImmutableSet.of(result.getLeft()));
dataInMemoryReporter.reportDataNoLongerNeeded(g.size());
}
} catch (QuotaViolationException e) {
// Seems resolved() should not throw exception
log.debug("Aggregation quota violated", e);
}
public void resolved(final FetchData.Result result) throws Exception {
requestErrors.addAll(result.getErrors());
}

void acceptMetricsCollection(final Series series, MetricCollection g) {
g.updateAggregation(session, series.getTags(), ImmutableSet.of(series));
dataInMemoryReporter.reportDataNoLongerNeeded(g.size());
}

@Override
Expand Down
Loading

0 comments on commit bc53307

Please sign in to comment.