Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an alternative serialization method #58

Merged
merged 15 commits into from
Jul 22, 2021
Merged
34 changes: 19 additions & 15 deletions src/jmh/java/com/datadoghq/sketch/ddsketch/DDSketchOption.java
Original file line number Diff line number Diff line change
@@ -6,30 +6,34 @@
package com.datadoghq.sketch.ddsketch;

import com.datadoghq.sketch.ddsketch.mapping.BitwiseLinearlyInterpolatedMapping;
import com.datadoghq.sketch.ddsketch.mapping.CubicallyInterpolatedMapping;
import com.datadoghq.sketch.ddsketch.mapping.IndexMapping;
import com.datadoghq.sketch.ddsketch.mapping.LogarithmicMapping;
import com.datadoghq.sketch.ddsketch.store.PaginatedStore;
import com.datadoghq.sketch.ddsketch.store.Store;
import com.datadoghq.sketch.ddsketch.store.UnboundedSizeDenseStore;
import java.util.function.DoubleFunction;
import java.util.function.Supplier;

public enum DDSketchOption {
FAST(
relativeAccuracy ->
new DDSketch(
new BitwiseLinearlyInterpolatedMapping(relativeAccuracy),
UnboundedSizeDenseStore::new)),
MEMORY_OPTIMAL(DDSketches::logarithmicUnboundedDense),
BALANCED(DDSketches::unboundedDense),
PAGINATED(
relativeAccuracy ->
new DDSketch(
new BitwiseLinearlyInterpolatedMapping(relativeAccuracy), PaginatedStore::new));
FAST(BitwiseLinearlyInterpolatedMapping::new, UnboundedSizeDenseStore::new),
MEMORY_OPTIMAL(LogarithmicMapping::new, UnboundedSizeDenseStore::new),
BALANCED(CubicallyInterpolatedMapping::new, UnboundedSizeDenseStore::new),
PAGINATED(BitwiseLinearlyInterpolatedMapping::new, PaginatedStore::new);

private final DoubleFunction<DDSketch> creator;
private final DoubleFunction<IndexMapping> indexMapping;
private final Supplier<Store> storeSupplier;

DDSketchOption(DoubleFunction<DDSketch> creator) {
this.creator = creator;
DDSketchOption(DoubleFunction<IndexMapping> indexMapping, Supplier<Store> storeSupplier) {
this.indexMapping = indexMapping;
this.storeSupplier = storeSupplier;
}

public DDSketch create(double relativeAccuracy) {
return creator.apply(relativeAccuracy);
return new DDSketch(indexMapping.apply(relativeAccuracy), storeSupplier);
}

public Supplier<Store> getStoreSupplier() {
return storeSupplier;
}
}
10 changes: 7 additions & 3 deletions src/jmh/java/com/datadoghq/sketch/ddsketch/Distributions.java
Original file line number Diff line number Diff line change
@@ -5,22 +5,26 @@

package com.datadoghq.sketch.ddsketch;

import java.util.concurrent.ThreadLocalRandom;
import java.util.Random;

public enum Distributions {
NORMAL {
@Override
protected Distribution create(double... parameters) {
return () -> parameters[1] * ThreadLocalRandom.current().nextGaussian() + parameters[0];
final Random random = new Random(seed);
return () -> parameters[1] * random.nextGaussian() + parameters[0];
}
},
POISSON {
@Override
protected Distribution create(double... parameters) {
return () -> -(Math.log(ThreadLocalRandom.current().nextDouble()) / parameters[0]);
final Random random = new Random(seed);
return () -> -(Math.log(random.nextDouble()) / parameters[0]);
}
};

private static final long seed = 5388928120325255124L;

public Distribution of(double... parameters) {
return create(parameters);
}
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketchOption;
import com.datadoghq.sketch.ddsketch.DataGenerator;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;

@@ -29,7 +30,7 @@ public abstract class BuiltSketchState {
DDSketch sketch;

@Setup(Level.Trial)
public void init() {
public void init() throws IOException {
this.sketch = sketchOption.create(relativeAccuracy);
for (int i = 0; i < count; ++i) {
sketch.accept(unit.toNanos(Math.abs(Math.round(generator.nextValue()))));
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* Unless explicitly stated otherwise all files in this repository are licensed under the Apache License 2.0.
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2021 Datadog, Inc.
*/

package com.datadoghq.sketch.ddsketch.benchmarks;

import com.datadoghq.sketch.ddsketch.DDSketch;
import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding;
import com.datadoghq.sketch.ddsketch.encoding.ByteArrayInput;
import com.datadoghq.sketch.ddsketch.encoding.GrowingByteArrayOutput;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
public class Deserialize extends BuiltSketchState {

byte[] fromProtoData;
byte[] decodeData;
DDSketch decodedSketch;

@Setup(Level.Trial)
public void init() throws IOException {
super.init();
this.fromProtoData = DDSketchProtoBinding.toProto(sketch).toByteArray();
final GrowingByteArrayOutput output = GrowingByteArrayOutput.withDefaultInitialCapacity();
sketch.encode(output, false);
this.decodeData = output.trimmedCopy();
this.decodedSketch =
DDSketch.decode(ByteArrayInput.wrap(decodeData), sketchOption.getStoreSupplier());
}

@Benchmark
public DDSketch fromProto() throws InvalidProtocolBufferException {
return DDSketchProtoBinding.fromProto(
sketchOption.getStoreSupplier(),
com.datadoghq.sketch.ddsketch.proto.DDSketch.parseFrom(fromProtoData));
}

@Benchmark
public DDSketch decode() throws IOException {
return DDSketch.decode(ByteArrayInput.wrap(decodeData), sketchOption.getStoreSupplier());
}

@Benchmark
public DDSketch decodeReusing() throws IOException {
decodedSketch.clear();
decodedSketch.decodeAndMergeWith(ByteArrayInput.wrap(decodeData));
return decodedSketch;
}
}
Original file line number Diff line number Diff line change
@@ -6,16 +6,28 @@
package com.datadoghq.sketch.ddsketch.benchmarks;

import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding;
import com.datadoghq.sketch.ddsketch.encoding.GrowingByteArrayOutput;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Setup;

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
public class Serialize extends BuiltSketchState {

GrowingByteArrayOutput output;

/**
 * Builds the benchmarked sketch through the parent setup, then allocates the output buffer that
 * {@code encodeReusing} clears and rewrites on every invocation.
 *
 * @throws IOException propagated from the parent setup
 */
@Setup(Level.Trial)
@Override
public void init() throws IOException {
  super.init();
  this.output = GrowingByteArrayOutput.withDefaultInitialCapacity();
}

@Benchmark
public byte[] serialize() {
return sketch.serialize().array();
@@ -25,4 +37,18 @@ public byte[] serialize() {
public byte[] toProto() {
return DDSketchProtoBinding.toProto(sketch).toByteArray();
}

/**
 * Encodes the sketch into a buffer allocated for this invocation and returns a trimmed copy of
 * the written bytes.
 *
 * @return the encoded sketch
 * @throws IOException propagated from encoding
 */
@Benchmark
public byte[] encode() throws IOException {
  // Named differently from the reusable 'output' field to make clear this buffer is fresh.
  final GrowingByteArrayOutput freshOutput = GrowingByteArrayOutput.withDefaultInitialCapacity();
  sketch.encode(freshOutput, false);
  return freshOutput.trimmedCopy();
}

/**
 * Encodes the sketch into the buffer held by this benchmark state, clearing it first, and
 * returns a trimmed copy of the written bytes.
 *
 * @return the encoded sketch
 * @throws IOException propagated from encoding
 */
@Benchmark
public byte[] encodeReusing() throws IOException {
  final GrowingByteArrayOutput reusedOutput = this.output;
  reusedOutput.clear();
  sketch.encode(reusedOutput, false);
  return reusedOutput.trimmedCopy();
}
}
108 changes: 103 additions & 5 deletions src/main/java/com/datadoghq/sketch/ddsketch/DDSketch.java
Original file line number Diff line number Diff line change
@@ -9,6 +9,13 @@
import static com.datadoghq.sketch.ddsketch.Serializer.embeddedFieldSize;

import com.datadoghq.sketch.QuantileSketch;
import com.datadoghq.sketch.ddsketch.encoding.BinEncodingMode;
import com.datadoghq.sketch.ddsketch.encoding.Flag;
import com.datadoghq.sketch.ddsketch.encoding.IndexMappingLayout;
import com.datadoghq.sketch.ddsketch.encoding.Input;
import com.datadoghq.sketch.ddsketch.encoding.MalformedInputException;
import com.datadoghq.sketch.ddsketch.encoding.Output;
import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper;
import com.datadoghq.sketch.ddsketch.mapping.BitwiseLinearlyInterpolatedMapping;
import com.datadoghq.sketch.ddsketch.mapping.IndexMapping;
import com.datadoghq.sketch.ddsketch.mapping.IndexMappingConverter;
@@ -18,6 +25,7 @@
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore;
import com.datadoghq.sketch.ddsketch.store.Store;
import com.datadoghq.sketch.ddsketch.store.UnboundedSizeDenseStore;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
@@ -230,15 +238,18 @@ private void checkValueTrackable(double value) {
*/
@Override
public void mergeWith(DDSketch other) {
checkMergeability(indexMapping, other.indexMapping);
negativeValueStore.mergeWith(other.negativeValueStore);
positiveValueStore.mergeWith(other.positiveValueStore);
zeroCount += other.zeroCount;
}

if (!indexMapping.equals(other.indexMapping)) {
private static void checkMergeability(IndexMapping indexMapping1, IndexMapping indexMapping2)
throws IllegalArgumentException {
if (!indexMapping1.equals(indexMapping2)) {
throw new IllegalArgumentException(
"The sketches are not mergeable because they do not use the same index mappings.");
}

negativeValueStore.mergeWith(other.negativeValueStore);
positiveValueStore.mergeWith(other.positiveValueStore);
zeroCount += other.zeroCount;
}

@Override
@@ -376,6 +387,93 @@ public DDSketch convert(IndexMapping newIndexMapping, Supplier<Store> storeSuppl
newIndexMapping, newNegativeValueStore, newPositiveValueStore, zeroCount, minIndexedValue);
}

/**
 * Writes this sketch to {@code output}.
 *
 * <p>The pieces are written in this order: the index mapping (unless omitted), the zero count
 * (only when it is non-zero), the positive-value store, then the negative-value store.
 *
 * @param output where the encoded sketch is written
 * @param omitIndexMapping if {@code true}, the index mapping is not written; data encoded this
 *     way can only be turned back into a sketch when the index mapping is supplied separately,
 *     e.g. through {@link #decode(Input, Supplier, IndexMapping)}
 * @throws IOException propagated from the encoding calls made on {@code output}
 */
public void encode(Output output, boolean omitIndexMapping) throws IOException {
if (!omitIndexMapping) {
indexMapping.encode(output);
}

// A zero count of 0 is simply left out; decoding starts from 0 and adds what it finds.
if (zeroCount != 0) {
Flag.ZERO_COUNT.encode(output);
VarEncodingHelper.encodeVarDouble(output, zeroCount);
}

positiveValueStore.encode(output, Flag.Type.POSITIVE_STORE);
negativeValueStore.encode(output, Flag.Type.NEGATIVE_STORE);
}

/**
 * Decodes the content of {@code input} and merges it into this sketch.
 *
 * <p>Decoded bins are merged straight into this sketch's stores. An index mapping found in the
 * input is checked for equality with this sketch's own mapping.
 *
 * <p>Note that the stores are updated while the input is being read, whereas the zero count is
 * written back only after decoding has completed: if decoding throws, the stores may already
 * hold part of the input.
 *
 * @param input the encoded data to merge into this sketch
 * @throws IOException propagated from decoding the input
 * @throws IllegalArgumentException if the input holds an index mapping that is not equal to the
 *     one of this sketch
 */
public void decodeAndMergeWith(Input input) throws IOException {
  final DecodingState decodingState =
      new DecodingState(indexMapping, negativeValueStore, positiveValueStore, zeroCount);
  decodeAndMergeWith(decodingState, input);
  // The stores were mutated in place; only the primitive zero count needs copying back.
  this.zeroCount = decodingState.zeroCount;
}

/**
 * Decodes a sketch from {@code input}, which has to contain the index mapping itself.
 *
 * @param input the encoded sketch
 * @param storeSupplier provides the stores of the decoded sketch
 * @return the decoded sketch
 * @throws IOException propagated from decoding the input
 * @throws IllegalArgumentException if the input does not contain an index mapping
 */
public static DDSketch decode(Input input, Supplier<Store> storeSupplier) throws IOException {
  // No index mapping is known in advance: it must be found in the input.
  final IndexMapping noKnownIndexMapping = null;
  return decode(input, storeSupplier, noKnownIndexMapping);
}

/**
 * Decodes a sketch from {@code input}, optionally starting from an already known index mapping.
 *
 * @param input the encoded sketch
 * @param storeSupplier provides the two stores (negative values, then positive values) of the
 *     decoded sketch
 * @param indexMapping the index mapping to use, or {@code null} if it has to be read from the
 *     input; when non-null, any index mapping found in the input must be equal to it
 * @return the decoded sketch
 * @throws IOException propagated from decoding the input
 * @throws IllegalArgumentException if no index mapping is available once the input has been
 *     consumed, or if the input holds an index mapping that differs from {@code indexMapping}
 */
public static DDSketch decode(
    Input input, Supplier<Store> storeSupplier, IndexMapping indexMapping) throws IOException {
  final Store negativeStore = storeSupplier.get();
  final Store positiveStore = storeSupplier.get();
  final DecodingState decoded = new DecodingState(indexMapping, negativeStore, positiveStore, 0);
  decodeAndMergeWith(decoded, input);
  if (decoded.indexMapping != null) {
    return new DDSketch(
        decoded.indexMapping,
        decoded.negativeValueStore,
        decoded.positiveValueStore,
        decoded.zeroCount);
  }
  throw new IllegalArgumentException("The index mapping is missing.");
}

/**
 * Reads {@code input} until nothing remains, applying each flag-prefixed section to the part of
 * {@code state} it relates to.
 *
 * <ul>
 *   <li>store sections are merged into the matching store of {@code state};
 *   <li>an index mapping section either sets the mapping of {@code state} (when it has none yet)
 *       or is checked for equality with it;
 *   <li>a zero count section is added to the zero count of {@code state}.
 * </ul>
 *
 * @param state the accumulator updated while decoding
 * @param input the encoded data
 * @throws IOException propagated from reading the input
 * @throws MalformedInputException if a flag or a flag type is not recognized
 * @throws IllegalArgumentException if a decoded index mapping differs from the one in {@code
 *     state}
 */
private static void decodeAndMergeWith(DecodingState state, Input input) throws IOException {
  while (input.hasRemaining()) {
    final Flag flag = Flag.decode(input);
    switch (flag.type()) {
      case INDEX_MAPPING:
        {
          final IndexMapping decoded =
              IndexMapping.decode(input, IndexMappingLayout.ofFlag(flag));
          if (state.indexMapping != null) {
            checkMergeability(state.indexMapping, decoded);
          } else {
            state.indexMapping = decoded;
          }
          break;
        }
      case SKETCH_FEATURES:
        // The zero count is the only sketch feature handled here.
        if (!Flag.ZERO_COUNT.equals(flag)) {
          throw new MalformedInputException("The flag is invalid.");
        }
        state.zeroCount += VarEncodingHelper.decodeVarDouble(input);
        break;
      case POSITIVE_STORE:
        state.positiveValueStore.decodeAndMergeWith(input, BinEncodingMode.ofFlag(flag));
        break;
      case NEGATIVE_STORE:
        state.negativeValueStore.decodeAndMergeWith(input, BinEncodingMode.ofFlag(flag));
        break;
      default:
        throw new MalformedInputException("The flag type is invalid.");
    }
  }
}

/**
 * Mutable accumulator for the pieces of a sketch while encoded data is being decoded. The stores
 * are fixed at construction and updated in place; the index mapping and the zero count can be
 * reassigned as sections are read.
 */
private static final class DecodingState {
  /** May be {@code null} until an index mapping is read from the input. */
  private IndexMapping indexMapping;

  private final Store negativeValueStore;
  private final Store positiveValueStore;

  /** Starting zero count plus every zero count decoded so far. */
  private double zeroCount;

  private DecodingState(
      IndexMapping initialIndexMapping,
      Store negativeStore,
      Store positiveStore,
      double initialZeroCount) {
    indexMapping = initialIndexMapping;
    negativeValueStore = negativeStore;
    positiveValueStore = positiveStore;
    zeroCount = initialZeroCount;
  }
}

/** @return the size of the sketch when serialized in protobuf */
public int serializedSize() {
return embeddedFieldSize(1, indexMapping.serializedSize())
Loading