Skip to content

Commit

Permalink
Fix snapshot de/ser
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jun 4, 2024
1 parent 65a04a2 commit 2afe186
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("repository_state_id", entry.repositoryStateId);
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
builder.field("state", entry.state().value);
builder.field("uuid", entry.uuid());
} // else we don't serialize it
}
builder.endObject();
Expand All @@ -218,6 +219,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parser.nextToken();
}
ensureFieldName(parser, parser.currentToken(), TYPE);
parser.nextToken();
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
Expand All @@ -229,6 +236,7 @@ public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) th
byte stateValue = -1;
List<SnapshotId> snapshotIds = new ArrayList<>();
TimeValue startTime = null;
String entryUUID = null;
while ((parser.nextToken()) != XContentParser.Token.END_OBJECT) {
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
final String fieldName = parser.currentName();
Expand Down Expand Up @@ -270,12 +278,15 @@ public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) th
case "state":
stateValue = (byte) parser.intValue();
break;
case "uuid":
entryUUID = parser.text();
break;
default:
throw new IllegalArgumentException("unknown field [" + fieldName + "]");
}
}
assert startTime != null;
entries.add(new Entry(snapshotIds, repository, startTime.millis(), repositoryStateId, State.fromValue(stateValue)));
entries.add(new Entry(snapshotIds, repository, startTime.millis(), repositoryStateId, State.fromValue(stateValue), entryUUID));
}
return SnapshotDeletionsInProgress.of(entries);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -66,6 +67,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.XContentUtils.readValue;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureFieldName;
import static org.opensearch.core.xcontent.XContentParserUtils.parseStringList;
Expand Down Expand Up @@ -740,7 +742,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATE, status.state());
builder.field(NODE, status.nodeId());
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
builder.field(INDEX_UUID, shardId.getIndex().getUUID());
if (status.generation() != null) builder.field(GENERATION, status.generation());
if (status.reason() != null) builder.field(REASON, status.reason());
}
Expand Down Expand Up @@ -770,6 +771,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (status.reason() != null) builder.field(REASON, status.reason());
builder.endObject();
}
builder.endArray();
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
}
builder.endObject();
Expand All @@ -786,19 +788,18 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
Version version = null;
SnapshotId source = null;
Map<String, Object> metadata = null;
byte state = -1;
State state = null;
List<IndexId> indices = new ArrayList<>();
long startTime = 0;
long repositoryStateId = -1L;
Map<ShardId, ShardSnapshotStatus> shards = new HashMap<>();
List<String> dataStreams = new ArrayList<>();
Map<RepositoryShardId, ShardSnapshotStatus> clones = new HashMap<>();
boolean remoteStoreIndexShallowCopy = false;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();

switch (currentFieldName) {
case REPOSITORY:
repository = parser.text();
Expand All @@ -816,7 +817,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
partial = parser.booleanValue();
break;
case STATE:
state = (byte) parser.intValue();
state = State.fromString(parser.text());
break;
case INDICES:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
Expand All @@ -836,32 +837,28 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
case SHARDS:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
String index = null;
String indexUUID = null;
Index index = null;
int shardId = -1;
String nodeId = null;
ShardState shardState = null;
String reason = null;
String generation = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String currentShardField = parser.currentName();
parser.nextToken();
switch (currentShardField) {
case INDEX:
index = parser.text();
index = Index.fromXContent(parser);
break;
case SHARD:
shardId = parser.intValue();
break;
case INDEX_UUID:
indexUUID = parser.text();
break;
case NODE:
nodeId = parser.text();
nodeId = (String) readValue(parser, parser.currentToken());
break;
case STATE:
shardState = ShardState.fromValue((byte) parser.intValue());
shardState = ShardState.fromString(parser.text());
break;
case REASON:
reason = parser.text();
Expand All @@ -873,7 +870,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
throw new IllegalArgumentException("unknown field [" + currentShardField + "]");
}
}
shards.put(new ShardId(index, indexUUID, shardId),
shards.put(new ShardId(index, shardId),
reason != null ? new ShardSnapshotStatus(nodeId, shardState, reason, generation) :
new ShardSnapshotStatus(nodeId, shardState, generation));
}
Expand Down Expand Up @@ -950,7 +947,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
snapshot,
includeGlobalState,
partial,
State.fromValue(state),
state,
indices,
dataStreams,
startTime,
Expand Down Expand Up @@ -1203,6 +1200,25 @@ public static State fromValue(byte value) {
throw new IllegalArgumentException("No snapshot state for value [" + value + "]");
}
}

public static State fromString(String value) {
switch(value) {
case "INIT":
return INIT;
case "STARTED":
return STARTED;
case "SUCCESS":
return SUCCESS;
case "FAILED":
return FAILED;
case "ABORTED":
return ABORTED;
case "PARTIAL":
return PARTIAL;
default:
throw new IllegalArgumentException("No snapshot state for value [" + value + "]");
}
}
}

private final List<Entry> entries;
Expand Down Expand Up @@ -1311,6 +1327,12 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
}

public static SnapshotsInProgress fromXContent(XContentParser parser) throws IOException {
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
parser.nextToken();
}
ensureFieldName(parser, parser.currentToken(), SNAPSHOTS);
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
List<Entry> entries = new ArrayList<>();
Expand Down Expand Up @@ -1380,5 +1402,26 @@ public static ShardState fromValue(byte value) {
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
}

public static ShardState fromString(String state) {
switch (state) {
case "INIT":
return INIT;
case "SUCCESS":
return SUCCESS;
case "FAILED":
return FAILED;
case "ABORTED":
return ABORTED;
case "MISSING":
return MISSING;
case "WAITING":
return WAITING;
case "QUEUED":
return QUEUED;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + state + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
}

public static IndexId fromXContent(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
String name = null;
String id = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,27 @@
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.SnapshotDeletionsInProgress;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.cluster.SnapshotsInProgress.Entry;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.repositories.IndexId;
import org.opensearch.test.AbstractDiffableWireSerializationTestCase;
import org.opensearch.test.VersionUtils;
Expand All @@ -60,6 +70,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.lang.Math.abs;
import static java.util.Collections.singletonMap;
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY;
import static org.opensearch.test.VersionUtils.randomVersion;

public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireSerializationTestCase<Custom> {
Expand All @@ -84,7 +97,7 @@ private Entry randomSnapshot() {
for (int i = 0; i < numberOfIndices; i++) {
indices.add(new IndexId(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
long startTime = randomLong();
long startTime = abs(randomLong());
long repositoryStateId = randomLong();
Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = new HashMap<>();
final List<Index> esIndices = indices.stream()
Expand Down Expand Up @@ -183,6 +196,41 @@ protected Custom mutateInstance(Custom instance) {
return SnapshotsInProgress.of(entries);
}

public void testToXContent() throws IOException {
SnapshotsInProgress sip = SnapshotsInProgress.of(List.of(randomSnapshot(), randomSnapshot()));
boolean humanReadable = false;
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
final MediaType mediaType = MediaTypeRegistry.JSON;
BytesReference originalBytes = toShuffledXContent(
sip,
mediaType,
new ToXContent.MapParams(singletonMap(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_GATEWAY)),
humanReadable
);
try (XContentParser parser = createParser(mediaType.xContent(), originalBytes)) {
SnapshotsInProgress parsed = SnapshotsInProgress.fromXContent(parser);
assertEquals(sip, parsed);
}
}

public void testToXContent_deletion() throws IOException {
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(List.of(new SnapshotId("name1", "uuid1")), "repo", 10000000L, 10000L, SnapshotDeletionsInProgress.State.WAITING);
SnapshotDeletionsInProgress sdip = SnapshotDeletionsInProgress.of(List.of(entry));
boolean humanReadable = false;
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
final MediaType mediaType = MediaTypeRegistry.JSON;
BytesReference originalBytes = toShuffledXContent(
sdip,
mediaType,
new ToXContent.MapParams(singletonMap(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_GATEWAY)),
humanReadable
);
try (XContentParser parser = createParser(mediaType.xContent(), originalBytes)) {
SnapshotDeletionsInProgress parsed = SnapshotDeletionsInProgress.fromXContent(parser);
assertEquals(sdip, parsed);
}
}

public void testSerDeRemoteStoreIndexShallowCopy() throws IOException {
SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry(
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
Expand All @@ -191,7 +239,7 @@ public void testSerDeRemoteStoreIndexShallowCopy() throws IOException {
SnapshotsInProgressSerializationTests.randomState(Map.of()),
Collections.emptyList(),
Collections.emptyList(),
Math.abs(randomLong()),
abs(randomLong()),
randomIntBetween(0, 1000),
Map.of(),
null,
Expand Down

0 comments on commit 2afe186

Please sign in to comment.