Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remove] Segment memory estimation and tracking #2029

Merged
merged 4 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -57,7 +55,6 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.settings.Settings;
Expand All @@ -76,7 +73,6 @@
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.NoOpEngine;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
Expand All @@ -86,10 +82,8 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.CircuitBreakerStats;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
Expand Down Expand Up @@ -122,7 +116,6 @@
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.BREAKER;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -135,13 +128,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public class IndexShardIT extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -643,86 +634,6 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
}
}

/** Check that the accounting breaker correctly matches the segments API for memory usage */
private void checkAccountingBreaker() {
CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class);
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
long usedMem = acctBreaker.getUsed();
assertThat(usedMem, greaterThan(0L));
NodesStatsResponse response = client().admin().cluster().prepareNodesStats().setIndices(true).addMetric(BREAKER.metricName()).get();
NodeStats stats = response.getNodes().get(0);
assertNotNull(stats);
SegmentsStats segmentsStats = stats.getIndices().getSegments();
CircuitBreakerStats breakerStats = stats.getBreaker().getStats(CircuitBreaker.ACCOUNTING);
assertEquals(usedMem, segmentsStats.getMemoryInBytes());
assertEquals(usedMem, breakerStats.getEstimated());
}

public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("network.breaker.inflight_requests.overhead", 0.0))
.get();

// Generate a couple of segments
client().prepareIndex("test", "_doc", "1")
.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE)
.get();
// Use routing so 2 documents are guaranteed to be on the same shard
String routing = randomAlphaOfLength(5);
client().prepareIndex("test", "_doc", "2")
.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE)
.setRouting(routing)
.get();
client().prepareIndex("test", "_doc", "3")
.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
.setRefreshPolicy(IMMEDIATE)
.setRouting(routing)
.get();

checkAccountingBreaker();
// Test that force merging causes the breaker to be correctly adjusted
logger.info("--> force merging to a single segment");
client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).setFlush(randomBoolean()).get();
client().admin().indices().prepareRefresh().get();
checkAccountingBreaker();

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("indices.breaker.total.limit", "1kb"))
.get();

// Test that we're now above the parent limit due to the segments
Exception e = expectThrows(
Exception.class,
() -> client().prepareSearch("test").addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get()
);
logger.info("--> got an expected exception", e);
assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().putNull("indices.breaker.total.limit").putNull("network.breaker.inflight_requests.overhead")
)
.get();

// Test that deleting the index causes the breaker to correctly be decremented
logger.info("--> deleting index");
client().admin().indices().prepareDelete("test").get();

// Accounting breaker should now be 0
CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class);
CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
assertThat(acctBreaker.getUsed(), equalTo(0L));
}

public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,6 @@ public void testSegmentsStats() {

assertThat(stats.getTotal().getSegments(), notNullValue());
assertThat(stats.getTotal().getSegments().getCount(), equalTo((long) test1.totalNumShards));
assertThat(stats.getTotal().getSegments().getMemoryInBytes(), greaterThan(0L));
}

public void testAllFlags() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.field(Fields.NUM_DOCS, segment.getNumDocs());
builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, new ByteSizeValue(segment.getMemoryInBytes()));
builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, segment.getZeroMemory());
builder.field(Fields.COMMITTED, segment.isCommitted());
builder.field(Fields.SEARCH, segment.isSearch());
if (segment.getVersion() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public RecoveryStats getRecoveryStats() {

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (memory, index writer, version map)
* FieldData, PercolatorCache, Segments (index writer, version map)
*/
public ByteSizeValue getTotalMemory() {
long size = 0;
Expand All @@ -504,8 +504,7 @@ public ByteSizeValue getTotalMemory() {
size += this.getQueryCache().getMemorySizeInBytes();
}
if (this.getSegments() != null) {
size += this.getSegments().getMemoryInBytes() + this.getSegments().getIndexWriterMemoryInBytes() + this.getSegments()
.getVersionMapMemoryInBytes();
size += this.getSegments().getIndexWriterMemoryInBytes() + this.getSegments().getVersionMapMemoryInBytes();
}

return new ByteSizeValue(size);
Expand Down
19 changes: 1 addition & 18 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -161,14 +160,6 @@ protected Engine(EngineConfig engineConfig) {
this.eventListener = engineConfig.getEventListener();
}

/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
protected static long guardedRamBytesUsed(Accountable a) {
if (a == null) {
return 0;
}
return a.ramBytesUsed();
}

public final EngineConfig config() {
return engineConfig;
}
Expand Down Expand Up @@ -875,14 +866,7 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
}

protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
stats.add(1, segmentReader.ramBytesUsed());
stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader()));
stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader()));
stats.addTermVectorsMemoryInBytes(guardedRamBytesUsed(segmentReader.getTermVectorsReader()));
stats.addNormsMemoryInBytes(guardedRamBytesUsed(segmentReader.getNormsReader()));
stats.addPointsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPointsReader()));
stats.addDocValuesMemoryInBytes(guardedRamBytesUsed(segmentReader.getDocValuesReader()));

stats.add(1);
if (includeSegmentFileSizes) {
// TODO: consider moving this to StoreStats
stats.addFileSizes(getSegmentFileSizes(segmentReader));
Expand Down Expand Up @@ -1048,7 +1032,6 @@ private void fillSegmentInfo(SegmentReader segmentReader, boolean verbose, boole
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
segment.memoryInBytes = segmentReader.ramBytesUsed();
segment.segmentSort = info.info.getIndexSort();
if (verbose) {
segment.ramTree = Accountables.namedAccountable("root", segmentReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
DirectoryReader.open(indexWriter),
shardId
);
internalReaderManager = new OpenSearchReaderManager(
directoryReader,
new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService())
);
internalReaderManager = new OpenSearchReaderManager(directoryReader);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
ExternalReaderManager externalReaderManager = new ExternalReaderManager(internalReaderManager, externalRefreshListener);
success = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.index.engine;

import java.io.IOException;
import java.util.function.BiConsumer;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.ReferenceManager;
Expand All @@ -52,23 +51,15 @@
*/
@SuppressForbidden(reason = "reference counting is required here")
class OpenSearchReaderManager extends ReferenceManager<OpenSearchDirectoryReader> {
private final BiConsumer<OpenSearchDirectoryReader, OpenSearchDirectoryReader> refreshListener;

/**
* Creates and returns a new OpenSearchReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the directoryReader to use for future reopens
* @param refreshListener A consumer that is called every time a new reader is opened
*/
OpenSearchReaderManager(
OpenSearchDirectoryReader reader,
BiConsumer<OpenSearchDirectoryReader, OpenSearchDirectoryReader> refreshListener
) {
OpenSearchReaderManager(OpenSearchDirectoryReader reader) {
this.current = reader;
this.refreshListener = refreshListener;
refreshListener.accept(current, null);
}

@Override
Expand All @@ -79,9 +70,6 @@ protected void decRef(OpenSearchDirectoryReader reference) throws IOException {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
final OpenSearchDirectoryReader reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
if (reader != null) {
refreshListener.accept(reader, referenceToRefresh);
}
return reader;
}

Expand Down

This file was deleted.

Loading