Skip to content

Commit

Permalink
refactoring in remote routing table service
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Jun 3, 2024
1 parent 6776c03 commit 65a04a2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private String getIndexRoutingFileName() {
public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
LatchedActionListener<IndexRoutingTable> latchedActionListener) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));
Expand All @@ -246,7 +246,7 @@ public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
blobContainer,
blobFileName,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(new RemoteIndexRoutingResult(index.getName(), response.readIndexRoutingTable(index))), latchedActionListener::onFailure)
ActionListener.wrap(response -> latchedActionListener.onResponse(response.readIndexRoutingTable(index)), latchedActionListener::onFailure)
);
}

Expand All @@ -255,7 +255,6 @@ public void readAsync(BlobContainer blobContainer, String name, ExecutorService
try {
listener.onResponse(read(blobContainer, name));
} catch (Exception e) {
logger.error("routing table download failed : ", e);
listener.onFailure(e);
}
});
Expand Down Expand Up @@ -307,22 +306,4 @@ public void start() {
blobStoreRepository = (BlobStoreRepository) repository;
}

public static class RemoteIndexRoutingResult {
String indexName;
IndexRoutingTable indexRoutingTable;

public RemoteIndexRoutingResult(String indexName, IndexRoutingTable indexRoutingTable) {
this.indexName = indexName;
this.indexRoutingTable = indexRoutingTable;
}

public String getIndexName() {
return indexName;
}

public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ private ClusterState readClusterStateInParallel(
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = new ArrayList<>();
List<RemoteRoutingTableService.RemoteIndexRoutingResult> readIndexRoutingTableResults = new ArrayList<>();
List<IndexRoutingTable> readIndexRoutingTableResults = new ArrayList<>();
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(
Expand All @@ -1012,7 +1012,7 @@ private ClusterState readClusterStateInParallel(
);
}

LatchedActionListener<RemoteRoutingTableService.RemoteIndexRoutingResult> routingTableLatchedActionListener = new LatchedActionListener<>(
LatchedActionListener<IndexRoutingTable> routingTableLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.debug("Successfully read cluster state component from remote");
Expand Down Expand Up @@ -1187,8 +1187,8 @@ private ClusterState readClusterStateInParallel(
}
});

readIndexRoutingTableResults.forEach(remoteIndexRoutingResult -> {
indicesRouting.put(remoteIndexRoutingResult.getIndexName(), remoteIndexRoutingResult.getIndexRoutingTable());
readIndexRoutingTableResults.forEach(indexRoutingTable -> {
indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);
});

metadataBuilder.indices(indexMetadataMap);
Expand Down

0 comments on commit 65a04a2

Please sign in to comment.