Skip to content

Commit

Permalink
Implementation of RemoteWritableEntity for objects to be uploaded to rem…
Browse files Browse the repository at this point in the history
…ote store (#13834)

* Implementation of RemoteWritableEntity for objects to be uploaded to remote store

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored Jun 8, 2024
1 parent fbe048f commit 49282c1
Show file tree
Hide file tree
Showing 17 changed files with 2,408 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
Expand All @@ -18,6 +19,7 @@
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
*/
@ExperimentalApi
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {

public void writeAsync(U entity, ActionListener<Void> listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.common.remote;

import org.opensearch.common.annotation.ExperimentalApi;

import java.io.IOException;
import java.io.InputStream;

Expand All @@ -17,6 +19,7 @@
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
*/
@ExperimentalApi
public interface RemoteWriteableEntity<T> {
/**
* @return An InputStream created by serializing the entity T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,30 @@

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.xcontent.ToXContent;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/**
* Utility class for Remote Cluster State
*/
public class RemoteClusterStateUtils {

/** Separator placed between the components of a remote blob file name (used with {@code String.join}). */
public static final String DELIMITER = "__";
/** Path separator string. NOTE(review): presumably joins segments of a remote blob path - usage is not visible here, confirm against callers. */
public static final String PATH_DELIMITER = "/";
/** Path token under which global metadata blobs are stored. */
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
/** Blob name format that appends a {@code .dat} suffix. NOTE(review): which entities use it is not visible here - confirm. */
public static final String METADATA_NAME_FORMAT = "%s.dat";
/** Blob name format that uses the given name unchanged; handed to {@code ChecksumBlobStoreFormat} as its name format. */
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
/** Codec version written as the last component of global metadata blob file names. */
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;

// ToXContent Params with gateway mode.
// We are using gateway context mode to persist all custom metadata.
// These params are the ones passed to ChecksumBlobStoreFormat#serialize by the remote entity wrappers.
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
);

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;

/**
* Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store
*/
public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEntity<ClusterMetadataManifest> {

public static final String MANIFEST = "manifest";
public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6;

public static final String METADATA_MANIFEST_NAME_FORMAT = "%s";
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final String COMMITTED = "C";
public static final String PUBLISHED = "P";

/**
* Manifest format compatible with older codec v0, where codec version was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V0 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);
/**
* Manifest format compatible with older codec v1, where global metadata was missing.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Manifest format compatible with codec v2, where we introduced codec versions/global metadata.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
METADATA_MANIFEST_NAME_FORMAT,
ClusterMetadataManifest::fromXContent
);

private ClusterMetadataManifest clusterMetadataManifest;

public RemoteClusterMetadataManifest(
final ClusterMetadataManifest clusterMetadataManifest,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.clusterMetadataManifest = clusterMetadataManifest;
}

public RemoteClusterMetadataManifest(
final String blobName,
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
super(clusterUUID, compressor, namedXContentRegistry);
this.blobName = blobName;
}

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(MANIFEST), MANIFEST);
}

@Override
public String generateBlobFileName() {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__
// <codec_version>
String blobFileName = String.join(
DELIMITER,
MANIFEST,
RemoteStoreUtils.invertLong(clusterMetadataManifest.getClusterTerm()),
RemoteStoreUtils.invertLong(clusterMetadataManifest.getStateVersion()),
(clusterMetadataManifest.isCommitted() ? COMMITTED : PUBLISHED),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf(clusterMetadataManifest.getCodecVersion())
// Keep the codec version at last place only, during we read last place to determine codec version.
);
this.blobFileName = blobFileName;
return blobFileName;
}

@Override
public UploadedMetadata getUploadedMetadata() {
assert blobName != null;
return new UploadedMetadataAttribute(MANIFEST, blobName);
}

@Override
public InputStream serialize() throws IOException {
return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
generateBlobFileName(),
getCompressor(),
RemoteClusterStateUtils.FORMAT_PARAMS
).streamInput();
}

@Override
public ClusterMetadataManifest deserialize(final InputStream inputStream) throws IOException {
ChecksumBlobStoreFormat<ClusterMetadataManifest> blobStoreFormat = getClusterMetadataManifestBlobStoreFormat();
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}

private int getManifestCodecVersion() {
assert blobName != null;
String[] splitName = blobName.split(DELIMITER);
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
// is used.
return ClusterMetadataManifest.CODEC_V0;
} else {
throw new IllegalArgumentException("Manifest file name is corrupted");
}
}

private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManifestBlobStoreFormat() {
long codecVersion = getManifestCodecVersion();
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
}
throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT;

/**
 * Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store. It builds
 * the blob file name, serializes the metadata for upload and deserializes a downloaded blob.
 */
public class RemoteCoordinationMetadata extends AbstractRemoteWritableBlobEntity<CoordinationMetadata> {

    /** File prefix of coordination metadata blobs; also the component name reported in uploaded metadata. */
    public static final String COORDINATION_METADATA = "coordination";
    /** Checksummed blob format used to both write and read coordination metadata. */
    public static final ChecksumBlobStoreFormat<CoordinationMetadata> COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
        "coordination",
        METADATA_NAME_PLAIN_FORMAT,
        CoordinationMetadata::fromXContent
    );

    // Both fields are only assigned by the metadata-based constructor; when the entity is created from a
    // blob name they keep their defaults (null / 0).
    private CoordinationMetadata coordinationMetadata;
    private long metadataVersion;

    /**
     * Creates a wrapper around in-memory coordination metadata, the form required by {@link #serialize()}
     * and {@link #generateBlobFileName()}.
     *
     * @param coordinationMetadata  the metadata to wrap
     * @param metadataVersion       version encoded (inverted) into the blob file name
     * @param clusterUUID           forwarded to the parent entity
     * @param compressor            forwarded to the parent entity; later used when serializing
     * @param namedXContentRegistry forwarded to the parent entity; later used when deserializing
     */
    public RemoteCoordinationMetadata(
        final CoordinationMetadata coordinationMetadata,
        final long metadataVersion,
        final String clusterUUID,
        final Compressor compressor,
        final NamedXContentRegistry namedXContentRegistry
    ) {
        super(clusterUUID, compressor, namedXContentRegistry);
        this.coordinationMetadata = coordinationMetadata;
        this.metadataVersion = metadataVersion;
    }

    /**
     * Creates a wrapper identified only by its blob name, the form required by
     * {@link #deserialize(InputStream)}.
     *
     * @param blobName              name of the coordination metadata blob
     * @param clusterUUID           forwarded to the parent entity
     * @param compressor            forwarded to the parent entity
     * @param namedXContentRegistry forwarded to the parent entity; later used when deserializing
     */
    public RemoteCoordinationMetadata(
        final String blobName,
        final String clusterUUID,
        final Compressor compressor,
        final NamedXContentRegistry namedXContentRegistry
    ) {
        super(clusterUUID, compressor, namedXContentRegistry);
        this.blobName = blobName;
    }

    /**
     * @return blob path parameters with the shared global-metadata path token and
     *         {@link #COORDINATION_METADATA} as the file prefix
     */
    @Override
    public BlobPathParameters getBlobPathParameters() {
        // Use the shared constant rather than repeating the "global-metadata" literal, so the path token
        // cannot drift from the one defined in RemoteClusterStateUtils.
        return new BlobPathParameters(List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN), COORDINATION_METADATA);
    }

    /**
     * Builds a new blob file name from the file prefix, the metadata version, the current wall-clock time
     * and the global metadata codec version, stores it in {@code blobFileName} and returns it. Every call
     * reads the clock again, so successive calls may return different names.
     *
     * @return the generated file name
     */
    @Override
    public String generateBlobFileName() {
        // Resulting location looks like:
        // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/
        // <componentPrefix>__<inverted_metadata_version>__<inverted_timestamp>__<codec_version>
        String blobFileName = String.join(
            DELIMITER,
            getBlobPathParameters().getFilePrefix(),
            RemoteStoreUtils.invertLong(metadataVersion),
            RemoteStoreUtils.invertLong(System.currentTimeMillis()),
            String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)
        );
        this.blobFileName = blobFileName;
        return blobFileName;
    }

    /**
     * Serializes the wrapped coordination metadata. A fresh blob file name is generated as part of the call.
     *
     * @return stream over the serialized metadata
     * @throws IOException if serialization fails
     */
    @Override
    public InputStream serialize() throws IOException {
        return COORDINATION_METADATA_FORMAT.serialize(
            coordinationMetadata,
            generateBlobFileName(),
            getCompressor(),
            RemoteClusterStateUtils.FORMAT_PARAMS
        ).streamInput();
    }

    /**
     * Reads the whole stream and parses it as coordination metadata.
     *
     * @param inputStream stream holding the stored metadata
     * @return the parsed coordination metadata
     * @throws IOException if reading or parsing fails
     */
    @Override
    public CoordinationMetadata deserialize(final InputStream inputStream) throws IOException {
        return COORDINATION_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
    }

    /**
     * @return an uploaded-metadata attribute named {@link #COORDINATION_METADATA} that points at {@code blobName}
     */
    @Override
    public UploadedMetadata getUploadedMetadata() {
        // NOTE(review): within this class blobName is only set by the blob-name constructor; presumably the
        // upload path sets it elsewhere before this is called - confirm.
        assert blobName != null;
        return new UploadedMetadataAttribute(COORDINATION_METADATA, blobName);
    }
}
Loading

0 comments on commit 49282c1

Please sign in to comment.