Skip to content

Commit

Permalink
Introducing verifylocally to perform verification without cluster bei…
Browse files Browse the repository at this point in the history
…ng formed such that a node is able to connect to connect and has necessary permissions

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent 23cc00f commit bc0e562
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
import org.opensearch.common.settings.Settings;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -85,19 +86,20 @@ private RepositoryMetadata buildRepositoryMetadata(String name) {
}

private RepositoriesMetadata buildRepositoriesMetadata() {
String segmentRepositoryName = validateAttributeNonNull(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
String translogRepositoryName = validateAttributeNonNull(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
if (segmentRepositoryName.equals(translogRepositoryName)) {
return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(segmentRepositoryName)));
} else {
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
repositoryMetadataList.add(buildRepositoryMetadata(segmentRepositoryName));
repositoryMetadataList.add(buildRepositoryMetadata(translogRepositoryName));
return new RepositoriesMetadata(repositoryMetadataList);
List<RepositoryMetadata> repositoryMetadataList = new ArrayList<>();
Set<String> repositoryNames = new HashSet<>();

repositoryNames.add(validateAttributeNonNull(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(repositoryName));
}

return new RepositoriesMetadata(repositoryMetadataList);
}

RepositoriesMetadata getRepositoriesMetadata() {
public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -87,50 +84,11 @@ public RemoteStoreService(Supplier<RepositoriesService> repositoriesService, Thr
* repository mentioned. This verification will happen on a local node to validate if the node is able to connect
* to the repository.
*/
public void verifyRepository(List<Repository> repositories, DiscoveryNode localNode) {
/*
public void verifyRepositoriesLocally(List<Repository> repositories, DiscoveryNode localNode) {
for (Repository repository : repositories) {
String verificationToken = repository.startVerification();
String repositoryName = repository.getMetadata().name();
try {
repository.verify(verificationToken, localNode);
logger.info(() -> new ParameterizedMessage("successfully verified [{}] repository", repositoryName));
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to verify repository", repository), e);
throw new RepositoryVerificationException(repositoryName, e.getMessage());
}
}
Replace the below code with this once #9088 is merged.
*/

for (Repository repository : repositories) {
String verificationToken = repository.startVerification();
String repositoryName = repository.getMetadata().name();
CountDownLatch repositoryVerificationLatch = new CountDownLatch(1);
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
repository.verify(verificationToken, localNode);
logger.info(() -> new ParameterizedMessage("successfully verified [{}] repository", repositoryName));
repositoryVerificationLatch.countDown();
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to verify repository", repository), e);
throw new RepositoryVerificationException(repositoryName, e.getMessage());
}
});

// TODO: See if using listener here which is async makes sense, made this sync as
// we need the repository registration for remote store backed node to be completed before the
// bootstrap completes.
try {
if (repositoryVerificationLatch.await(1000, TimeUnit.MILLISECONDS) == false) {
throw new RepositoryVerificationException(
repository.getMetadata().name(),
"could not complete " + "repository verification within timeout."
);
}
} catch (InterruptedException e) {
throw new RepositoryVerificationException(repository.getMetadata().name(), e.getMessage());
}
repository.verifyLocally(localNode);
logger.info(() -> new ParameterizedMessage("successfully verified [{}] repository", repositoryName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public static DiscoveryNode createLocal(
);
RemoteStoreNode remoteStoreNode = new RemoteStoreNode(discoveryNode);
List<Repository> repositories = remoteStoreService.createRepositories(remoteStoreNode);
remoteStoreService.verifyRepository(repositories, discoveryNode);
remoteStoreService.verifyRepositoriesLocally(repositories, discoveryNode);
return discoveryNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
in.verify(verificationToken, localNode);
}

@Override
public void verifyLocally(DiscoveryNode localNode) {
in.verifyLocally(localNode);
}

@Override
public boolean isReadOnly() {
return in.isReadOnly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ default RepositoryStats stats() {
*/
void verify(String verificationToken, DiscoveryNode localNode);

/**
* Verifies repository settings on local node by reading and writing files onto blobstore without the
* cluster-manager.
* @param localNode the local node information
*/
void verifyLocally(DiscoveryNode localNode);

/**
* Returns true if the repository supports only read operations
* @return true if the repository is read/only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3123,12 +3123,6 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In

@Override
public void verify(String seed, DiscoveryNode localNode) {
/*
if(!isSystemRepository) {
assertSnapshotOrGenericThread();
}
Update the assertion method invocation with this once #9088 is merged.
*/
assertSnapshotOrGenericThread();
if (isReadOnly()) {
try {
Expand Down Expand Up @@ -3182,6 +3176,75 @@ public void verify(String seed, DiscoveryNode localNode) {
}
}

@Override
public void verifyLocally(DiscoveryNode localNode) {
String seed = UUIDs.randomBase64UUID();
if (isReadOnly()) {
try {
latestIndexBlobId();
} catch (Exception e) {
throw new RepositoryVerificationException(
metadata.name(),
"path " + basePath() + " is not accessible on node " + localNode,
e
);
}
} else {
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "data-" + localNode.getId() + ".dat";

// Writing test data to the repository
try {
BytesArray bytes = new BytesArray(seed);
try (InputStream stream = bytes.streamInput()) {
testBlobContainer.writeBlob(blobName, stream, bytes.length(), true);
}
} catch (Exception exp) {
throw new RepositoryVerificationException(
metadata.name(),
"store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]",
exp
);
}

// Reading test data from the repository
try (InputStream localNodeDat = testBlobContainer.readBlob(blobName)) {
final String seedRead = Streams.readFully(localNodeDat).utf8ToString();
if (seedRead.equals(seed) == false) {
throw new RepositoryVerificationException(
metadata.name(),
"Seed read was [" + seedRead + "] but expected seed [" + seed + "]"
);
}
} catch (NoSuchFileException e) {
throw new RepositoryVerificationException(
metadata.name(),
"a file written to the store ["
+ blobStore()
+ "] cannot be accessed on the node ["
+ localNode
+ "]. "
+ "This might indicate that the store ["
+ blobStore()
+ "] permissions don't allow reading files",
e
);
} catch (Exception e) {
throw new RepositoryVerificationException(metadata.name(), "Failed to verify repository", e);
}

// Trying to delete the repository once the write and read verification completes. We wont fail the
// verification if the detete fails.
// TODO: See if there is a better way to handle this deletion failure.
try {
final String testPrefix = testBlobPrefix(seed);
blobStore().blobContainer(basePath().add(testPrefix)).delete();
} catch (Exception exp) {
logger.warn(() -> new ParameterizedMessage("[{}] cannot delete test data at {} {}", metadata.name(), basePath(), exp));
}
}
}

@Override
public String toString() {
return "BlobStoreRepository[" + "[" + metadata.name() + "], [" + blobStore.get() + ']' + ']';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {}

@Override
public void verifyLocally(DiscoveryNode localNode) {}

@Override
public void updateState(final ClusterState state) {}

Expand Down

0 comments on commit bc0e562

Please sign in to comment.