Align downsampling intervals to the Gregorian calendar.
This feature supports the alignment of downsampling intervals to the
Gregorian calendar based on four different time categories:

- DAILY: The start time of each interval is computed as the start of the
  day in which the first data point occurs, based on a specified time zone
  (or the default JVM time zone, if no time zone has been specified).
  The end time of each interval is computed as the end of the day in which
  the first data point occurs.  For instance, if the specified time zone
  is UTC, and the timestamp of the first data point is 2016-01-05T05:32:00Z,
  then the start of the interval will be computed as 2016-01-05T00:00:00.000Z,
  while the end of the interval will be computed as 2016-01-05T23:59:59.999Z.

- WEEKLY: The start time of each interval is computed as the start of the
  week in which the first data point occurs, based on a specified time zone
  (or the default JVM time zone, if no time zone has been specified).
  The end time of each interval is computed as the end of the week in which
  the first data point occurs.  Weeks are considered to begin on Sundays (in
  the future, it might be a good idea to allow for variations based on a
  configuration setting). For instance, if the specified time zone is UTC,
  and the timestamp of the first data point is 2016-01-05T05:32:00Z, then the
  start of the interval will be computed as 2016-01-03T00:00:00.000Z,
  while the end of the interval will be computed as 2016-01-09T23:59:59.999Z.

- MONTHLY: The start time of each interval is computed as the start of the
  month in which the first data point occurs, based on a specified time zone
  (or the default JVM time zone, if no time zone has been specified).
  The end time of each interval is computed as the end of the month in which
  the first data point occurs.  For instance, if the specified time zone
  is UTC, and the timestamp of the first data point is 2016-01-05T05:32:00Z,
  then the start of the interval will be computed as 2016-01-01T00:00:00.000Z,
  while the end of the interval will be computed as 2016-01-31T23:59:59.999Z.

- YEARLY: The start time of each interval is computed as the start of the
  year in which the first data point occurs, based on a specified time zone
  (or the default JVM time zone, if no time zone has been specified).
  The end time of each interval is computed as the end of the year in which
  the first data point occurs.  For instance, if the specified time zone
  is UTC, and the timestamp of the first data point is 2016-01-05T05:32:00Z,
  then the start of the interval will be computed as 2016-01-01T00:00:00.000Z,
  while the end of the interval will be computed as 2016-12-31T23:59:59.999Z.
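
The four worked examples above can be reproduced with a short sketch. It uses
java.time rather than the DateTime helpers this commit adds, and the class and
method names (CalendarBounds, startOfWeek, and so on) are hypothetical, so it
should be read as an illustration of the boundary arithmetic rather than as
the committed implementation.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;

// Hypothetical helper, not part of this commit: calendar boundaries via java.time.
public class CalendarBounds {

  /** Midnight at the start of the day containing ts, in the given zone. */
  public static ZonedDateTime startOfDay(Instant ts, ZoneId zone) {
    return ts.atZone(zone).truncatedTo(ChronoUnit.DAYS);
  }

  /** Start of the week containing ts; weeks begin on Sunday, as described above. */
  public static ZonedDateTime startOfWeek(Instant ts, ZoneId zone) {
    return startOfDay(ts, zone)
        .with(TemporalAdjusters.previousOrSame(DayOfWeek.SUNDAY));
  }

  /** Midnight on the first day of the month containing ts. */
  public static ZonedDateTime startOfMonth(Instant ts, ZoneId zone) {
    return startOfDay(ts, zone).withDayOfMonth(1);
  }

  /** Midnight on the first day of the year containing ts. */
  public static ZonedDateTime startOfYear(Instant ts, ZoneId zone) {
    return startOfDay(ts, zone).withDayOfYear(1);
  }

  /** The interval ends one millisecond before the next unit begins. */
  public static Instant endOf(ZonedDateTime start, ChronoUnit unit) {
    return start.plus(1, unit).toInstant().minusMillis(1);
  }

  public static void main(String[] args) {
    Instant first = Instant.parse("2016-01-05T05:32:00Z");
    ZoneId utc = ZoneId.of("UTC");
    ZonedDateTime day = startOfDay(first, utc);
    ZonedDateTime week = startOfWeek(first, utc);
    ZonedDateTime month = startOfMonth(first, utc);
    ZonedDateTime year = startOfYear(first, utc);
    System.out.println("DAILY   " + day.toInstant() + " .. " + endOf(day, ChronoUnit.DAYS));
    System.out.println("WEEKLY  " + week.toInstant() + " .. " + endOf(week, ChronoUnit.WEEKS));
    System.out.println("MONTHLY " + month.toInstant() + " .. " + endOf(month, ChronoUnit.MONTHS));
    System.out.println("YEARLY  " + year.toInstant() + " .. " + endOf(year, ChronoUnit.YEARS));
  }
}
```

Passing a different ZoneId (for example ZoneId.systemDefault()) shifts every
boundary to local midnight in that zone, which is the effect the time zone
setting is described as having.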

This feature also allows for the alignment of intervals that are multiples
of one year, one month, one week, or one day.  In cases where a given
interval is a multiple of more than one time category, the larger time
category will be used. For instance, an interval of 24 months will be
interpreted as an interval of two years, and will be aligned to the calendar
accordingly. As such, if the specified time zone is UTC,
and the timestamp of the first data point is 2016-03-05T05:32:00Z, then the
start of the interval will be computed as 2016-01-01T00:00:00.000Z,
while the end of the interval will be computed as 2017-12-31T23:59:59.999Z.
This is in keeping with the principle of least astonishment.
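
The "larger category wins" rule can be sketched as an ordered series of
divisibility checks. The millisecond constants below are the ones declared in
Downsampler.java in this commit (a month is 30 days and a year is 365 days);
the IntervalCategory class, its enum, and the resolve method are hypothetical
names introduced only for illustration.

```java
// Hypothetical sketch of the category selection; not part of this commit.
public class IntervalCategory {

  public enum Category { YEARLY, MONTHLY, WEEKLY, DAILY, NONE }

  // Values copied from Downsampler.java in this commit.
  static final long ONE_DAY_INTERVAL = 86400000L;      // 1 day
  static final long ONE_WEEK_INTERVAL = 604800000L;    // 7 days
  static final long ONE_MONTH_INTERVAL = 2592000000L;  // 30 days
  static final long ONE_YEAR_INTERVAL = 31536000000L;  // 365 days

  /** Checks the largest unit first, so it wins when several units divide the interval. */
  public static Category resolve(long intervalMs) {
    if (intervalMs <= 0) {
      return Category.NONE;  // assumption: non-positive intervals are never aligned
    }
    if (intervalMs % ONE_YEAR_INTERVAL == 0) {
      return Category.YEARLY;
    }
    if (intervalMs % ONE_MONTH_INTERVAL == 0) {
      return Category.MONTHLY;
    }
    if (intervalMs % ONE_WEEK_INTERVAL == 0) {
      return Category.WEEKLY;
    }
    if (intervalMs % ONE_DAY_INTERVAL == 0) {
      return Category.DAILY;
    }
    return Category.NONE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(2 * ONE_YEAR_INTERVAL));   // two 365-day years
    System.out.println(resolve(210 * ONE_DAY_INTERVAL));  // divisible by both 30 and 7 days
    System.out.println(resolve(14 * ONE_DAY_INTERVAL));   // two weeks
    System.out.println(resolve(3 * ONE_DAY_INTERVAL));    // three days
    System.out.println(resolve(3600000L));                // one hour
  }
}
```

An interval of 210 days is a multiple of both 30 days and 7 days; because the
monthly check runs first, the sketch resolves it to MONTHLY.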

To specify the time zone for a given HTTP query, include a query string
parameter named "tz" with a value equal to a JVM time zone id (e.g. "UTC").
If a time zone is not included in the query string, the default JVM time zone
will be used.

To specify that a given HTTP query should use the calendar alignment feature
for downsampling, include a query string parameter named "use_calendar" with
a value of "true". You can stipulate that all HTTP queries should use the
calendar alignment feature by including a "tsd.query.downsample.use_calendar"
configuration setting within the opentsdb.conf file and by setting its value
to "true" (the default value is "false").  This config file setting can be
overridden on a per-query basis by including the "use_calendar" parameter in
the query string as specified above.
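
A client-side sketch of the two query string parameters follows. The host,
port, metric name, and the choice of the /api/query endpoint are assumptions
made for the example, and CalendarQuery with its two methods is a hypothetical
helper. resolveUseCalendar only illustrates the precedence described above (a
per-query value overrides the config default); it is not the server's parsing
code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical client-side sketch; not part of this commit.
public class CalendarQuery {

  /** A per-query "use_calendar" value, when present, overrides the config default. */
  public static boolean resolveUseCalendar(String queryParam, boolean configDefault) {
    return queryParam == null ? configDefault : Boolean.parseBoolean(queryParam);
  }

  /** Builds a query URL carrying the "tz" and "use_calendar" parameters. */
  public static String buildQueryUrl(String baseUrl, String start, String metricQuery,
                                     String tz, boolean useCalendar) {
    return baseUrl + "/api/query"
        + "?start=" + encode(start)
        + "&m=" + encode(metricQuery)
        + "&tz=" + encode(tz)
        + "&use_calendar=" + useCalendar;
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is required on every JVM, so this branch is not expected to run.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Assumed host, port, and metric; 1d-avg asks for daily averages.
    System.out.println(buildQueryUrl("http://localhost:4242", "30d-ago",
        "sum:1d-avg:sys.cpu.user", "UTC", true));
    // Config default of true, overridden to false for a single query.
    System.out.println(resolveUseCalendar("false", true));
  }
}
```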
Carlos Devoto committed Dec 13, 2015
1 parent 2b66c77 commit 781c4f3
Showing 15 changed files with 1,175 additions and 47 deletions.
51 changes: 47 additions & 4 deletions src/core/AggregationIterator.java
@@ -15,13 +15,16 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

import com.google.common.annotations.VisibleForTesting;
import java.util.TimeZone;

import net.opentsdb.core.Aggregators.Interpolation;
import net.opentsdb.utils.DateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

/**
* Iterator that aggregates multiple spans or time series data and does linear
* interpolation (lerp) for missing data points.
@@ -225,6 +228,41 @@ public static AggregationIterator create(final List<Span> spans,
return create(spans, start_time, end_time, aggregator, method, downsampler,
sample_interval_ms, rate, rate_options, null);
}

/**
* Creates a new iterator for a {@link SpanGroup}.
* @param spans Spans in a group.
* @param start_time Any data point strictly before this timestamp will be
* ignored.
* @param end_time Any data point strictly after this timestamp will be
* ignored.
* @param aggregator The aggregation function to use.
* @param method Interpolation method to use when aggregating time series
* @param downsampler Aggregation function to use to group data points
* within an interval.
* @param sample_interval_ms Number of milliseconds wanted between each data
* point.
* @param rate If {@code true}, the rate of the series will be used instead
* of the actual values.
* @param rate_options Specifies the optional additional rate calculation
* options.
* @param fill_policy Policy specifying whether to interpolate or to fill
* missing intervals with special values.
* @return An {@link AggregationIterator} object.
*/
public static AggregationIterator create(final List<Span> spans,
final long start_time,
final long end_time,
final Aggregator aggregator,
final Interpolation method,
final Aggregator downsampler,
final long sample_interval_ms,
final boolean rate,
final RateOptions rate_options,
final FillPolicy fill_policy) {
return create(spans, start_time, end_time, aggregator, method, downsampler,
sample_interval_ms, rate, rate_options, fill_policy, null, false);
}

/**
* Creates a new iterator for a {@link SpanGroup}.
@@ -245,6 +283,8 @@ public static AggregationIterator create(final List<Span> spans,
* options.
* @param fill_policy Policy specifying whether to interpolate or to fill
* missing intervals with special values.
* @param timezone The timezone to use for aligning intervals based on the calendar.
* @param use_calendar A flag denoting whether or not to align intervals based on the calendar.
* @return An {@link AggregationIterator} object.
* @since 2.2
*/
@@ -257,16 +297,19 @@ public static AggregationIterator create(final List<Span> spans,
final long sample_interval_ms,
final boolean rate,
final RateOptions rate_options,
final FillPolicy fill_policy) {
final FillPolicy fill_policy,
final String timezone,
final boolean use_calendar) {
final int size = spans.size();
final SeekableView[] iterators = new SeekableView[size];
TimeZone tz = DateTime.timezones.get(timezone);
for (int i = 0; i < size; i++) {
SeekableView it;
if (downsampler == null) {
it = spans.get(i).spanIterator();
} else {
it = spans.get(i).downsampler(start_time, end_time, sample_interval_ms,
downsampler, fill_policy);
downsampler, fill_policy, tz, use_calendar);
}
if (rate) {
it = new RateSpan(it, rate_options);
166 changes: 157 additions & 9 deletions src/core/Downsampler.java
@@ -12,13 +12,28 @@
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import static net.opentsdb.utils.DateTime.toEndOfDay;
import static net.opentsdb.utils.DateTime.toEndOfMonth;
import static net.opentsdb.utils.DateTime.toEndOfWeek;
import static net.opentsdb.utils.DateTime.toEndOfYear;
import static net.opentsdb.utils.DateTime.toStartOfDay;
import static net.opentsdb.utils.DateTime.toStartOfMonth;
import static net.opentsdb.utils.DateTime.toStartOfWeek;
import static net.opentsdb.utils.DateTime.toStartOfYear;

import java.util.NoSuchElementException;
import java.util.TimeZone;

/**
* Iterator that downsamples data points using an {@link Aggregator}.
*/
public class Downsampler implements SeekableView, DataPoint {

static final long ONE_WEEK_INTERVAL = 604800000L;
static final long ONE_MONTH_INTERVAL = 2592000000L;
static final long ONE_YEAR_INTERVAL = 31536000000L;
static final long ONE_DAY_INTERVAL = 86400000L;

/** Function to use for downsampling. */
protected final Aggregator downsampler;
/** Iterator to iterate the values of the current interval. */
@@ -29,7 +44,7 @@ public class Downsampler implements SeekableView, DataPoint {
protected double value;

/**
* Ctor.
* Ctor (for backward compatibility).
* @param source The iterator to access the underlying data.
* @param interval_ms The interval in milli seconds wanted between each data
* point.
@@ -38,10 +53,26 @@ public class Downsampler implements SeekableView, DataPoint {
Downsampler(final SeekableView source,
final long interval_ms,
final Aggregator downsampler) {
this.values_in_interval = new ValuesInInterval(source, interval_ms);
this.downsampler = downsampler;
this(source, interval_ms, downsampler, null, false);
}

/**
* Ctor.
* @param source The iterator to access the underlying data.
* @param interval_ms The interval in milli seconds wanted between each data
* point.
* @param downsampler The downsampling function to use.
* @param timezone The timezone to use for aligning intervals based on the calendar.
* @param use_calendar A flag denoting whether or not to align intervals based on the calendar.
*/
Downsampler(final SeekableView source,
final long interval_ms,
final Aggregator downsampler,
final TimeZone timezone,
final boolean use_calendar) {
this.values_in_interval = new ValuesInInterval(source, interval_ms, timezone, use_calendar);
this.downsampler = downsampler;
}
// ------------------ //
// Iterator interface //
// ------------------ //
@@ -133,6 +164,10 @@ protected static class ValuesInInterval implements Aggregator.Doubles {
private boolean has_next_value_from_source = false;
/** The last data point extracted from the source. */
private DataPoint next_dp = null;
/** The timezone to use for aligning intervals based on the calendar */
private final TimeZone timezone;
/** A flag denoting whether or not to align intervals based on the calendar */
private final boolean use_calendar;

/** True if it is initialized for iterating intervals. */
private boolean initialized = false;
@@ -142,10 +177,13 @@ protected static class ValuesInInterval implements Aggregator.Doubles {
* @param source The iterator to access the underlying data.
* @param interval_ms Downsampling interval.
*/
ValuesInInterval(final SeekableView source, final long interval_ms) {
ValuesInInterval(final SeekableView source, final long interval_ms,
final TimeZone timezone, boolean use_calendar) {
this.source = source;
this.interval_ms = interval_ms;
this.timestamp_end_interval = interval_ms;
this.timezone = timezone != null ? timezone : TimeZone.getDefault();
this.use_calendar = use_calendar;
}

/** Initializes to iterate intervals. */
@@ -177,8 +215,14 @@ private void moveToNextValue() {
private void resetEndOfInterval() {
if (has_next_value_from_source) {
// Sets the end of the interval of the timestamp.
timestamp_end_interval = alignTimestamp(next_dp.timestamp()) +

if (use_calendar && isCalendarInterval()) {
timestamp_end_interval = toEndOfInterval(next_dp.timestamp());
} else {
// default timestamp normalization (tsdb v2.1.0)
timestamp_end_interval = alignTimestamp(next_dp.timestamp()) +
interval_ms;
}
}
}

@@ -193,8 +237,14 @@ void seekInterval(final long timestamp) {
// To make sure that the interval of the given timestamp is fully filled,
// rounds up the seeking timestamp to the smallest timestamp that is
// a multiple of the interval and is greater than or equal to the given
// timestamp..
source.seek(alignTimestamp(timestamp + interval_ms - 1));
// timestamp.
if (use_calendar && isCalendarInterval()) {
source.seek(alignTimestamp(timestamp + toEndOfInterval(timestamp)
- toStartOfInterval(timestamp)));
} else {
source.seek(alignTimestamp(timestamp + interval_ms - 1));
}

initialized = false;
}

@@ -203,13 +253,111 @@ protected long getIntervalTimestamp() {
// NOTE: It is well-known practice taking the start time of
// a downsample interval as a representative timestamp of it. It also
// provides the correct context for seek.
return alignTimestamp(timestamp_end_interval - interval_ms);
if (use_calendar && isCalendarInterval()) {
return toStartOfInterval(timestamp_end_interval);
} else {
return alignTimestamp(timestamp_end_interval - interval_ms);
}
}

/** Returns timestamp aligned by interval. */
protected long alignTimestamp(final long timestamp) {
return timestamp - (timestamp % interval_ms);
if (use_calendar && isCalendarInterval()) {
return toStartOfInterval(timestamp);
} else {
return timestamp - (timestamp % interval_ms);
}
}

/** Returns a flag denoting whether the interval can
* be aligned to the calendar */
private boolean isCalendarInterval () {
if (interval_ms != 0 &&
(interval_ms % ONE_YEAR_INTERVAL == 0 ||
interval_ms % ONE_MONTH_INTERVAL == 0 ||
interval_ms % ONE_WEEK_INTERVAL == 0 ||
interval_ms % ONE_DAY_INTERVAL == 0)) {
return true;
}
return false;
}

/** Returns a timestamp corresponding to the start of the interval
* in which the specified timestamp occurs, aligned to the calendar
* based on the timezone. */
private long toStartOfInterval(long timestamp) {
if (interval_ms % ONE_YEAR_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_YEAR_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toStartOfYear(result, timezone) - 1;
}
return result + 1;
} else if (interval_ms % ONE_MONTH_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_MONTH_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toStartOfMonth(result, timezone) - 1;
}
return result + 1;
} else if (interval_ms % ONE_WEEK_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_WEEK_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toStartOfWeek(result, timezone) - 1;
}
return result + 1;
} else if (interval_ms % ONE_DAY_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_DAY_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toStartOfDay(result, timezone) - 1;
}
return result + 1;
} else {
throw new IllegalArgumentException(interval_ms + " does not correspond to "
+ "an interval that can be aligned to the calendar.");
}
}

/** Returns a timestamp corresponding to the end of the interval
* in which the specified timestamp occurs, aligned to the calendar
* based on the timezone. */
private long toEndOfInterval(long timestamp) {
if (interval_ms % ONE_YEAR_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_YEAR_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toEndOfYear(result, timezone) + 1;
}
return result - 1;
} else if (interval_ms % ONE_MONTH_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_MONTH_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toEndOfMonth(result, timezone) + 1;
}
return result - 1;
} else if (interval_ms % ONE_WEEK_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_WEEK_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toEndOfWeek(result, timezone) + 1;
}
return result - 1;
} else if (interval_ms % ONE_DAY_INTERVAL == 0) {
final long multiplier = interval_ms / ONE_DAY_INTERVAL;
long result = timestamp;
for (long i = 0; i < multiplier; i++) {
result = toEndOfDay(result, timezone) + 1;
}
return result - 1;
} else {
throw new IllegalArgumentException(interval_ms + " does not correspond to "
+ "an interval that can be aligned to the calendar.");
}
}


// ---------------------- //
// Doubles interface //
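
The multiplier loops in toStartOfInterval and toEndOfInterval step across one
calendar unit per iteration. The sketch below mirrors that stepping for the
yearly case only, using java.time in place of the DateTime helpers; the
MultiYearInterval class and the explicit years parameter are hypothetical
simplifications (the diff derives the multiplier from interval_ms).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch of the yearly branch; not part of this commit.
public class MultiYearInterval {

  /** First millisecond of the year containing ms, in the given zone. */
  public static long startOfYear(long ms, ZoneId zone) {
    ZonedDateTime start = Instant.ofEpochMilli(ms).atZone(zone)
        .truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
    return start.toInstant().toEpochMilli();
  }

  /** Last millisecond of the year containing ms, in the given zone. */
  public static long endOfYear(long ms, ZoneId zone) {
    ZonedDateTime start = Instant.ofEpochMilli(ms).atZone(zone)
        .truncatedTo(ChronoUnit.DAYS).withDayOfYear(1);
    return start.plusYears(1).toInstant().toEpochMilli() - 1;
  }

  /** Steps forward one year end at a time, as the loop in toEndOfInterval does. */
  public static long toEndOfInterval(long timestamp, int years, ZoneId zone) {
    long result = timestamp;
    for (int i = 0; i < years; i++) {
      result = endOfYear(result, zone) + 1;  // first millisecond of the next year
    }
    return result - 1;                       // back to the last millisecond
  }

  /** Steps backward one year start at a time, as the loop in toStartOfInterval does. */
  public static long toStartOfInterval(long timestamp, int years, ZoneId zone) {
    long result = timestamp;
    for (int i = 0; i < years; i++) {
      result = startOfYear(result, zone) - 1;  // last millisecond of the previous year
    }
    return result + 1;                         // forward to the first millisecond
  }

  public static void main(String[] args) {
    ZoneId utc = ZoneId.of("UTC");
    long first = Instant.parse("2016-03-05T05:32:00Z").toEpochMilli();
    long end = toEndOfInterval(first, 2, utc);
    long start = toStartOfInterval(end, 2, utc);
    System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(end));
  }
}
```

In this sketch the start is derived from the interval end, which is how
getIntervalTimestamp calls toStartOfInterval in the diff. Walking back two
years directly from the first data point would instead land on
2015-01-01T00:00:00.000Z.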
27 changes: 25 additions & 2 deletions src/core/FillingDownsampler.java
@@ -13,6 +13,7 @@
package net.opentsdb.core;

import java.util.NoSuchElementException;
import java.util.TimeZone;

/**
* A specialized downsampler that returns special values, based on the fill
@@ -30,7 +31,7 @@ public class FillingDownsampler extends Downsampler {
protected final FillPolicy fill_policy;

/**
* Create a new nulling downsampler.
* Ctor (preserved for backward compatibility).
* @param source The iterator to access the underlying data.
* @param start_time The time in milliseconds at which the data begins.
* @param end_time The time in milliseconds at which the data ends.
@@ -44,8 +45,30 @@ public class FillingDownsampler extends Downsampler {
FillingDownsampler(final SeekableView source, final long start_time,
final long end_time, final long interval_ms,
final Aggregator downsampler, final FillPolicy fill_policy) {
this(source, start_time, end_time, interval_ms, downsampler, fill_policy, null, false);
}


/**
* Create a new nulling downsampler.
* @param source The iterator to access the underlying data.
* @param start_time The time in milliseconds at which the data begins.
* @param end_time The time in milliseconds at which the data ends.
* @param interval_ms The interval in milli seconds wanted between each data
* point.
* @param downsampler The downsampling function to use.
* @param fill_policy Policy specifying whether to interpolate or to fill
* missing intervals with special values.
* @param timezone The timezone to use for aligning intervals based on the calendar.
* @throws IllegalArgumentException if fill_policy is interpolation.
*/
FillingDownsampler(final SeekableView source, final long start_time,
final long end_time, final long interval_ms,
final Aggregator downsampler, final FillPolicy fill_policy,
final TimeZone timezone,
final boolean use_calendar) {
// Lean on the superclass implementation.
super(source, interval_ms, downsampler);
super(source, interval_ms, downsampler, timezone, use_calendar);

// Ensure we aren't given a bogus fill policy.
if (FillPolicy.NONE == fill_policy) {