Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tiered stats to request cache response #8

Draft
wants to merge 10 commits into
base: framework-serialized
Choose a base branch
from
Prev Previous commit
Added/tested get time EWMA to non-heap tiers
Peter Alfonsi committed Nov 6, 2023
commit 81c038a80e85392ea11e729553c95858219b5389
Original file line number Diff line number Diff line change
@@ -70,7 +70,6 @@ public void testDiskTierStats() throws Exception {

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
System.out.println(requestSize);
assertTrue(heapSizeBytes > requestSize);
// If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
// as the cache size setting is not dynamic
@@ -82,6 +81,7 @@ public void testDiskTierStats() throws Exception {
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
assertPositiveEWMAForDisk(client, "index");
}
// the first request, for "hello0", should have been evicted to the disk tier
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get();
@@ -99,4 +99,15 @@ private long getCacheSizeBytes(Client client, String index, TierType tierType) {
.getRequestCache();
return requestCacheStats.getMemorySizeInBytes(tierType);
}

private void assertPositiveEWMAForDisk(Client client, String index) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
assertTrue(requestCacheStats.getTimeEWMA(TierType.DISK) > 0);
}
}
Original file line number Diff line number Diff line change
@@ -43,7 +43,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Original file line number Diff line number Diff line change
@@ -52,12 +52,13 @@
*/
public class RequestCacheStats implements Writeable, ToXContentFragment {

private Map<String, StatsHolder> map = new HashMap<>(){{
for (TierType tierType : TierType.values())
private Map<String, StatsHolder> map = new HashMap<>() {
{
put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}}
for (TierType tierType : TierType.values()) {
put(tierType.getStringValue(), new StatsHolder());
// Every possible tier type must have counters, even if they are disabled. Then the counters report 0
}
}
};

public RequestCacheStats() {}
@@ -66,12 +67,12 @@ public RequestCacheStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.map = in.readMap(StreamInput::readString, StatsHolder::new);
} else {
// objects from earlier versions only contain on-heap info, and do not have entries info
// objects from earlier versions only contain on-heap info, and do not have entries or getTime info
long memorySize = in.readVLong();
long evictions = in.readVLong();
long hitCount = in.readVLong();
long missCount = in.readVLong();
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0));
this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0, 0.0));
}
}

@@ -116,6 +117,10 @@ public long getEntries(TierType tierType) {
return getTierStats(tierType).entries.count();
}

public double getTimeEWMA(TierType tierType) {
return getTierStats(tierType).getTimeEWMA;
}

// By default, return on-heap stats if no tier is specified

public long getMemorySizeInBytes() {
@@ -142,12 +147,14 @@ public long getEntries() {
return getEntries(TierType.ON_HEAP);
}

// no getTimeEWMA default as it'll always return 0 for on-heap

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ?
} else {
// Write only on-heap values, and don't write entries metric
// Write only on-heap values, and don't write entries metric or getTimeEWMA
StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue());
out.writeVLong(heapStats.getMemorySize());
out.writeVLong(heapStats.getEvictions());
@@ -160,13 +167,13 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REQUEST_CACHE_STATS);
// write on-heap stats outside of tiers object
getTierStats(TierType.ON_HEAP).toXContent(builder, params);
getTierStats(TierType.ON_HEAP).toXContent(builder, params, false); // Heap tier doesn't write a getTime
builder.startObject(Fields.TIERS);
for (TierType tierType : TierType.values()) { // fixed order
if (tierType != TierType.ON_HEAP) {
String tier = tierType.getStringValue();
builder.startObject(tier);
map.get(tier).toXContent(builder, params);
map.get(tier).toXContent(builder, params, true); // Non-heap tiers write a getTime
builder.endObject();
}
}
@@ -189,5 +196,6 @@ static final class Fields {
static final String HIT_COUNT = "hit_count";
static final String MISS_COUNT = "miss_count";
static final String ENTRIES = "entries";
static final String GET_TIME_EWMA = "get_time_ewma_millis";
}
}
Original file line number Diff line number Diff line change
@@ -57,12 +57,19 @@ public RequestCacheStats stats() {
return new RequestCacheStats(statsHolder);
}

public void onHit(TierType tierType) {
public void onHit(TierType tierType, double getTimeEWMA) {
statsHolder.get(tierType).hitCount.inc();
if (tierType == TierType.DISK) {
statsHolder.get(tierType).getTimeEWMA = getTimeEWMA;
}

}

public void onMiss(TierType tierType) {
public void onMiss(TierType tierType, double getTimeEWMA) {
statsHolder.get(tierType).missCount.inc();
if (tierType == TierType.DISK) {
statsHolder.get(tierType).getTimeEWMA = getTimeEWMA;
}
}

public void onCached(Accountable key, BytesReference value, TierType tierType) {
Original file line number Diff line number Diff line change
@@ -25,16 +25,18 @@ public class StatsHolder implements Serializable, Writeable, ToXContentFragment
final CounterMetric hitCount;
final CounterMetric missCount;
final CounterMetric entries;
double getTimeEWMA; // CounterMetric is long, we need a double

public StatsHolder() {
this.totalMetric = new CounterMetric();
this.evictionsMetric = new CounterMetric();
this.hitCount = new CounterMetric();
this.missCount = new CounterMetric();
this.entries = new CounterMetric();
this.getTimeEWMA = 0.0;
}

public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries) {
public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries, double getTimeEWMA) {
// Switched argument order to match RequestCacheStats
this.totalMetric = new CounterMetric();
this.totalMetric.inc(memorySize);
@@ -46,12 +48,13 @@ public StatsHolder(long memorySize, long evictions, long hitCount, long missCoun
this.missCount.inc(missCount);
this.entries = new CounterMetric();
this.entries.inc(entries);
this.getTimeEWMA = getTimeEWMA;
}

public StatsHolder(StreamInput in) throws IOException {
// Read and write the values of the counter metrics. They should always be positive
// This object is new, so we shouldn't need version checks for different behavior
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readDouble());
// java forces us to do this in one line
// guaranteed to be evaluated in correct order (https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.7.4)
}
@@ -63,6 +66,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(hitCount.count());
out.writeVLong(missCount.count());
out.writeVLong(entries.count());
out.writeDouble(getTimeEWMA);
}

public void add(StatsHolder otherStats) {
@@ -72,6 +76,18 @@ public void add(StatsHolder otherStats) {
hitCount.inc(otherStats.hitCount.count());
missCount.inc(otherStats.missCount.count());
entries.inc(otherStats.entries.count());
if (!otherStats.isEmpty()) {
getTimeEWMA = otherStats.getTimeEWMA;
}

/* Adding two EWMAs is a bit tricky. If both stats are non-empty we can assume the newer one dominates.
add() is only called in CommonStats.java in two places:
1) it's used to either add otherStats to a new (empty) RequestCacheStats
2) it's used to add new stats to an existing RequestCacheStats
In both cases, the existing object is older, so we can assume otherStats's EWMA dominates.
It doesn't make sense to use the existing EWMA in case 1, and in case 2 the actual value
will be updated from the disk tier on the next hit/miss, so it's probably ok to use otherStats.getTimeEWMA.
*/
}

public long getEvictions() {
@@ -94,8 +110,16 @@ public long getEntries() {
return entries.count();
}

public double getTimeEWMA() {
return getTimeEWMA;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return toXContent(builder, params, false); // By default do not write the getTime field
}

public XContentBuilder toXContent(XContentBuilder builder, Params params, boolean includeGetTime) throws IOException {
builder.humanReadableField(
RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES,
RequestCacheStats.Fields.MEMORY_SIZE,
@@ -105,6 +129,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount());
builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount());
builder.field(RequestCacheStats.Fields.ENTRIES, getEntries());
if (includeGetTime) {
builder.field(RequestCacheStats.Fields.GET_TIME_EWMA, getTimeEWMA());
}
return builder;
}

private boolean isEmpty() {
return (getEvictions() == 0)
&& (getMemorySize() == 0)
&& (getHitCount() == 0)
&& (getMissCount() == 0)
&& (getEntries() == 0)
&& (getTimeEWMA() == 0.0);
}
}
Original file line number Diff line number Diff line change
@@ -56,13 +56,13 @@ public final void onCached(IndicesRequestCache.Key key, BytesReference value, Ti
}

@Override
public final void onHit(TierType tierType) {
stats().onHit(tierType);
public final void onHit(TierType tierType, double getTimeEWMA) {
stats().onHit(tierType, getTimeEWMA);
}

@Override
public final void onMiss(TierType tierType) {
stats().onMiss(tierType);
public final void onMiss(TierType tierType, double getTimeEWMA) {
stats().onMiss(tierType, getTimeEWMA);
}

@Override
Original file line number Diff line number Diff line change
@@ -151,8 +151,8 @@ void clear(CacheEntity entity) {
}

@Override
public void onMiss(Key key, TierType tierType) {
key.entity.onMiss(tierType);
public void onMiss(Key key, TierType tierType, double getTimeEWMA) {
key.entity.onMiss(tierType, getTimeEWMA);
}

@Override
@@ -161,8 +161,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
}

@Override
public void onHit(Key key, BytesReference value, TierType tierType) {
key.entity.onHit(tierType);
public void onHit(Key key, BytesReference value, TierType tierType, double getTimeEWMA) {
key.entity.onHit(tierType, getTimeEWMA);
}

@Override
@@ -275,12 +275,12 @@ interface CacheEntity extends Accountable, Writeable {
/**
* Called each time this entity has a cache hit.
*/
void onHit(TierType tierType);
void onHit(TierType tierType, double getTimeEWMA);

/**
* Called each time this entity has a cache miss.
*/
void onMiss(TierType tierType);
void onMiss(TierType tierType, double getTimeEWMA);

/**
* Called when this entity instance is removed
Original file line number Diff line number Diff line change
@@ -12,11 +12,11 @@

public interface TieredCacheEventListener<K, V> {

void onMiss(K key, TierType tierType);
void onMiss(K key, TierType tierType, double getTimeEWMA);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding getTimeEWMA here isn't needed considering it will only be needed as part of stats. We need to rethink in terms of low level design. For such specific stats related to a particular tier, we can instead create separate DiskTierStats associated with disk tier for example, keep accumulating relevant stats there in memory. This stats can be eventually used inside ShardRequestCacheStats to pull in those values.

As there might be more values coming in later on which we need to add as part of stats, so this solution isn't extensible as we can't keep adding it here.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense. I just wasn't sure where to actually fetch the values from the disk tier, and I thought onHit and onMiss would be reasonable since that's when getTimeEWMA will actually change. But I agree it's not extensible to new stats which might change on some other frequency. Should we instead have some sort of background job to periodically gather stats from the disk tier?


void onRemoval(RemovalNotification<K, V> notification);

void onHit(K key, V value, TierType tierType);
void onHit(K key, V value, TierType tierType, double getTimeEWMA);

void onCached(K key, V value, TierType tierType);
}
Original file line number Diff line number Diff line change
@@ -131,16 +131,24 @@ private Function<K, CacheValue<V>> getValueFromTierCache() {
return key -> {
for (CachingTier<K, V> cachingTier : cachingTierList) {
V value = cachingTier.get(key);
double getTimeEWMA = getTimeEWMAIfDisk(cachingTier);
if (value != null) {
tieredCacheEventListener.onHit(key, value, cachingTier.getTierType());
tieredCacheEventListener.onHit(key, value, cachingTier.getTierType(), getTimeEWMA);
return new CacheValue<>(value, cachingTier.getTierType());
}
tieredCacheEventListener.onMiss(key, cachingTier.getTierType());
tieredCacheEventListener.onMiss(key, cachingTier.getTierType(), getTimeEWMA);
Comment on lines +134 to +139
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right. We should ideally put these get times inside Disk caching tier itself. So that if we have a different implementation of TieredService, we don't have to duplicate this work.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And as discussed having separate DiskCacheStats separately should be able to solve this.

}
return null;
};
}

private double getTimeEWMAIfDisk(CachingTier<K, V> cachingTier) {
if (cachingTier.getTierType() == TierType.DISK) {
return ((DiskCachingTier<K, V>) cachingTier).getTimeMillisEWMA();
}
return 0.0;
}

@Override
public void closeDiskTier() {
diskCachingTier.close();
Loading