
Commit

fix: use tsid hashing everywhere, adapt downsampling remove decodeTsid dependency
salvatore-campagna committed Nov 22, 2023
1 parent 103b1f8 commit 1044c8a
Showing 19 changed files with 375 additions and 189 deletions.
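Note on the bucket keys and document ids in the diffs below: with _tsid hashing, time series ids become opaque byte arrays rendered as URL-safe, unpadded base64 strings instead of the old {dim1=..., dim2=...} map rendering. The sketch below only illustrates that final encoding step; the real byte layout comes from TimeSeriesIdBuilder#withHash(), and the placeholder bytes here are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class TsidKeyEncodingSketch {
    // Same encoder configuration the mapper now uses: URL-safe alphabet, no padding.
    private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

    public static void main(String[] args) {
        // Placeholder bytes; in Elasticsearch these are produced by TimeSeriesIdBuilder#withHash().
        byte[] tsidHash = "illustrative-tsid-hash-bytes".getBytes(StandardCharsets.UTF_8);
        // Yields an opaque key shaped like the ones asserted in the updated tests below.
        System.out.println(BASE64_ENCODER.encodeToString(tsidHash));
    }
}
```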
@@ -53,7 +53,7 @@ private List<InternalBucket> randomBuckets(boolean keyed, InternalAggregations a
builder.addKeywordDimension(entry.getKey(), (String) entry.getValue());
}
try {
var key = builder.withoutHash().toBytesRef();
var key = builder.withHash().toBytesRef();
bucketList.add(new InternalBucket(key, docCount, aggregations, keyed));
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -19,12 +19,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.aggregations.bucket.AggregationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
@@ -38,7 +33,6 @@
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.test.index.IndexVersionUtils;

import java.io.IOException;
import java.util.ArrayList;
@@ -72,12 +66,36 @@ public void testStandAloneTimeSeriesWithSum() throws IOException {
}, ts -> {
assertThat(ts.getBuckets(), hasSize(3));

assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L));
assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=xxx}").getAggregations().get("sum")).value(), equalTo(6.0));
assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L));
assertThat(((Sum) ts.getBucketByKey("{dim1=aaa, dim2=yyy}").getAggregations().get("sum")).value(), equalTo(8.0));
assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L));
assertThat(((Sum) ts.getBucketByKey("{dim1=bbb, dim2=zzz}").getAggregations().get("sum")).value(), equalTo(22.0));
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount,
equalTo(4L)
);
assertThat(
((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o")
.getAggregations()
.get("sum")).value(),
equalTo(22.0)
);
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount,
equalTo(2L)
);
assertThat(
((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg")
.getAggregations()
.get("sum")).value(),
equalTo(6.0)
);
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount,
equalTo(2L)
);
assertThat(
((Sum) ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4")
.getAggregations()
.get("sum")).value(),
equalTo(8.0)
);

},
new KeywordFieldMapper.KeywordFieldType("dim1"),
@@ -107,7 +125,7 @@ public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimens
fields.add(new DoubleDocValuesField(metrics[i].toString(), (double) metrics[i + 1]));
}
}
fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withoutHash().toBytesRef()));
fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, builder.withHash().toBytesRef()));
// TODO: Handle metrics
iw.addDocument(fields);
}
@@ -158,23 +176,38 @@ public void testMultiBucketAggregationAsSubAggregation() throws IOException {
Consumer<InternalTimeSeries> verifier = ts -> {
assertThat(ts.getBuckets(), hasSize(3));

assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L));
InternalDateHistogram byTimeStampBucket = ts.getBucketByKey("{dim1=aaa, dim2=xxx}").getAggregations().get("by_timestamp");
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount,
equalTo(4L)
);
InternalDateHistogram byTimeStampBucket = ts.getBucketByKey(
"C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o"
).getAggregations().get("by_timestamp");
assertThat(
byTimeStampBucket.getBuckets(),
contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY))
contains(new InternalDateHistogram.Bucket(startTime, 4, false, null, InternalAggregations.EMPTY))
);
assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L));
byTimeStampBucket = ts.getBucketByKey("{dim1=aaa, dim2=yyy}").getAggregations().get("by_timestamp");
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount,
equalTo(2L)
);
byTimeStampBucket = ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg")
.getAggregations()
.get("by_timestamp");
assertThat(
byTimeStampBucket.getBuckets(),
contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY))
);
assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L));
byTimeStampBucket = ts.getBucketByKey("{dim1=bbb, dim2=zzz}").getAggregations().get("by_timestamp");
assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount,
equalTo(2L)
);
byTimeStampBucket = ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4")
.getAggregations()
.get("by_timestamp");
assertThat(
byTimeStampBucket.getBuckets(),
contains(new InternalDateHistogram.Bucket(startTime, 4, false, null, InternalAggregations.EMPTY))
contains(new InternalDateHistogram.Bucket(startTime, 2, false, null, InternalAggregations.EMPTY))
);
};

@@ -191,9 +224,24 @@ public void testAggregationSize() throws IOException {

List<Consumer<InternalTimeSeries>> verifiers = new ArrayList<Consumer<InternalTimeSeries>>();

verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=xxx}").docCount, equalTo(2L)));
verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=aaa, dim2=yyy}").docCount, equalTo(2L)));
verifiers.add(ts -> assertThat(ts.getBucketByKey("{dim1=bbb, dim2=zzz}").docCount, equalTo(4L)));
verifiers.add(
ts -> assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAAAS_0VrZGalylFPi9dkK4dYyY9g0yybS6o").docCount,
equalTo(4L)
)
);
verifiers.add(
ts -> assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyfgM9Vvd-IRpsCvXNT5j_dX4tz0qg").docCount,
equalTo(2L)
)
);
verifiers.add(
ts -> assertThat(
ts.getBucketByKey("C0uMhZuLQSpAQ5ipTZFLMgAAAAAAAAAAAAAAAAAAAABjzNyftxCMQuGv-XOPz9J6bZM-ZhUGnV4").docCount,
equalTo(2L)
)
);

for (int i = 1; i <= 3; i++) {
int size = i;
@@ -49,15 +49,15 @@ setup:
location: swamp
temperature: 32.4
humidity: 88.9
- match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g }
- match: { _id: crxuhDzuYSDy965GAAABiHD35_g }
- match: { result: created }
- match: { _version: 1 }

- do:
delete:
index: weather_sensors
id: crxuhC8WO3aVdhvtAAABiHD35_g
- match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g }
id: crxuhDzuYSDy965GAAABiHD35_g
- match: { _id: crxuhDzuYSDy965GAAABiHD35_g }
- match: { result: deleted }
- match: { _version: 2 }

@@ -74,6 +74,6 @@
location: swamp
temperature: 32.4
humidity: 88.9
- match: { _id: crxuhC8WO3aVdhvtAAABiHD35_g }
- match: { _id: crxuhDzuYSDy965GAAABiHD35_g }
- match: { result: created }
- match: { _version: 3 }
@@ -13,7 +13,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.Murmur3Hasher;
import org.elasticsearch.common.hash.MurmurHash3;
@@ -37,13 +36,10 @@

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -58,9 +54,7 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
public static final String CONTENT_TYPE = "_tsid";
public static final TimeSeriesIdFieldType FIELD_TYPE = new TimeSeriesIdFieldType();
public static final TimeSeriesIdFieldMapper INSTANCE = new TimeSeriesIdFieldMapper();

// NOTE: used by {@link TimeSeriesIdFieldMapper#decodeTsid(StreamInput)} )}. Remove both if using _tsid hashing
public static final int TSID_HASH_SENTINEL = 0xBAADCAFE;
private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

@Override
public FieldMapper.Builder getMergeBuilder() {
@@ -164,31 +158,9 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
*/
public static Object decodeTsid(StreamInput in) {
try {
int sizeOrTsidHashSentinel = in.readVInt();
if (sizeOrTsidHashSentinel == TSID_HASH_SENTINEL) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(in.readBytesRef().bytes);
}
Map<String, Object> result = new LinkedHashMap<>(sizeOrTsidHashSentinel);

for (int i = 0; i < sizeOrTsidHashSentinel; i++) {
String name = in.readBytesRef().utf8ToString();

int type = in.read();
switch (type) {
case (byte) 's' -> // parse a string
result.put(name, in.readBytesRef().utf8ToString());
case (byte) 'l' -> // parse a long
result.put(name, in.readLong());
case (byte) 'u' -> { // parse an unsigned_long
Object ul = DocValueFormat.UNSIGNED_LONG_SHIFTED.format(in.readLong());
result.put(name, ul);
}
default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
}
}
return result;
} catch (IOException | IllegalArgumentException e) {
throw new IllegalArgumentException("Error formatting " + NAME + ": " + e.getMessage(), e);
return base64Encode(in.readBytesRef());
} catch (IOException e) {
throw new IllegalArgumentException("Unable to read tsid");
}
}

@@ -250,21 +222,14 @@ public BytesReference withHash() throws IOException {
// NOTE: hash all dimension field names
int numberOfDimensions = Math.min(MAX_DIMENSIONS, dimensions.size());
int tsidHashIndex = 0;
byte[] tsidHash = new byte[16 + 16 + 16 + 4 * numberOfDimensions];
byte[] tsidHash = new byte[16 + 16 + 4 * numberOfDimensions];

tsidHasher.reset();
for (final DimensionDataHolder dimension : dimensions) {
tsidHasher.update(dimension.name.bytes);
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

// NOTE: hash all metric field names
tsidHasher.reset();
for (final String metric : metrics) {
tsidHasher.update(metric.getBytes(StandardCharsets.UTF_8));
}
tsidHashIndex = writeHash128(tsidHasher.digestHash(), tsidHash, tsidHashIndex);

// NOTE: concatenate all dimension value hashes up to a certain number of dimensions
int tsidHashStartIndex = tsidHashIndex;
for (final DimensionDataHolder dimension : dimensions) {
@@ -292,7 +257,6 @@ public BytesReference withHash() throws IOException {

assert tsidHashIndex == tsidHash.length;
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVInt(TSID_HASH_SENTINEL);
out.writeBytesRef(new BytesRef(tsidHash, 0, tsidHash.length));
return out.bytes();
}
@@ -374,10 +338,10 @@ private void add(String fieldName, BytesReference encoded) throws IOException {
}

public static Object decodeTsid(BytesRef bytesRef) {
try (StreamInput input = new BytesArray(bytesRef).streamInput()) {
return decodeTsid(input);
} catch (IOException ex) {
throw new IllegalArgumentException("Dimension field cannot be deserialized.", ex);
}
return base64Encode(bytesRef);
}

private static String base64Encode(BytesRef bytesRef) {
return BASE64_ENCODER.encodeToString(bytesRef.bytes);
}
}
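To follow the decodeTsid change above: the stream no longer carries the TSID_HASH_SENTINEL or a per-dimension map, only the raw hash bytes, so decoding reduces to read-and-base64-encode. A hedged round-trip sketch using the stream classes already referenced in this file (the sample bytes are arbitrary, not a real tsid):

```java
import java.util.Base64;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

public class DecodeTsidRoundTripSketch {
    public static void main(String[] args) throws Exception {
        byte[] tsidHash = new byte[] { 0x0B, 0x4B, (byte) 0x8C, (byte) 0x85 }; // arbitrary sample bytes
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            // What withHash() now writes: just the hash bytes, no sentinel, no dimension map.
            out.writeBytesRef(new BytesRef(tsidHash));
            try (StreamInput in = out.bytes().streamInput()) {
                // What decodeTsid(StreamInput) now does: read the bytes back and base64-encode them.
                System.out.println(Base64.getUrlEncoder().withoutPadding().encodeToString(in.readBytesRef().bytes));
            }
        }
    }
}
```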
20 changes: 16 additions & 4 deletions server/src/main/java/org/elasticsearch/search/DocValueFormat.java
@@ -11,7 +11,6 @@
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -21,7 +20,6 @@
import org.elasticsearch.common.time.DateMathParser;
import org.elasticsearch.geometry.utils.Geohash;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;

@@ -694,11 +692,25 @@ public String toString() {

@Override
public Object format(BytesRef value) {
return TimeSeriesIdFieldMapper.decodeTsid(new BytesArray(value).streamInput());
return Base64.getUrlEncoder().withoutPadding().encodeToString(value.bytes);
}

@Override
public BytesRef parseBytesRef(Object value) {
if (value instanceof BytesRef) {
return (BytesRef) value;
}
return plainTsidParseBytesRef(value);
}

/**
* With tsid hashing in place this parsing logic is deprecated: a hashed tsid can no longer be
* parsed back into its dimension field key/value pairs.
* @param value the Map encoding the tsid dimension field key/value pairs
*
* @return the tsid built from those key/value pairs, encoded as a BytesRef
*/
private BytesRef plainTsidParseBytesRef(Object value) {
if (value instanceof Map<?, ?> == false) {
throw new IllegalArgumentException("Cannot parse tsid object [" + value + "]");
}
@@ -724,7 +736,7 @@ public BytesRef parseBytesRef(Object value) {
}

try {
return builder.withHash().toBytesRef();
return builder.withoutHash().toBytesRef();
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
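Usage-wise, the TIME_SERIES_ID doc value format now treats tsids as opaque: format() renders the base64 key and an already-encoded BytesRef passes straight through parseBytesRef(), with the map-based path above kept only as a deprecated fallback. A small sketch, assuming the DocValueFormat.TIME_SERIES_ID constant backed by this implementation:

```java
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.search.DocValueFormat;

public class TimeSeriesIdFormatSketch {
    public static void main(String[] args) {
        BytesRef tsid = new BytesRef(new byte[] { 1, 2, 3, 4 }); // arbitrary sample hash bytes
        // format() now renders the opaque base64 key instead of a dimension map.
        Object key = DocValueFormat.TIME_SERIES_ID.format(tsid);
        // An already-encoded BytesRef is returned as-is by parseBytesRef().
        BytesRef parsed = DocValueFormat.TIME_SERIES_ID.parseBytesRef(tsid);
        System.out.println(key + " passthrough=" + (parsed == tsid));
    }
}
```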
@@ -53,18 +53,19 @@ public void testSynthesizeIdSimple() throws Exception {

long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z");
List<Doc> docs = List.of(
new Doc(startTime, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "xxx"))),
new Doc(startTime + 1, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "yyy"))),
new Doc(startTime, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "yyy"))),
new Doc(startTime + 1, List.of(new Dimension("dim1", "aaa"), new Dimension("dim2", "xxx"))),
new Doc(startTime + 2, List.of(new Dimension("dim1", "bbb"), new Dimension("dim2", "xxx")))
);
CheckedConsumer<IndexReader, IOException> verify = indexReader -> {
assertThat(indexReader.leaves(), hasSize(1));
LeafReader leafReader = indexReader.leaves().get(0).reader();
assertThat(leafReader.numDocs(), equalTo(3));
var leaf = idLoader.leaf(null, leafReader, new int[] { 0, 1, 2 });
assertThat(leaf.getId(0), equalTo(expectedId(routing, docs.get(0))));
// NOTE: time series data is ordered by (tsid, timestamp)
assertThat(leaf.getId(0), equalTo(expectedId(routing, docs.get(2))));
assertThat(leaf.getId(1), equalTo(expectedId(routing, docs.get(1))));
assertThat(leaf.getId(2), equalTo(expectedId(routing, docs.get(2))));
assertThat(leaf.getId(2), equalTo(expectedId(routing, docs.get(0))));
};
prepareIndexReader(indexAndForceMerge(routing, docs), verify, false);
}
@@ -234,7 +235,7 @@ private static void indexDoc(IndexRouting.ExtractFromSource routing, IndexWriter
fields.add(new SortedSetDocValuesField(dimension.field, new BytesRef(dimension.value.toString())));
}
}
BytesRef tsid = builder.withoutHash().toBytesRef();
BytesRef tsid = builder.withHash().toBytesRef();
fields.add(new SortedDocValuesField(TimeSeriesIdFieldMapper.NAME, tsid));
iw.addDocument(fields);
}
@@ -252,7 +253,7 @@ private static String expectedId(IndexRouting.ExtractFromSource routing, Doc doc
return TsidExtractingIdFieldMapper.createId(
false,
routingBuilder,
timeSeriesIdBuilder.withoutHash().toBytesRef(),
timeSeriesIdBuilder.withHash().toBytesRef(),
doc.timestamp,
new byte[16]
);