Skip to content

Commit

Permalink
Merge pull request OpenTSDB#30 from hi3g/consolidate_basetime
Browse files Browse the repository at this point in the history
Consolidate basetime calculation into one place
  • Loading branch information
jzeeck committed Aug 7, 2014
2 parents d14a01c + 7efc472 commit e1ecba8
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 46 deletions.
35 changes: 11 additions & 24 deletions src/core/IncomingDataPoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import net.opentsdb.meta.Annotation;
import net.opentsdb.stats.Histogram;
import net.opentsdb.storage.hbase.HBaseStore;
import net.opentsdb.uid.UidFormatter;

import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -123,22 +124,20 @@ public void setSeries(final String metric, final Map<String, String> tags) {

/**
* Updates the base time in the row key.
* @param timestamp The timestamp from which to derive the new base time.
* @param base_time The new base time
* @return The updated base time.
*/
private long updateBaseTime(final long timestamp) {
private void updateBaseTime(final long base_time) {
// We force the starting timestamp to be on a MAX_TIMESPAN boundary
// so that all TSDs create rows with the same base time. Otherwise
// we'd need to coordinate TSDs to avoid creating rows that cover
// overlapping time periods.
final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);
// Clone the row key since we're going to change it. We must clone it
// because the TsdbStore may still hold a reference to it in its
// internal datastructures.
row = Arrays.copyOf(row, row.length);
Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
tsdb.getTsdbStore().scheduleForCompaction(row);
return base_time;
}

/**
Expand Down Expand Up @@ -170,21 +169,14 @@ private Deferred<Object> addPointInternal(final long timestamp, final byte[] val
+ " when trying to add value=" + Arrays.toString(value)
+ " to " + this);
}
last_ts = (ms_timestamp ? timestamp : timestamp * 1000);

long base_time = baseTime();
long incoming_base_time;
if (ms_timestamp) {
// drop the ms timestamp to seconds to calculate the base timestamp
incoming_base_time = ((timestamp / 1000) -
((timestamp / 1000) % Const.MAX_TIMESPAN));
} else {
incoming_base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
}

last_ts = (ms_timestamp ? timestamp : timestamp * 1000);

long base_time = RowKey.baseTime(row);
long incoming_base_time = HBaseStore.buildBaseTime(timestamp);

if (incoming_base_time - base_time >= Const.MAX_TIMESPAN) {
// Need to start a new row as we've exceeded Const.MAX_TIMESPAN.
base_time = updateBaseTime((ms_timestamp ? timestamp / 1000: timestamp));
updateBaseTime(incoming_base_time);
}

// Java is so stupid with its auto-promotion of int to float.
Expand Down Expand Up @@ -226,11 +218,6 @@ private void grow() {
qualifiers = Arrays.copyOf(qualifiers, new_size);
}

/** Extracts the base timestamp from the row key. */
private long baseTime() {
return Bytes.getUnsignedInt(row, tsdb.metrics.width());
}

public Deferred<Object> addPoint(final long timestamp, final long value) {
final byte[] v;
if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
Expand Down Expand Up @@ -349,7 +336,7 @@ private static short delta(final short qualifier) {

public long timestamp(final int i) {
checkIndex(i);
return baseTime() + (delta(qualifiers[i]) & 0xFFFF);
return RowKey.baseTime(row) + (delta(qualifiers[i]) & 0xFFFF);
}

public boolean isInteger(final int i) {
Expand Down Expand Up @@ -380,7 +367,7 @@ public String toString() {
final String metric = Arrays.toString(metric());
final StringBuilder buf = new StringBuilder(80 + metric.length()
+ row.length * 4 + size * 16);
final long base_time = baseTime();
final long base_time = RowKey.baseTime(row);
buf.append("IncomingDataPoints(")
.append(row == null ? "<null>" : Arrays.toString(row))
.append(" (metric=")
Expand Down
11 changes: 3 additions & 8 deletions src/core/RowKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.Arrays;
import java.util.Map;

import net.opentsdb.storage.hbase.HBaseStore;

import org.hbase.async.Bytes;

import com.stumbleupon.async.Deferred;
Expand Down Expand Up @@ -81,14 +83,7 @@ public static Map<byte[], byte[]> tags(final byte[] row) {
*/
public static byte[] rowKeyFromTSUID(final TSDB tsdb, final byte[] tsuid,
final long timestamp) {
final long base_time;
if ((timestamp & Const.SECOND_MASK) != 0) {
// drop the ms timestamp to seconds to calculate the base timestamp
base_time = ((timestamp / 1000) -
((timestamp / 1000) % Const.MAX_TIMESPAN));
} else {
base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
}
final long base_time = HBaseStore.buildBaseTime(timestamp);
final byte[] row = new byte[tsuid.length + Const.TIMESTAMP_BYTES];
System.arraycopy(tsuid, 0, row, 0, Const.METRICS_WIDTH);
Bytes.setInt(row, (int) base_time, Const.METRICS_WIDTH);
Expand Down
2 changes: 1 addition & 1 deletion src/core/RowSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public String toString() {

/**
* Used to compare two RowSeq objects when sorting a {@link Span}. Compares
* on the {@code RowSeq#baseTime()}
* on {@link net.opentsdb.core.RowKey#baseTime(byte[])}.
* @since 2.0
*/
public static final class RowSeqComparator implements Comparator<RowSeq> {
Expand Down
10 changes: 1 addition & 9 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,15 +719,7 @@ class RowKeyCB implements Callback<Deferred<Object>, byte[]> {
public Deferred<Object> call(byte[] row) throws Exception {
final byte[] qualifier = Internal.buildQualifier(timestamp, flags);

final long base_time;
if ((timestamp & Const.SECOND_MASK) != 0) {
// drop the ms timestamp to seconds to calculate the base timestamp
base_time = ((timestamp / 1000) -
((timestamp / 1000) % Const.MAX_TIMESPAN));
} else {
base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
}

final long base_time = HBaseStore.buildBaseTime(timestamp);
Bytes.setInt(row, (int) base_time, metrics.width());

// TODO(tsuna): Add a callback to time the latency of HBase and store the
Expand Down
5 changes: 3 additions & 2 deletions src/storage/hbase/CompactionQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import net.opentsdb.core.Const;
import net.opentsdb.core.IllegalDataException;
import net.opentsdb.core.Internal;
import net.opentsdb.core.RowKey;
import net.opentsdb.storage.TsdbStore;
import net.opentsdb.utils.Config;
import org.hbase.async.*;
Expand Down Expand Up @@ -170,7 +171,7 @@ private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
if (seed == row.hashCode() % 3) {
continue;
}
final long base_time = Bytes.getUnsignedInt(row, Const.METRICS_WIDTH);
final long base_time = RowKey.baseTime(row);
if (base_time > cut_off) {
break;
} else if (nflushes == MAX_CONCURRENT_FLUSHES) {
Expand Down Expand Up @@ -355,7 +356,7 @@ public Deferred<Object> compact() {

if (compacted != null) { // Caller is interested in the compacted form.
compacted[0] = compact;
final long base_time = Bytes.getUnsignedInt(compact.key(), Const.METRICS_WIDTH);
final long base_time = RowKey.baseTime(compact.key());
final long cut_off = System.currentTimeMillis() / 1000
- Const.MAX_TIMESPAN - 1;
if (base_time > cut_off) { // If row is too recent...
Expand Down
3 changes: 1 addition & 2 deletions src/tools/Fsck.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,7 @@ private void fsckRow(final ArrayList<KeyValue> row,
return;
}

final long base_time = Bytes.getUnsignedInt(row.get(0).key(),
Const.METRICS_WIDTH);
final long base_time = RowKey.baseTime(row.get(0).key());

for (final KeyValue kv : row) {
kvs_processed.getAndIncrement();
Expand Down

0 comments on commit e1ecba8

Please sign in to comment.