Skip to content

Commit

Permalink
Add nodeId field in marker
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Aug 28, 2023
1 parent f7458e9 commit 9050ef8
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ public void markLastAcceptedStateAsCommitted() {

@Override
public void close() throws IOException {
PersistedState.super.close();
remoteClusterStateService.close();
}

private void handleExceptionOnWrite(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ConstructingObjectParser;
Expand All @@ -37,6 +38,7 @@ public class ClusterMetadataMarker implements Writeable, ToXContentFragment {
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");
private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version");
private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
private static final ParseField COMMITTED_FIELD = new ParseField("committed");
private static final ParseField INDICES_FIELD = new ParseField("indices");

Expand All @@ -60,12 +62,16 @@ private static Version opensearchVersion(Object[] fields) {
return Version.fromId((int) fields[4]);
}

private static String nodeId(Object[] fields) {
return (String) fields[5];
}

private static boolean committed(Object[] fields) {
return (boolean) fields[5];
return (boolean) fields[6];
}

private static List<UploadedIndexMetadata> indices(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[6];
return (List<UploadedIndexMetadata>) fields[7];
}

private static final ConstructingObjectParser<ClusterMetadataMarker, Void> PARSER = new ConstructingObjectParser<>(
Expand All @@ -76,6 +82,7 @@ private static List<UploadedIndexMetadata> indices(Object[] fields) {
clusterUUID(fields),
stateUUID(fields),
opensearchVersion(fields),
nodeId(fields),
committed(fields),
indices(fields)
)
Expand All @@ -87,6 +94,7 @@ private static List<UploadedIndexMetadata> indices(Object[] fields) {
PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
PARSER.declareObjectArray(
ConstructingObjectParser.constructorArg(),
Expand All @@ -101,6 +109,7 @@ private static List<UploadedIndexMetadata> indices(Object[] fields) {
private final String clusterUUID;
private final String stateUUID;
private final Version opensearchVersion;
private final String nodeId;
private final boolean committed;

public List<UploadedIndexMetadata> getIndices() {
Expand All @@ -127,6 +136,10 @@ public Version getOpensearchVersion() {
return opensearchVersion;
}

public String getNodeId() {
return nodeId;
}

public boolean isCommitted() {
return committed;
}
Expand All @@ -137,6 +150,7 @@ public ClusterMetadataMarker(
String clusterUUID,
String stateUUID,
Version opensearchVersion,
String nodeId,
boolean committed,
List<UploadedIndexMetadata> indices
) {
Expand All @@ -145,21 +159,38 @@ public ClusterMetadataMarker(
this.clusterUUID = clusterUUID;
this.stateUUID = stateUUID;
this.opensearchVersion = opensearchVersion;
this.nodeId = nodeId;
this.committed = committed;
this.indices = Collections.unmodifiableList(indices);
}

public ClusterMetadataMarker(StreamInput in) throws IOException {
this.clusterTerm = in.readVLong();
this.stateVersion = in.readVLong();
this.clusterUUID = in.readString();
this.stateUUID = in.readString();
this.opensearchVersion = Version.fromId(in.readInt());
this.nodeId = in.readString();
this.committed = in.readBoolean();
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
}

public static Builder builder() {
return new Builder();
}

public static Builder builder(ClusterMetadataMarker marker) {
return new Builder(marker);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm())
.field(STATE_VERSION_FIELD.getPreferredName(), getStateVersion())
.field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
.field(STATE_UUID_FIELD.getPreferredName(), getStateUUID())
.field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id)
.field(NODE_ID_FIELD.getPreferredName(), getNodeId())
.field(COMMITTED_FIELD.getPreferredName(), isCommitted());
builder.startArray(INDICES_FIELD.getPreferredName());
{
Expand All @@ -178,6 +209,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterUUID);
out.writeString(stateUUID);
out.writeInt(opensearchVersion.id);
out.writeString(nodeId);
out.writeBoolean(committed);
out.writeCollection(indices);
}
Expand All @@ -197,12 +229,13 @@ public boolean equals(Object o) {
&& Objects.equals(clusterUUID, that.clusterUUID)
&& Objects.equals(stateUUID, that.stateUUID)
&& Objects.equals(opensearchVersion, that.opensearchVersion)
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(committed, that.committed);
}

@Override
public int hashCode() {
return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed);
return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, nodeId, committed);
}

@Override
Expand All @@ -227,6 +260,7 @@ public static class Builder {
private String clusterUUID;
private String stateUUID;
private Version opensearchVersion;
private String nodeId;
private boolean committed;

public Builder indices(List<UploadedIndexMetadata> indices) {
Expand Down Expand Up @@ -259,6 +293,11 @@ public Builder opensearchVersion(Version opensearchVersion) {
return this;
}

public Builder nodeId(String nodeId) {
this.nodeId = nodeId;
return this;
}

public Builder committed(boolean committed) {
this.committed = committed;
return this;
Expand All @@ -272,8 +311,28 @@ public Builder() {
indices = new ArrayList<>();
}

public Builder(ClusterMetadataMarker marker) {
this.clusterTerm = marker.clusterTerm;
this.stateVersion = marker.stateVersion;
this.clusterUUID = marker.clusterUUID;
this.stateUUID = marker.stateUUID;
this.opensearchVersion = marker.opensearchVersion;
this.nodeId = marker.nodeId;
this.committed = marker.committed;
this.indices = new ArrayList<>(marker.indices);
}

public ClusterMetadataMarker build() {
return new ClusterMetadataMarker(clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed, indices);
return new ClusterMetadataMarker(
clusterTerm,
stateVersion,
clusterUUID,
stateUUID,
opensearchVersion,
nodeId,
committed,
indices
);
}

}
Expand Down Expand Up @@ -322,6 +381,12 @@ public UploadedIndexMetadata(String indexName, String indexUUID, String uploaded
this.uploadedFilename = uploadedFileName;
}

public UploadedIndexMetadata(StreamInput in) throws IOException {
this.indexName = in.readString();
this.indexUUID = in.readString();
this.uploadedFilename = in.readString();
}

public String getUploadedFilename() {
return uploadedFilename;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -46,7 +48,7 @@
*
* @opensearch.internal
*/
public class RemoteClusterStateService {
public class RemoteClusterStateService implements Closeable {

public static final String METADATA_NAME_FORMAT = "%s.dat";

Expand Down Expand Up @@ -85,18 +87,21 @@ public class RemoteClusterStateService {

private static final String DELIMITER = "__";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
private final Settings settings;
private final LongSupplier relativeTimeMillisSupplier;
private BlobStoreRepository blobStoreRepository;
private volatile TimeValue slowWriteLoggingThreshold;

public RemoteClusterStateService(
String nodeId,
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
LongSupplier relativeTimeMillisSupplier
) {
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
Expand Down Expand Up @@ -264,6 +269,13 @@ public ClusterState getLatestClusterState(String clusterUUID) {
return null;
}

@Override
public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

// Visible for testing
void ensureRepositorySet() {
if (blobStoreRepository != null) {
Expand All @@ -289,6 +301,7 @@ private ClusterMetadataMarker uploadMarker(
clusterState.metadata().clusterUUID(),
clusterState.stateUUID(),
Version.CURRENT,
nodeId,
committed,
uploadedIndexMetadata
);
Expand Down
Loading

0 comments on commit 9050ef8

Please sign in to comment.