server/src/main/java/org/elasticsearch/index/engine/Engine.java (28 changes: 11 additions & 17 deletions)
@@ -301,26 +301,20 @@ protected static ShardFieldStats shardFieldStats(List<LeafReaderContext> leaves,
} else {
usages = -1;
}
boolean trackPostingsMemoryEnabled = isStateless;
boolean trackLiveDocsMemoryEnabled = ShardFieldStats.TRACK_LIVE_DOCS_IN_MEMORY_BYTES.isEnabled();
if (trackLiveDocsMemoryEnabled || trackPostingsMemoryEnabled) {
if (isStateless) {
SegmentReader segmentReader = Lucene.tryUnwrapSegmentReader(leaf.reader());
if (segmentReader != null) {
if (trackPostingsMemoryEnabled) {
String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
);
if (postingBytes != null) {
totalPostingBytes += Long.parseLong(postingBytes);
}
String postingBytes = segmentReader.getSegmentInfo().info.getAttribute(
TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY
);
if (postingBytes != null) {
totalPostingBytes += Long.parseLong(postingBytes);
}
if (trackLiveDocsMemoryEnabled) {
var liveDocs = segmentReader.getLiveDocs();
if (liveDocs != null) {
assert validateLiveDocsClass(liveDocs);
long liveDocsBytes = getLiveDocsBytes(liveDocs);
totalLiveDocsBytes += liveDocsBytes;
}
var liveDocs = segmentReader.getLiveDocs();
if (liveDocs != null) {
assert validateLiveDocsClass(liveDocs);
long liveDocsBytes = getLiveDocsBytes(liveDocs);
totalLiveDocsBytes += liveDocsBytes;
}
}
}
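For context, the attribute read in the hunk above is produced on the write side: the tracking codec stores its estimate of in-memory postings size as a segment-info attribute, which shardFieldStats then reads back per segment. A rough sketch of that write-side step, assuming SegmentInfo#putAttribute is the mechanism (the class and method here are hypothetical, not the actual TrackingPostingsInMemoryBytesCodec code; the key would be TrackingPostingsInMemoryBytesCodec.IN_MEMORY_POSTINGS_BYTES_KEY):

import org.apache.lucene.index.SegmentInfo;

// Hypothetical write-side counterpart of the getAttribute call above.
final class PostingsBytesAttributeSketch {
    static void record(SegmentInfo segmentInfo, String key, long inMemoryPostingsBytes) {
        // Segment attributes are string-valued, hence the Long round-trip via parseLong on the read side.
        segmentInfo.putAttribute(key, Long.toString(inMemoryPostingsBytes));
    }
}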
@@ -11,7 +11,6 @@

import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.FeatureFlag;

/**
* A per shard stats including the number of segments and total fields across those segments.
@@ -26,7 +25,6 @@
*/
public record ShardFieldStats(int numSegments, int totalFields, long fieldUsages, long postingsInMemoryBytes, long liveDocsBytes) {

public static final FeatureFlag TRACK_LIVE_DOCS_IN_MEMORY_BYTES = new FeatureFlag("track_live_docs_in_memory_bytes");
public static final long FIXED_BITSET_BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class);

}
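The Engine hunk above also calls getLiveDocsBytes(liveDocs) and validateLiveDocsClass(liveDocs), whose bodies do not appear in this diff. A minimal sketch of such an estimate, assuming live docs are backed by Lucene's FixedBitSet (which the FIXED_BITSET_BASE_RAM_BYTES_USED constant above and the test expectations below suggest); an illustration, not the actual Engine implementation:

import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;

// Illustrative only: estimate the heap held by a segment's live-docs bitset.
final class LiveDocsBytesSketch {
    static long estimate(Bits liveDocs) {
        // FixedBitSet is Accountable, so it can report its own ram usage; this matches the
        // test's expected value of new FixedBitSet(n).ramBytesUsed().
        return liveDocs instanceof FixedBitSet fixedBitSet ? fixedBitSet.ramBytesUsed() : 0L;
    }
}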
@@ -27,7 +27,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
@@ -78,7 +77,6 @@
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
@@ -1981,71 +1979,6 @@ public void testShardFieldStats() throws IOException {
closeShards(shard);
}

public void testShardFieldStatsWithDeletes() throws IOException {
Settings settings = Settings.builder()
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
.build();
IndexShard shard = newShard(true, settings);
assertNull(shard.getShardFieldStats());
recoverShardFromStore(shard);
boolean liveDocsTrackingEnabled = ShardFieldStats.TRACK_LIVE_DOCS_IN_MEMORY_BYTES.isEnabled();

// index some documents
int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "_doc", "first_" + i, """
{
"f1": "foo",
"f2": "bar"
}
""");
}
shard.refresh("test");
var stats = shard.getShardFieldStats();
assertThat(stats.numSegments(), equalTo(1));
assertThat(stats.liveDocsBytes(), equalTo(0L));

// delete a doc
deleteDoc(shard, "first_0");

// Refresh and fetch new stats:
shard.refresh("test");
stats = shard.getShardFieldStats();
// More segments because delete operation is stored in the new segment for replication purposes.
assertThat(stats.numSegments(), equalTo(2));
long expectedLiveDocsSize = 0;
if (liveDocsTrackingEnabled) {
// Delete op is stored in new segment, but marked as deleted. All segements have live docs:
expectedLiveDocsSize += new FixedBitSet(numDocs).ramBytesUsed();
// Second segment the delete operation that is marked as deleted:
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
}
assertThat(stats.liveDocsBytes(), equalTo(expectedLiveDocsSize));

// delete another doc:
deleteDoc(shard, "first_1");
shard.getMinRetainedSeqNo();

// Refresh and fetch new stats:
shard.refresh("test");
stats = shard.getShardFieldStats();
// More segments because delete operation is stored in the new segment for replication purposes.
assertThat(stats.numSegments(), equalTo(3));
expectedLiveDocsSize = 0;
if (liveDocsTrackingEnabled) {
// Delete op is stored in new segment, but marked as deleted. All segements have live docs:
// First segment with deletes
expectedLiveDocsSize += new FixedBitSet(numDocs).ramBytesUsed();
// Second and third segments the delete operation that is marked as deleted:
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
}
assertThat(stats.liveDocsBytes(), equalTo(expectedLiveDocsSize));

closeShards(shard);
}

public void testIndexingOperationsListeners() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "_doc", "0", "{\"foo\" : \"bar\"}");
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.shard;

import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class LiveDocsEstimationTests extends IndexShardTestCase {

@Override
protected Settings nodeSettings() {
return Settings.builder().put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true).build();
}

public void testShardFieldStatsWithDeletes() throws IOException {
Settings settings = Settings.builder()
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
.build();
IndexShard shard = newShard(true, settings);
assertNull(shard.getShardFieldStats());
recoverShardFromStore(shard);

// index some documents
int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "_doc", "first_" + i, """
{
"f1": "foo",
"f2": "bar"
}
""");
}
shard.refresh("test");
var stats = shard.getShardFieldStats();
assertThat(stats.numSegments(), equalTo(1));
assertThat(stats.liveDocsBytes(), equalTo(0L));

// delete a doc
deleteDoc(shard, "first_0");

// Refresh and fetch new stats:
shard.refresh("test");
stats = shard.getShardFieldStats();
// More segments, because the delete operation is stored in a new segment for replication purposes.
assertThat(stats.numSegments(), equalTo(2));
long expectedLiveDocsSize = 0;
// The delete op is stored in the new segment but marked as deleted, so all segments have live docs:
expectedLiveDocsSize += new FixedBitSet(numDocs).ramBytesUsed();
// The second segment holds only the delete operation, which is itself marked as deleted:
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
assertThat(stats.liveDocsBytes(), equalTo(expectedLiveDocsSize));

// delete another doc:
deleteDoc(shard, "first_1");
shard.getMinRetainedSeqNo();

// Refresh and fetch new stats:
shard.refresh("test");
stats = shard.getShardFieldStats();
// More segments, because the delete operation is stored in a new segment for replication purposes.
assertThat(stats.numSegments(), equalTo(3));
expectedLiveDocsSize = 0;
// The delete op is stored in the new segment but marked as deleted, so all segments have live docs:
// First segment, which contains the deleted docs:
expectedLiveDocsSize += new FixedBitSet(numDocs).ramBytesUsed();
// Second and third segments each hold a delete operation that is marked as deleted:
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
expectedLiveDocsSize += new FixedBitSet(1).ramBytesUsed();
assertThat(stats.liveDocsBytes(), equalTo(expectedLiveDocsSize));

closeShards(shard);
}

}
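A side note on the expected sizes asserted above: FixedBitSet allocates one 64-bit word per 64 bits plus a fixed object header, so the 10-doc segment and the single-doc tombstone segments each contribute the same number of bytes. A tiny self-contained check (a sketch for illustration, not part of the PR):

import org.apache.lucene.util.FixedBitSet;

// Both bitsets fit in a single 8-byte word, so ramBytesUsed() reports the same value for 1 and 10 bits.
final class FixedBitSetSizeCheck {
    public static void main(String[] args) {
        long oneBit = new FixedBitSet(1).ramBytesUsed();
        long tenBits = new FixedBitSet(10).ramBytesUsed();
        System.out.println(oneBit == tenBits); // prints: true
    }
}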
@@ -616,7 +616,7 @@ protected IndexShard newShard(
List<SearchOperationListener> soListener,
IndexingOperationListener... listeners
) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final Settings nodeSettings = Settings.builder().put(nodeSettings()).put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings);
final IndexShard indexShard;
if (storeProvider == null) {
@@ -1336,4 +1336,8 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
public static long recoverLocallyUpToGlobalCheckpoint(IndexShard indexShard) {
return safeAwait(indexShard::recoverLocallyUpToGlobalCheckpoint);
}

protected Settings nodeSettings() {
return Settings.EMPTY;
}
}