Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added/tested get time EWMA to non-heap tiers for node stats response #10

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void testDiskTierStats() throws Exception {

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
System.out.println(requestSize);
assertTrue(heapSizeBytes > requestSize);
// If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
// as the cache size setting is not dynamic
Expand All @@ -82,6 +81,7 @@ public void testDiskTierStats() throws Exception {
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
assertPositiveEWMAForDisk(client, "index");
}
// the first request, for "hello0", should have been evicted to the disk tier
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get();
Expand All @@ -99,4 +99,15 @@ private long getCacheSizeBytes(Client client, String index, TierType tierType) {
.getRequestCache();
return requestCacheStats.getMemorySizeInBytes(tierType);
}

private void assertPositiveEWMAForDisk(Client client, String index) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertTrue(requestCacheStats.getTimeEWMA(TierType.DISK) > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
*/
public class RequestCacheStats implements Writeable, ToXContentFragment {

private Map<String, StatsHolder> map = new HashMap<>(){{
for (TierType tierType : TierType.values())
private Map<String, StatsHolder> map = new HashMap<>() {
{
put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}}
for (TierType tierType : TierType.values()) {
put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}
}
};

public RequestCacheStats() {}
Expand All @@ -66,12 +67,12 @@ public RequestCacheStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.map = in.readMap(StreamInput::readString, StatsHolder::new);
} else {
// objects from earlier versions only contain on-heap info, and do not have entries info
// objects from earlier versions only contain on-heap info, and do not have entries or getTime info
long memorySize = in.readVLong();
long evictions = in.readVLong();
long hitCount = in.readVLong();
long missCount = in.readVLong();
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0));
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0, 0.0));
}
}

Expand Down Expand Up @@ -116,6 +117,10 @@ public long getEntries(TierType tierType) {
return getTierStats(tierType).entries.count();
}

public double getTimeEWMA(TierType tierType) {
return getTierStats(tierType).getTimeEWMA;
}

// By default, return on-heap stats if no tier is specified

public long getMemorySizeInBytes() {
Expand All @@ -142,12 +147,14 @@ public long getEntries() {
return getEntries(TierType.ON_HEAP);
}

// no getTimeEWMA default as it'll always return 0 for on-heap

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ?
} else {
// Write only on-heap values, and don't write entries metric
// Write only on-heap values, and don't write entries metric or getTimeEWMA
StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue());
out.writeVLong(heapStats.getMemorySize());
out.writeVLong(heapStats.getEvictions());
Expand All @@ -160,13 +167,13 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REQUEST_CACHE_STATS);
// write on-heap stats outside of tiers object
getTierStats(TierType.ON_HEAP).toXContent(builder, params);
getTierStats(TierType.ON_HEAP).toXContent(builder, params, false); // Heap tier doesn't write a getTime
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
map.get(tier).toXContent(builder, params);
map.get(tier).toXContent(builder, params, true); // Non-heap tiers write a getTime
builder.endObject();
}
}
Expand All @@ -189,5 +196,6 @@ static final class Fields {
static final String HIT_COUNT = "hit_count";
static final String MISS_COUNT = "miss_count";
static final String ENTRIES = "entries";
static final String GET_TIME_EWMA = "get_time_ewma_millis";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ public RequestCacheStats stats() {
return new RequestCacheStats(statsHolder);
}

public void onHit(TierType tierType) {
public void onHit(TierType tierType, double getTimeEWMA) {
statsHolder.get(tierType).hitCount.inc();
if (tierType == TierType.DISK) {
statsHolder.get(tierType).getTimeEWMA = getTimeEWMA;
}

}

public void onMiss(TierType tierType) {
public void onMiss(TierType tierType, double getTimeEWMA) {
statsHolder.get(tierType).missCount.inc();
if (tierType == TierType.DISK) {
statsHolder.get(tierType).getTimeEWMA = getTimeEWMA;
}
}

public void onCached(Accountable key, BytesReference value, TierType tierType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ public class StatsHolder implements Serializable, Writeable, ToXContentFragment
final CounterMetric hitCount;
final CounterMetric missCount;
final CounterMetric entries;
double getTimeEWMA; // CounterMetric is long, we need a double

public StatsHolder() {
this.totalMetric = new CounterMetric();
this.evictionsMetric = new CounterMetric();
this.hitCount = new CounterMetric();
this.missCount = new CounterMetric();
this.entries = new CounterMetric();
this.getTimeEWMA = 0.0;
}

public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries) {
public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries, double getTimeEWMA) {
// Switched argument order to match RequestCacheStats
this.totalMetric = new CounterMetric();
this.totalMetric.inc(memorySize);
Expand All @@ -46,12 +48,13 @@ public StatsHolder(long memorySize, long evictions, long hitCount, long missCoun
this.missCount.inc(missCount);
this.entries = new CounterMetric();
this.entries.inc(entries);
this.getTimeEWMA = getTimeEWMA;
}

public StatsHolder(StreamInput in) throws IOException {
// Read and write the values of the counter metrics. They should always be positive
// This object is new, so we shouldn't need version checks for different behavior
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readDouble());
// java forces us to do this in one line
// guaranteed to be evaluated in correct order (https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.7.4)
}
Expand All @@ -63,6 +66,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(hitCount.count());
out.writeVLong(missCount.count());
out.writeVLong(entries.count());
out.writeDouble(getTimeEWMA);
}

public void add(StatsHolder otherStats) {
Expand All @@ -72,6 +76,18 @@ public void add(StatsHolder otherStats) {
hitCount.inc(otherStats.hitCount.count());
missCount.inc(otherStats.missCount.count());
entries.inc(otherStats.entries.count());
if (!otherStats.isEmpty()) {
getTimeEWMA = otherStats.getTimeEWMA;
}

/* Adding two EWMAs is a bit tricky. If both stats are non-empty we can assume the newer one dominates.
add() is only called in CommonStats.java in two places:
1) it's used to either add otherStats to a new (empty) RequestCacheStats
2) it's used to add new stats to an existing RequestCacheStats
In both cases, the existing object is older, so we can assume otherStats's EWMA dominates.
It doesn't make sense to use the existing EWMA in case 1, and in case 2 the actual value
will be updated from the disk tier on the next hit/miss, so it's probably ok to use otherStats.getTimeEWMA.
*/
}

public long getEvictions() {
Expand All @@ -94,8 +110,16 @@ public long getEntries() {
return entries.count();
}

public double getTimeEWMA() {
return getTimeEWMA;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return toXContent(builder, params, false); // By default do not write the getTime field
}

public XContentBuilder toXContent(XContentBuilder builder, Params params, boolean includeGetTime) throws IOException {
builder.humanReadableField(
RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES,
RequestCacheStats.Fields.MEMORY_SIZE,
Expand All @@ -105,6 +129,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount());
builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount());
builder.field(RequestCacheStats.Fields.ENTRIES, getEntries());
if (includeGetTime) {
builder.field(RequestCacheStats.Fields.GET_TIME_EWMA, getTimeEWMA());
}
return builder;
}

private boolean isEmpty() {
return (getEvictions() == 0)
&& (getMemorySize() == 0)
&& (getHitCount() == 0)
&& (getMissCount() == 0)
&& (getEntries() == 0)
&& (getTimeEWMA() == 0.0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public final void onCached(IndicesRequestCache.Key key, BytesReference value, Ti
}

@Override
public final void onHit(TierType tierType) {
stats().onHit(tierType);
public final void onHit(TierType tierType, double getTimeEWMA) {
stats().onHit(tierType, getTimeEWMA);
}

@Override
public final void onMiss(TierType tierType) {
stats().onMiss(tierType);
public final void onMiss(TierType tierType, double getTimeEWMA) {
stats().onMiss(tierType, getTimeEWMA);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ void clear(CacheEntity entity) {
}

@Override
public void onMiss(Key key, TierType tierType) {
key.entity.onMiss(tierType);
public void onMiss(Key key, TierType tierType, double getTimeEWMA) {
key.entity.onMiss(tierType, getTimeEWMA);
}

@Override
Expand All @@ -161,8 +161,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
}

@Override
public void onHit(Key key, BytesReference value, TierType tierType) {
key.entity.onHit(tierType);
public void onHit(Key key, BytesReference value, TierType tierType, double getTimeEWMA) {
key.entity.onHit(tierType, getTimeEWMA);
}

@Override
Expand Down Expand Up @@ -275,12 +275,12 @@ interface CacheEntity extends Accountable, Writeable {
/**
* Called each time this entity has a cache hit.
*/
void onHit(TierType tierType);
void onHit(TierType tierType, double getTimeEWMA);

/**
* Called each time this entity has a cache miss.
*/
void onMiss(TierType tierType);
void onMiss(TierType tierType, double getTimeEWMA);

/**
* Called when this entity instance is removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

public interface TieredCacheEventListener<K, V> {

void onMiss(K key, TierType tierType);
void onMiss(K key, TierType tierType, double getTimeEWMA);

void onRemoval(RemovalNotification<K, V> notification);

void onHit(K key, V value, TierType tierType);
void onHit(K key, V value, TierType tierType, double getTimeEWMA);

void onCached(K key, V value, TierType tierType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,24 @@ private Function<K, CacheValue<V>> getValueFromTierCache() {
return key -> {
for (CachingTier<K, V> cachingTier : cachingTierList) {
V value = cachingTier.get(key);
double getTimeEWMA = getTimeEWMAIfDisk(cachingTier);
if (value != null) {
tieredCacheEventListener.onHit(key, value, cachingTier.getTierType());
tieredCacheEventListener.onHit(key, value, cachingTier.getTierType(), getTimeEWMA);
return new CacheValue<>(value, cachingTier.getTierType());
}
tieredCacheEventListener.onMiss(key, cachingTier.getTierType());
tieredCacheEventListener.onMiss(key, cachingTier.getTierType(), getTimeEWMA);
}
return null;
};
}

private double getTimeEWMAIfDisk(CachingTier<K, V> cachingTier) {
if (cachingTier.getTierType() == TierType.DISK) {
return ((DiskCachingTier<K, V>) cachingTier).getTimeMillisEWMA();
}
return 0.0;
}

@Override
public void closeDiskTier() {
diskCachingTier.close();
Expand Down
Loading