Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch async shard fetch transport action for replica #8218 #8356

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
07ba88a
Add batch async shard fetch transport action for replica
sudarshan-baliga Jul 24, 2023
45aadc8
Add RSA Async batch shard fetch transport integ test
sudarshan-baliga Aug 3, 2023
e2b573c
Update the documentation of TransportNodesListShardStoreMetadataBatch.
sudarshan-baliga Aug 6, 2023
86e8ec1
Merge branch 'main' into async-shard-fetch-rsatransport
sudarshan-baliga Aug 6, 2023
344a4f1
Transport RSA refactor
shiv0408 Sep 25, 2023
24ec9a2
Correcting the refernce of StoreFilesMetadata
shiv0408 Sep 27, 2023
88cb94e
Added helper class to remove code duplication
shiv0408 Dec 13, 2023
63682c1
Merge branch 'main' into async-shard-fetch-rsatransport
shiv0408 Jan 8, 2024
d623b28
Fix build failure after main merge
shiv0408 Jan 8, 2024
934ba3f
Removed AsyncBatchShardFetch
shiv0408 Jan 25, 2024
2d96a53
Address review comments
shiv0408 Jan 29, 2024
5bc6202
Spotless changes
shiv0408 Jan 29, 2024
875f4fa
Added UsingBatchAction suffix to ITs
shiv0408 Feb 1, 2024
4a4da6b
Merge branch 'main' into async-shard-fetch-rsatransport
shiv0408 Feb 2, 2024
5a328ca
Apply Spotless
shiv0408 Feb 2, 2024
5f5fbba
Fix tests
shiv0408 Feb 5, 2024
fc3ab43
Merge branch 'main' into async-shard-fetch-rsatransport
shiv0408 Feb 5, 2024
57960f9
Catch OpenSearchException as well during the batch flow
shiv0408 Feb 27, 2024
d07b86b
Merge branch 'opensearch-project:main' into async-shard-fetch-rsatran…
shiv0408 Feb 27, 2024
15af614
Address comments
shiv0408 Mar 12, 2024
d0d46ab
Apply spotless
shiv0408 Mar 12, 2024
f05dfc2
Handle index not found properly and return null in response
Mar 13, 2024
143c3d4
Catch all exceptions in Batch mode
shiv0408 Mar 13, 2024
62c11d1
Merge branch 'main' into async-shard-fetch-rsatransport
shiv0408 Mar 13, 2024
29871bf
Fix Integ Test
shiv0408 Mar 13, 2024
85faccc
Update Integration test
shiv0408 Mar 13, 2024
776f625
Remove BaseShardResponse as it'll be pushed later
Mar 14, 2024
2cc687f
Modify integ test accordingly
Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -879,27 +879,36 @@ public void testShardStoreFetchNodeNotConnectedUsingBatchAction() {

public void testShardStoreFetchCorruptedIndexUsingBatchAction() throws Exception {
internalCluster().startNodes(2);
String indexName = "test";
prepareIndices(new String[] { indexName }, 1, 1);
Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { indexName }, 1);
Index index = resolveIndex(indexName);
ShardId shardId = new ShardId(index, 0);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
String index1Name = "test1";
String index2Name = "test2";
prepareIndices(new String[] { index1Name, index2Name }, 1, 1);
Map<ShardId, ShardAttributes> shardAttributesMap = prepareRequestMap(new String[] { index1Name, index2Name }, 1);
Index index1 = resolveIndex(index1Name);
ShardId shardId1 = new ShardId(index1, 0);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(index1Name).get();
assertEquals(2, searchShardsResponse.getNodes().length);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId);

// corrupt test1 index shards
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId1);
corruptShard(searchShardsResponse.getNodes()[1].getName(), shardId1);
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(false).get();
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListShardStoreMetadataBatch.class),
new TransportNodesListShardStoreMetadataBatch.Request(shardAttributesMap, discoveryNodes)
);
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata = response.getNodesMap()
Map<ShardId, TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata> nodeStoreFilesMetadata = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeStoreFilesMetadataBatch()
.get(shardId);
assertNodeStoreFilesMetadataFailureCase(nodeStoreFilesMetadata, shardId);
.getNodeStoreFilesMetadataBatch();
// We don't store exception in case of corrupt index, rather just return an empty response
assertNull(nodeStoreFilesMetadata.get(shardId1).getException());
assertEquals(shardId1, nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().shardId());
assertTrue(nodeStoreFilesMetadata.get(shardId1).storeFilesMetadata().isEmpty());

Index index2 = resolveIndex(index2Name);
ShardId shardId2 = new ShardId(index2, 0);
assertNodeStoreFilesMetadataSuccessCase(nodeStoreFilesMetadata.get(shardId2), shardId2);
}

private void prepareIndices(String[] indices, int numberOfPrimaryShards, int numberOfReplicaShards) {
Expand Down Expand Up @@ -930,21 +939,11 @@ private TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch p
);
}

private void assertNodeStoreFilesMetadataFailureCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNotNull(nodeStoreFilesMetadata.getStoreFileFetchException());
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertEquals(shardId, storeFileMetadata.shardId());
assertTrue(storeFileMetadata.peerRecoveryRetentionLeases().isEmpty());
}

private void assertNodeStoreFilesMetadataSuccessCase(
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeStoreFilesMetadata,
ShardId shardId
) {
assertNull(nodeStoreFilesMetadata.getStoreFileFetchException());
assertNull(nodeStoreFilesMetadata.getException());
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFileMetadata = nodeStoreFilesMetadata.storeFilesMetadata();
assertFalse(storeFileMetadata.isEmpty());
assertEquals(shardId, storeFileMetadata.shardId());
Expand Down
51 changes: 51 additions & 0 deletions server/src/main/java/org/opensearch/gateway/BaseShardResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Base response class for shard response.
*
* @opensearch.internal
*/
public abstract class BaseShardResponse {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

private final Exception storeException;

public BaseShardResponse(Exception storeException) {
this.storeException = storeException;
}

Check warning on line 27 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L25-L27

Added lines #L25 - L27 were not covered by tests

public abstract boolean isEmpty();

public Exception getException() {
return storeException;

Check warning on line 32 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L32

Added line #L32 was not covered by tests
}

public BaseShardResponse(StreamInput in) throws IOException {

Check warning on line 35 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L35

Added line #L35 was not covered by tests
if (in.readBoolean()) {
storeException = in.readException();

Check warning on line 37 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L37

Added line #L37 was not covered by tests
} else {
storeException = null;

Check warning on line 39 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L39

Added line #L39 was not covered by tests
}
}

Check warning on line 41 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L41

Added line #L41 was not covered by tests

public void writeTo(StreamOutput out) throws IOException {
if (storeException != null) {
out.writeBoolean(true);
out.writeException(storeException);

Check warning on line 46 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L45-L46

Added lines #L45 - L46 were not covered by tests
} else {
out.writeBoolean(false);

Check warning on line 48 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L48

Added line #L48 was not covered by tests
}
}

Check warning on line 50 in server/src/main/java/org/opensearch/gateway/BaseShardResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseShardResponse.java#L50

Added line #L50 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,18 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.getListShardMetadataOnLocalNode;
import static org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.listShardMetadataInternal;

/**
* Metadata for shard stores from a list of transport nodes
Expand Down Expand Up @@ -157,19 +155,7 @@

private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException {
final ShardId shardId = request.getShardId();
try {
return getListShardMetadataOnLocalNode(
logger,
shardId,
nodeEnv,
indicesService,
request.getCustomDataPath(),
settings,
clusterService
);
} catch (IOException e) {
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
return listShardMetadataInternal(logger, shardId, nodeEnv, indicesService, request.getCustomDataPath(), settings, clusterService);

Check warning on line 158 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java#L158

Added line #L158 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.gateway.AsyncShardFetch;
import org.opensearch.gateway.BaseShardResponse;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;
Expand All @@ -41,6 +42,8 @@
import java.util.Map;
import java.util.Objects;

import static org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.INDEX_NOT_FOUND;

/**
* Transport action for fetching the batch of shard stores Metadata from a list of transport nodes
*
Expand Down Expand Up @@ -98,17 +101,17 @@
DiscoveryNode[] nodes,
ActionListener<NodesStoreFilesMetadataBatch> listener
) {
execute(new TransportNodesListShardStoreMetadataBatch.Request(shardAttributes, nodes), listener);
}

Check warning on line 105 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L104-L105

Added lines #L104 - L105 were not covered by tests

@Override
protected NodeRequest newNodeRequest(Request request) {
return new NodeRequest(request);

Check warning on line 109 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L109

Added line #L109 was not covered by tests
}

@Override
protected NodeStoreFilesMetadataBatch newNodeResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadataBatch(in);

Check warning on line 114 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L114

Added line #L114 was not covered by tests
}

@Override
Expand All @@ -117,16 +120,16 @@
List<NodeStoreFilesMetadataBatch> responses,
List<FailedNodeException> failures
) {
return new NodesStoreFilesMetadataBatch(clusterService.getClusterName(), responses, failures);

Check warning on line 123 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L123

Added line #L123 was not covered by tests
}

@Override
protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) {
try {
return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request));
} catch (IOException e) {
throw new OpenSearchException(
"Failed to list store metadata for shards [" + request.getShardAttributes().keySet().stream().map(ShardId::toString) + "]",

Check warning on line 132 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L129-L132

Added lines #L129 - L132 were not covered by tests
e
);
}
Expand All @@ -137,28 +140,37 @@
* In this case we fetch the shard store files for batch of shards instead of one shard.
*/
private Map<ShardId, NodeStoreFilesMetadata> listStoreMetadata(NodeRequest request) throws IOException {
Map<ShardId, NodeStoreFilesMetadata> shardStoreMetadataMap = new HashMap<ShardId, NodeStoreFilesMetadata>();

Check warning on line 143 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L143

Added line #L143 was not covered by tests
for (ShardAttributes shardAttributes : request.getShardAttributes().values()) {
final ShardId shardId = shardAttributes.getShardId();
for (Map.Entry<ShardId, ShardAttributes> shardAttributes : request.getShardAttributes().entrySet()) {
final ShardId shardId = shardAttributes.getKey();

Check warning on line 145 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L145

Added line #L145 was not covered by tests
try {
StoreFilesMetadata storeFilesMetadata = TransportNodesListShardStoreMetadataHelper.getListShardMetadataOnLocalNode(
StoreFilesMetadata storeFilesMetadata = TransportNodesListShardStoreMetadataHelper.listShardMetadataInternal(

Check warning on line 147 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L147

Added line #L147 was not covered by tests
logger,
shardId,
nodeEnv,
indicesService,
shardAttributes.getCustomDataPath(),
shardAttributes.getValue().getCustomDataPath(),

Check warning on line 152 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L152

Added line #L152 was not covered by tests
settings,
clusterService
);
shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(storeFilesMetadata, null));
} catch (IOException | OpenSearchException e) {
shardStoreMetadataMap.put(
shardId,
new NodeStoreFilesMetadata(new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), e)
);
} catch (Exception e) {

Check warning on line 157 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L156-L157

Added lines #L156 - L157 were not covered by tests
// should return null in case of known exceptions being returned from listShardMetadataInternal method.
if (e.getMessage().contains(INDEX_NOT_FOUND)) {
shardStoreMetadataMap.put(shardId, null);

Check warning on line 160 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L160

Added line #L160 was not covered by tests
} else {
// return actual exception as it is for unknown exceptions
shardStoreMetadataMap.put(

Check warning on line 163 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L163

Added line #L163 was not covered by tests
shardId,
new NodeStoreFilesMetadata(
new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()),

Check warning on line 166 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L166

Added line #L166 was not covered by tests
e
)
);
}
}
}
return shardStoreMetadataMap;

Check warning on line 173 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L171-L173

Added lines #L171 - L173 were not covered by tests
}

/**
Expand All @@ -172,24 +184,24 @@
private final Map<ShardId, ShardAttributes> shardAttributes;

public Request(StreamInput in) throws IOException {
super(in);
shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
}

Check warning on line 189 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L187-L189

Added lines #L187 - L189 were not covered by tests

public Request(Map<ShardId, ShardAttributes> shardAttributes, DiscoveryNode[] nodes) {
super(nodes);
this.shardAttributes = Objects.requireNonNull(shardAttributes);
}

Check warning on line 194 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L192-L194

Added lines #L192 - L194 were not covered by tests

public Map<ShardId, ShardAttributes> getShardAttributes() {
return shardAttributes;

Check warning on line 197 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L197

Added line #L197 was not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
}

Check warning on line 204 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L202-L204

Added lines #L202 - L204 were not covered by tests
}

/**
Expand All @@ -200,26 +212,26 @@
public static class NodesStoreFilesMetadataBatch extends BaseNodesResponse<NodeStoreFilesMetadataBatch> {

public NodesStoreFilesMetadataBatch(StreamInput in) throws IOException {
super(in);
}

Check warning on line 216 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L215-L216

Added lines #L215 - L216 were not covered by tests

public NodesStoreFilesMetadataBatch(
ClusterName clusterName,
List<NodeStoreFilesMetadataBatch> nodes,
List<FailedNodeException> failures
) {
super(clusterName, nodes, failures);
}

Check warning on line 224 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L223-L224

Added lines #L223 - L224 were not covered by tests

@Override
protected List<NodeStoreFilesMetadataBatch> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeStoreFilesMetadataBatch::new);

Check warning on line 228 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L228

Added line #L228 was not covered by tests
}

@Override
protected void writeNodesTo(StreamOutput out, List<NodeStoreFilesMetadataBatch> nodes) throws IOException {
out.writeList(nodes);
}

Check warning on line 234 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L233-L234

Added lines #L233 - L234 were not covered by tests
}

/**
Expand All @@ -227,55 +239,46 @@
*
* @opensearch.internal
*/
public static class NodeStoreFilesMetadata {
public static class NodeStoreFilesMetadata extends BaseShardResponse {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

private StoreFilesMetadata storeFilesMetadata;
private Exception storeFileFetchException;

public NodeStoreFilesMetadata(StoreFilesMetadata storeFilesMetadata) {
this.storeFilesMetadata = storeFilesMetadata;
this.storeFileFetchException = null;
@Override
public boolean isEmpty() {
return storeFilesMetadata == null || storeFilesMetadata().isEmpty() && getException() == null;
}

public NodeStoreFilesMetadata(StreamInput in) throws IOException {
storeFilesMetadata = new StoreFilesMetadata(in);
super(in);

Check warning on line 252 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L252

Added line #L252 was not covered by tests
if (in.readBoolean()) {
this.storeFileFetchException = in.readException();
this.storeFilesMetadata = new StoreFilesMetadata(in);

Check warning on line 254 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L254

Added line #L254 was not covered by tests
} else {
this.storeFileFetchException = null;
this.storeFilesMetadata = null;

Check warning on line 256 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L256

Added line #L256 was not covered by tests
}
}

Check warning on line 258 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L258

Added line #L258 was not covered by tests

public NodeStoreFilesMetadata(StoreFilesMetadata storeFilesMetadata, Exception storeFileFetchException) {
super(storeFileFetchException);
this.storeFilesMetadata = storeFilesMetadata;
this.storeFileFetchException = storeFileFetchException;
}

Check warning on line 263 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L261-L263

Added lines #L261 - L263 were not covered by tests

public StoreFilesMetadata storeFilesMetadata() {
return storeFilesMetadata;

Check warning on line 266 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L266

Added line #L266 was not covered by tests
}

public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadata(in);
}

public void writeTo(StreamOutput out) throws IOException {
storeFilesMetadata.writeTo(out);
if (storeFileFetchException != null) {
super.writeTo(out);

Check warning on line 270 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L270

Added line #L270 was not covered by tests
if (storeFilesMetadata != null) {
out.writeBoolean(true);
out.writeException(storeFileFetchException);
storeFilesMetadata.writeTo(out);

Check warning on line 273 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L272-L273

Added lines #L272 - L273 were not covered by tests
} else {
out.writeBoolean(false);

Check warning on line 275 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L275

Added line #L275 was not covered by tests
}
}

Check warning on line 277 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L277

Added line #L277 was not covered by tests

public Exception getStoreFileFetchException() {
return storeFileFetchException;
}

@Override
public String toString() {
return "[[" + storeFilesMetadata + "]]";

Check warning on line 281 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L281

Added line #L281 was not covered by tests
}
}

Expand All @@ -289,22 +292,22 @@
private final Map<ShardId, ShardAttributes> shardAttributes;

public NodeRequest(StreamInput in) throws IOException {
super(in);
shardAttributes = in.readMap(ShardId::new, ShardAttributes::new);
}

Check warning on line 297 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L295-L297

Added lines #L295 - L297 were not covered by tests

public NodeRequest(Request request) {
this.shardAttributes = Objects.requireNonNull(request.getShardAttributes());
}

Check warning on line 301 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L299-L301

Added lines #L299 - L301 were not covered by tests

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(shardAttributes, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
}

Check warning on line 307 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L305-L307

Added lines #L305 - L307 were not covered by tests

public Map<ShardId, ShardAttributes> getShardAttributes() {
return shardAttributes;

Check warning on line 310 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L310

Added line #L310 was not covered by tests
}
}

Expand All @@ -316,24 +319,24 @@
private final Map<ShardId, NodeStoreFilesMetadata> nodeStoreFilesMetadataBatch;
sudarshan-baliga marked this conversation as resolved.
Show resolved Hide resolved

protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException {
super(in);
this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new);
}

Check warning on line 324 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L322-L324

Added lines #L322 - L324 were not covered by tests

public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map<ShardId, NodeStoreFilesMetadata> nodeStoreFilesMetadataBatch) {
super(node);
this.nodeStoreFilesMetadataBatch = nodeStoreFilesMetadataBatch;
}

Check warning on line 329 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L327-L329

Added lines #L327 - L329 were not covered by tests

public Map<ShardId, NodeStoreFilesMetadata> getNodeStoreFilesMetadataBatch() {
return this.nodeStoreFilesMetadataBatch;

Check warning on line 332 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L332

Added line #L332 was not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
}

Check warning on line 339 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java#L337-L339

Added lines #L337 - L339 were not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@
*
* @opensearch.internal
*/
public class TransportNodesListShardStoreMetadataHelper {

Check warning on line 50 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L50

Added line #L50 was not covered by tests
public static StoreFilesMetadata getListShardMetadataOnLocalNode(

public static final String INDEX_NOT_FOUND = "node doesn't have meta data for index ";

public static StoreFilesMetadata listShardMetadataInternal(
Logger logger,
final ShardId shardId,
NodeEnvironment nodeEnv,
Expand All @@ -57,28 +60,28 @@
Settings settings,
ClusterService clusterService
) throws IOException {
logger.trace("listing store meta data for {}", shardId);
long startTimeNS = System.nanoTime();
boolean exists = false;

Check warning on line 65 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L63-L65

Added lines #L63 - L65 were not covered by tests
try {
IndexService indexService = indicesService.indexService(shardId.getIndex());

Check warning on line 67 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L67

Added line #L67 was not covered by tests
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(shardId.id());

Check warning on line 69 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L69

Added line #L69 was not covered by tests
if (indexShard != null) {
try {
final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata(

Check warning on line 72 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L72

Added line #L72 was not covered by tests
shardId,
indexShard.snapshotStoreMetadata(),
indexShard.getPeerRecoveryRetentionLeases()

Check warning on line 75 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L74-L75

Added lines #L74 - L75 were not covered by tests
);
exists = true;
return storeFilesMetadata;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
throw e;
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
throw e;
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());

Check warning on line 84 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L77-L84

Added lines #L77 - L84 were not covered by tests
}
}
}
Expand All @@ -86,40 +89,40 @@
// TODO: Fallback for BWC with older predecessor (ES) versions.
// Remove this once request.getCustomDataPath() always returns non-null
if (indexService != null) {
customDataPath = indexService.getIndexSettings().customDataPath();

Check warning on line 92 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L92

Added line #L92 was not covered by tests
} else {
IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());

Check warning on line 94 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L94

Added line #L94 was not covered by tests
if (metadata != null) {
customDataPath = new IndexSettings(metadata, settings).customDataPath();

Check warning on line 96 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L96

Added line #L96 was not covered by tests
} else {
logger.trace("{} node doesn't have meta data for the requests index", shardId);
throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());
throw new OpenSearchException(INDEX_NOT_FOUND + shardId.getIndex());

Check warning on line 99 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L98-L99

Added lines #L98 - L99 were not covered by tests
}
}
}
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);

Check warning on line 103 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L103

Added line #L103 was not covered by tests
if (shardPath == null) {
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());

Check warning on line 105 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L105

Added line #L105 was not covered by tests
}
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica
// 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not
// reuse local resources.
final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot(
shardPath.resolveIndex(),

Check warning on line 112 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L111-L112

Added lines #L111 - L112 were not covered by tests
shardId,
nodeEnv::shardLock,

Check warning on line 114 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L114

Added line #L114 was not covered by tests
logger
);
// We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
// we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList());

Check warning on line 119 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L119

Added line #L119 was not covered by tests
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);

Check warning on line 121 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L121

Added line #L121 was not covered by tests
if (exists) {
logger.debug("{} loaded store meta data (took [{}])", shardId, took);

Check warning on line 123 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L123

Added line #L123 was not covered by tests
} else {
logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took);

Check warning on line 125 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L125

Added line #L125 was not covered by tests
}
}
}
Expand Down Expand Up @@ -158,7 +161,7 @@
}

public ShardId shardId() {
return this.shardId;

Check warning on line 164 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L164

Added line #L164 was not covered by tests
}

public boolean isEmpty() {
Expand Down Expand Up @@ -204,13 +207,13 @@

@Override
public String toString() {
return "StoreFilesMetadata{"

Check warning on line 210 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L210

Added line #L210 was not covered by tests
+ ", shardId="
+ shardId
+ ", metadataSnapshot{size="
+ metadataSnapshot.size()

Check warning on line 214 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L214

Added line #L214 was not covered by tests
+ ", syncId="
+ metadataSnapshot.getSyncId()

Check warning on line 216 in server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java#L216

Added line #L216 was not covered by tests
+ "}"
+ '}';
}
Expand Down
Loading