Skip to content

Commit

Permalink
Merge pull request OpenTSDB#29 from hi3g/datapoints_no_tsdb
Browse files Browse the repository at this point in the history
No TSDB reference to the DataPoints implementors
  • Loading branch information
jzeeck committed Aug 7, 2014
2 parents 5f24a7a + 8e75374 commit d14a01c
Show file tree
Hide file tree
Showing 16 changed files with 594 additions and 665 deletions.
44 changes: 7 additions & 37 deletions src/core/DataPoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,61 +25,31 @@
* Implementations of this interface aren't expected to be synchronized.
*/
public interface DataPoints extends Iterable<DataPoint> {

/**
* Returns the name of the series.
* Returns the metric id of the series.
*/
String metricName();

/**
* Returns the name of the series.
* @since 1.2
*/
Deferred<String> metricNameAsync();
byte[] metric();

/**
* Returns the tags associated with these data points.
* @return A non-{@code null} map of tag names (keys), tag values (values).
*/
Map<String, String> getTags();

/**
* Returns the tags associated with these data points.
* @return A non-{@code null} map of tag names (keys), tag values (values).
* @since 1.2
* Returns the tags that are common to all the data points that are
* represented by this instance (the intersection set).
*/
Deferred<Map<String, String>> getTagsAsync();
Map<byte[], byte[]> tags();

/**
* Returns the tags associated with some but not all of the data points.
* <p>
* When this instance represents the aggregation of multiple time series
* (same metric but different tags), {@link #getTags} returns the tags that
* are common to all data points (intersection set) whereas this method
* returns all the tags names that are not common to all data points (union
* set minus the intersection set, also called the symmetric difference).
* <p>
* If this instance does not represent an aggregation of multiple time
* series, the list returned is empty.
* @return A non-{@code null} list of tag names.
*/
List<String> getAggregatedTags();

/**
* Returns the tags associated with some but not all of the data points.
* <p>
* When this instance represents the aggregation of multiple time series
* (same metric but different tags), {@link #getTags} returns the tags that
* (same metric but different tags), {@link #tags()} returns the tags that
* are common to all data points (intersection set) whereas this method
* returns all the tags names that are not common to all data points (union
* set minus the intersection set, also called the symmetric difference).
* <p>
* If this instance does not represent an aggregation of multiple time
* series, the list returned is empty.
* @return A non-{@code null} list of tag names.
* @since 1.2
*/
Deferred<List<String>> getAggregatedTagsAsync();
List<byte[]> aggregatedTags();

/**
* Returns a list of unique TSUIDs contained in the results
Expand Down
59 changes: 23 additions & 36 deletions src/core/IncomingDataPoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;

import com.google.common.base.Preconditions;
import com.stumbleupon.async.Deferred;

import org.hbase.async.Bytes;
Expand All @@ -27,6 +28,8 @@
import net.opentsdb.stats.Histogram;
import net.opentsdb.uid.UidFormatter;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* Receives new data points and stores them in HBase.
*/
Expand Down Expand Up @@ -282,47 +285,31 @@ public void setBatchImport(final boolean batchornot) {
}
}

public String metricName() {
try {
return metricNameAsync().joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
}

public Deferred<String> metricNameAsync() {
if (row == null) {
throw new IllegalStateException("setSeries never called before!");
}
final byte[] id = Arrays.copyOfRange(row, 0, tsdb.metrics.width());
return tsdb.metrics.getNameAsync(id);
/**
* @see DataPoints#metric()
*/
@Override
public byte[] metric() {
checkNotNull(row, "setSeries never called before!");
return RowKey.metric(row);
}

public Map<String, String> getTags() {
try {
return getTagsAsync().joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
}

public Deferred<Map<String, String>> getTagsAsync() {
Map<byte[], byte[]> tag_ids = RowKey.tags(row);
return new UidFormatter(tsdb).formatTags(tag_ids);
/**
* @see DataPoints#tags()
*/
@Override
public Map<byte[], byte[]> tags() {
checkNotNull(row, "setSeries never called before!");
return RowKey.tags(row);
}

public List<String> getAggregatedTags() {
/**
* @see DataPoints#aggregatedTags()
*/
@Override
public List<byte[]> aggregatedTags() {
return Collections.emptyList();
}

public Deferred<List<String>> getAggregatedTagsAsync() {
final List<String> empty = Collections.emptyList();
return Deferred.fromResult(empty);
}

public List<String> getTSUIDs() {
return Collections.emptyList();
Expand Down Expand Up @@ -390,7 +377,7 @@ public double doubleValue(final int i) {
public String toString() {
// The argument passed to StringBuilder is a pretty good estimate of the
// length of the final string based on the row key and number of elements.
final String metric = metricName();
final String metric = Arrays.toString(metric());
final StringBuilder buf = new StringBuilder(80 + metric.length()
+ row.length * 4 + size * 16);
final long base_time = baseTime();
Expand Down
85 changes: 26 additions & 59 deletions src/core/RowSeq.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
import java.util.NoSuchElementException;

import net.opentsdb.meta.Annotation;
import net.opentsdb.uid.UidFormatter;

import org.hbase.async.Bytes;
import org.hbase.async.KeyValue;

import com.stumbleupon.async.Deferred;

/**
* Represents a read-only sequence of continuous HBase rows.
* <p>
Expand All @@ -37,10 +34,6 @@
* for the values. Access is granted via pointers.
*/
final class RowSeq implements DataPoints {

/** The {@link TSDB} instance we belong to. */
private final TSDB tsdb;

/** First row key. */
byte[] key;

Expand All @@ -59,10 +52,8 @@ final class RowSeq implements DataPoints {

/**
* Constructor.
* @param tsdb The TSDB we belong to.
*/
RowSeq(final TSDB tsdb) {
this.tsdb = tsdb;
RowSeq() {
}

/**
Expand Down Expand Up @@ -266,49 +257,30 @@ static double extractFloatingPointValue(final byte[] values,
+ Arrays.toString(values));
}

public String metricName() {
try {
return metricNameAsync().joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
/**
* @see DataPoints#metric()
*/
@Override
public byte[] metric() {
return RowKey.metric(key);
}

public Deferred<String> metricNameAsync() {
if (key == null) {
throw new IllegalStateException("the row key is null!");
}
byte[] metric_id = RowKey.metric(key);
return new UidFormatter(tsdb).formatMetric(metric_id);
}

public Map<String, String> getTags() {
try {
return getTagsAsync().joinUninterruptibly();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Should never be here", e);
}
}

public Deferred<Map<String, String>> getTagsAsync() {
Map<byte[], byte[]> tag_ids = RowKey.tags(key);
return new UidFormatter(tsdb).formatTags(tag_ids);
/**
* @see DataPoints#tags()
*/
@Override
public Map<byte[],byte[]> tags() {
return RowKey.tags(key);
}

/** @return an empty list since aggregated tags cannot exist on a single row */
public List<String> getAggregatedTags() {
/**
* @see DataPoints#aggregatedTags()
*/
@Override
public List<byte[]> aggregatedTags() {
return Collections.emptyList();
}

public Deferred<List<String>> getAggregatedTagsAsync() {
final List<String> empty = Collections.emptyList();
return Deferred.fromResult(empty);
}

public List<String> getTSUIDs() {
return Collections.emptyList();
}
Expand Down Expand Up @@ -357,11 +329,6 @@ Iterator internalIterator() {
return new Iterator();
}

/** Extracts the base timestamp from the row key. */
long baseTime() {
return Bytes.getUnsignedInt(key, tsdb.metrics.width());
}

/** @throws IndexOutOfBoundsException if {@code i} is out of bounds. */
private void checkIndex(final int i) {
if (i >= size()) {
Expand All @@ -384,17 +351,17 @@ public long timestamp(final int i) {
int index = 0;
for (int idx = 0; idx < qualifiers.length; idx += 2) {
if (i == index) {
return Internal.getTimestampFromQualifier(qualifiers, baseTime(), idx);
return Internal.getTimestampFromQualifier(qualifiers, RowKey.baseTime(key), idx);
}
if (Internal.inMilliseconds(qualifiers[idx])) {
idx += 2;
}
index++;
}
} else if ((qualifiers[0] & Const.MS_BYTE_FLAG) == Const.MS_BYTE_FLAG) {
return Internal.getTimestampFromQualifier(qualifiers, baseTime(), i * 4);
return Internal.getTimestampFromQualifier(qualifiers, RowKey.baseTime(key), i * 4);
} else {
return Internal.getTimestampFromQualifier(qualifiers, baseTime(), i * 2);
return Internal.getTimestampFromQualifier(qualifiers, RowKey.baseTime(key), i * 2);
}

throw new RuntimeException(
Expand Down Expand Up @@ -451,12 +418,12 @@ public double doubleValue(int i) {
public String toString() {
// The argument passed to StringBuilder is a pretty good estimate of the
// length of the final string based on the row key and number of elements.
final String metric = metricName();
final String metric = Arrays.toString(metric());
final int size = size();
final StringBuilder buf = new StringBuilder(80 + metric.length()
+ key.length * 4
+ size * 16);
final long base_time = baseTime();
final long base_time = RowKey.baseTime(key);
buf.append("RowSeq(")
.append(key == null ? "<null>" : Arrays.toString(key))
.append(" (metric=")
Expand Down Expand Up @@ -498,10 +465,10 @@ public String toString() {
*/
public static final class RowSeqComparator implements Comparator<RowSeq> {
public int compare(final RowSeq a, final RowSeq b) {
if (a.baseTime() == b.baseTime()) {
if (RowKey.baseTime(a.key) == RowKey.baseTime(b.key)) {
return 0;
}
return a.baseTime() < b.baseTime() ? -1 : 1;
return RowKey.baseTime(a.key) < RowKey.baseTime(b.key) ? -1 : 1;
}
}

Expand All @@ -518,7 +485,7 @@ final class Iterator implements SeekableView, DataPoint {
private int value_index;

/** Pre-extracted base time of this row sequence. */
private final long base_time = baseTime();
private final long base_time = RowKey.baseTime(key);

Iterator() {
}
Expand Down
Loading

0 comments on commit d14a01c

Please sign in to comment.