Skip to content

Commit

Permalink
Removing verifyLocally and move to existing verify methods. If the re…
Browse files Browse the repository at this point in the history
…pository is a system repository only local verification will happen

Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent c94713e commit d0c2b5b
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -82,6 +83,9 @@ private RepositoryMetadata buildRepositoryMetadata(String name) {
Settings.Builder settings = Settings.builder();
settingsMap.forEach(settings::put);

// Repository metadata built here will always be for a system repository.
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

return new RepositoryMetadata(name, type, settings.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public RemoteStoreService(Supplier<RepositoriesService> repositoriesService, Thr
public void verifyRepositoriesLocally(List<Repository> repositories, DiscoveryNode localNode) {
for (Repository repository : repositories) {
String repositoryName = repository.getMetadata().name();
repository.verifyLocally(localNode);
String verificationToken = repository.startVerification();
repository.verify(verificationToken, localNode);
repository.endVerification(verificationToken);
logger.info(() -> new ParameterizedMessage("successfully verified [{}] repository", repositoryName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
}

@Override
public void verifyLocally(DiscoveryNode localNode) {
in.verifyLocally(localNode);
public boolean isReadOnly() {
return in.isReadOnly();
}

@Override
public boolean isReadOnly() {
return in.isReadOnly();
public boolean isSystemRepository() {
return in.isSystemRepository();
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions server/src/main/java/org/opensearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,19 @@ default RepositoryStats stats() {
*/
void verify(String verificationToken, DiscoveryNode localNode);

/**
* Verifies repository settings on local node by reading and writing files onto blobstore without the
* cluster-manager.
* @param localNode the local node information
*/
void verifyLocally(DiscoveryNode localNode);

/**
* Returns true if the repository supports only read operations
* @return true if the repository is read/only
*/
boolean isReadOnly();

/**
* Returns true if the repository is managed by the system directly and doesn't allow managing the lifetime of the
* repository through external APIs
* @return true if the repository is system managed
*/
boolean isSystemRepository();

/**
* Creates a snapshot of the shard based on the index commit point.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Setting.Property.NodeScope);

/***
* Setting to set repository as system repository
*/
public static final Setting<Boolean> SYSTEM_REPOSITORY_SETTING = Setting.boolSetting(
"system_repository",
false,
Setting.Property.NodeScope
);

protected final boolean supportURLRepo;

private final int maxShardBlobDeleteBatch;
Expand Down Expand Up @@ -346,6 +355,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final boolean readOnly;

private final boolean isSystemRepository;

private final Object lock = new Object();

private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();
Expand Down Expand Up @@ -411,6 +422,7 @@ protected BlobStoreRepository(
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
readOnly = READONLY_SETTING.get(metadata.settings());
isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings());
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
Expand Down Expand Up @@ -1823,8 +1835,10 @@ public String startVerification() {
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
if (isSystemRepository == false) {
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
}
}
return seed;
}
Expand Down Expand Up @@ -2133,6 +2147,11 @@ public boolean isReadOnly() {
return readOnly;
}

@Override
public boolean isSystemRepository() {
return isSystemRepository;
}

/**
* Writing a new index generation is a three step process.
* First, the {@link RepositoryMetadata} entry for this repository is set into a pending state by incrementing its
Expand Down Expand Up @@ -3148,99 +3167,33 @@ public void verify(String seed, DiscoveryNode localNode) {
exp
);
}
try (InputStream masterDat = testBlobContainer.readBlob("master.dat")) {
final String seedRead = Streams.readFully(masterDat).utf8ToString();
if (seedRead.equals(seed) == false) {
throw new RepositoryVerificationException(
metadata.name(),
"Seed read from master.dat was [" + seedRead + "] but expected seed [" + seed + "]"
);
}
} catch (NoSuchFileException e) {
throw new RepositoryVerificationException(
metadata.name(),
"a file written by cluster-manager to the store ["
+ blobStore()
+ "] cannot be accessed on the node ["
+ localNode
+ "]. "
+ "This might indicate that the store ["
+ blobStore()
+ "] is not shared between this node and the cluster-manager node or "
+ "that permissions on the store don't allow reading files written by the cluster-manager node",
e
);
} catch (Exception e) {
throw new RepositoryVerificationException(metadata.name(), "Failed to verify repository", e);
}
}
}

@Override
public void verifyLocally(DiscoveryNode localNode) {
String seed = UUIDs.randomBase64UUID();
if (isReadOnly()) {
try {
latestIndexBlobId();
} catch (Exception e) {
throw new RepositoryVerificationException(
metadata.name(),
"path " + basePath() + " is not accessible on node " + localNode,
e
);
}
} else {
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "data-" + localNode.getId() + ".dat";

// Writing test data to the repository
try {
BytesArray bytes = new BytesArray(seed);
try (InputStream stream = bytes.streamInput()) {
testBlobContainer.writeBlob(blobName, stream, bytes.length(), true);
}
} catch (Exception exp) {
throw new RepositoryVerificationException(
metadata.name(),
"store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]",
exp
);
}

// Reading test data from the repository
try (InputStream localNodeDat = testBlobContainer.readBlob(blobName)) {
final String seedRead = Streams.readFully(localNodeDat).utf8ToString();
if (seedRead.equals(seed) == false) {
if (isSystemRepository == false) {
try (InputStream masterDat = testBlobContainer.readBlob("master.dat")) {
final String seedRead = Streams.readFully(masterDat).utf8ToString();
if (seedRead.equals(seed) == false) {
throw new RepositoryVerificationException(
metadata.name(),
"Seed read from master.dat was [" + seedRead + "] but expected seed [" + seed + "]"
);
}
} catch (NoSuchFileException e) {
throw new RepositoryVerificationException(
metadata.name(),
"Seed read was [" + seedRead + "] but expected seed [" + seed + "]"
"a file written by cluster-manager to the store ["
+ blobStore()
+ "] cannot be accessed on the node ["
+ localNode
+ "]. "
+ "This might indicate that the store ["
+ blobStore()
+ "] is not shared between this node and the cluster-manager node or "
+ "that permissions on the store don't allow reading files written by the cluster-manager node",
e
);
} catch (Exception e) {
throw new RepositoryVerificationException(metadata.name(), "Failed to verify repository", e);
}
} catch (NoSuchFileException e) {
throw new RepositoryVerificationException(
metadata.name(),
"a file written to the store ["
+ blobStore()
+ "] cannot be accessed on the node ["
+ localNode
+ "]. "
+ "This might indicate that the store ["
+ blobStore()
+ "] permissions don't allow reading files",
e
);
} catch (Exception e) {
throw new RepositoryVerificationException(metadata.name(), "Failed to verify repository", e);
}

// Trying to delete the repository once the write and read verification completes. We wont fail the
// verification if the detete fails.
// TODO: See if there is a better way to handle this deletion failure.
try {
final String testPrefix = testBlobPrefix(seed);
blobStore().blobContainer(basePath().add(testPrefix)).delete();
} catch (Exception exp) {
logger.warn(() -> new ParameterizedMessage("[{}] cannot delete test data at {} {}", metadata.name(), basePath(), exp));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,12 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
}

@Override
public void verifyLocally(DiscoveryNode localNode) {}
public boolean isReadOnly() {
return false;
}

@Override
public boolean isReadOnly() {
public boolean isSystemRepository() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ public boolean isReadOnly() {
return false;
}

@Override
public boolean isSystemRepository() {
return false;
}

@Override
public void snapshotShard(
Store store,
Expand All @@ -195,9 +200,6 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {}

@Override
public void verifyLocally(DiscoveryNode localNode) {}

@Override
public void updateState(final ClusterState state) {}

Expand Down

0 comments on commit d0c2b5b

Please sign in to comment.