Skip to content

Commit 46357ed

Browse files
update ingestion status in index metadata xcontent and avoid retry for parsing error
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent b856451 commit 46357ed

File tree

4 files changed

+56
-2
lines changed

4 files changed

+56
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5151
- Fix QueryPhaseResultConsumer incomplete callback loops ([#19231](https://github.com/opensearch-project/OpenSearch/pull/19231))
5252
- Fix the `scaled_float` precision issue ([#19188](https://github.com/opensearch-project/OpenSearch/pull/19188))
5353
- Fix Using an excessively large reindex slice can lead to a JVM OutOfMemoryError on coordinator.([#18964](https://github.com/opensearch-project/OpenSearch/pull/18964))
54+
- Fix ingestion state xcontent serialization in IndexMetadata ([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
5455

5556
### Dependencies
5657
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,6 +943,7 @@ public Iterator<Setting<?>> settings() {
943943
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
944944
public static final String CONTEXT_KEY = "context";
945945
public static final String INGESTION_SOURCE_KEY = "ingestion_source";
946+
public static final String INGESTION_STATUS_KEY = "ingestion_status";
946947

947948
public static final String INDEX_STATE_FILE_PREFIX = "state-";
948949

@@ -2264,6 +2265,11 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
22642265
indexMetadata.context.toXContent(builder, params);
22652266
}
22662267

2268+
if (indexMetadata.ingestionStatus != null) {
2269+
builder.field(INGESTION_STATUS_KEY);
2270+
indexMetadata.ingestionStatus.toXContent(builder, params);
2271+
}
2272+
22672273
builder.endObject();
22682274
}
22692275

@@ -2347,6 +2353,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
23472353
parser.skipChildren();
23482354
} else if (CONTEXT_KEY.equals(currentFieldName)) {
23492355
builder.context(Context.fromXContent(parser));
2356+
} else if (INGESTION_STATUS_KEY.equals(currentFieldName)) {
2357+
builder.ingestionStatus(IngestionStatus.fromXContent(parser));
23502358
} else {
23512359
// assume it's custom index metadata
23522360
builder.putCustom(currentFieldName, parser.mapStrings());

server/src/main/java/org/opensearch/cluster/metadata/IngestionStatus.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@
1212
import org.opensearch.core.common.io.stream.StreamInput;
1313
import org.opensearch.core.common.io.stream.StreamOutput;
1414
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContent;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
import org.opensearch.core.xcontent.XContentParser;
1518

1619
import java.io.IOException;
1720

1821
/**
1922
* Indicates pull-based ingestion status.
2023
*/
2124
@ExperimentalApi
22-
public record IngestionStatus(boolean isPaused) implements Writeable {
25+
public record IngestionStatus(boolean isPaused) implements Writeable, ToXContent {
26+
public static final String IS_PAUSED = "is_paused";
2327

2428
public IngestionStatus(StreamInput in) throws IOException {
2529
this(in.readBoolean());
@@ -30,6 +34,37 @@ public void writeTo(StreamOutput out) throws IOException {
3034
out.writeBoolean(isPaused);
3135
}
3236

37+
@Override
38+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
39+
builder.startObject();
40+
builder.field(IS_PAUSED, isPaused);
41+
builder.endObject();
42+
return builder;
43+
}
44+
45+
public static IngestionStatus fromXContent(XContentParser parser) throws IOException {
46+
boolean isPaused = false;
47+
48+
XContentParser.Token token = parser.currentToken();
49+
if (token == null) {
50+
token = parser.nextToken();
51+
}
52+
53+
if (token == XContentParser.Token.START_OBJECT) {
54+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
55+
if (token == XContentParser.Token.FIELD_NAME) {
56+
String fieldName = parser.currentName();
57+
if (IS_PAUSED.equals(fieldName)) {
58+
parser.nextToken();
59+
isPaused = parser.booleanValue();
60+
}
61+
}
62+
}
63+
}
64+
65+
return new IngestionStatus(isPaused);
66+
}
67+
3368
public static IngestionStatus getDefaultValue() {
3469
return new IngestionStatus(false);
3570
}

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.index.engine.IngestionEngine;
2727
import org.opensearch.index.engine.VersionConflictEngineException;
2828
import org.opensearch.index.mapper.IdFieldMapper;
29+
import org.opensearch.index.mapper.MapperParsingException;
2930
import org.opensearch.index.mapper.ParseContext;
3031
import org.opensearch.index.mapper.ParsedDocument;
3132
import org.opensearch.index.mapper.SourceToParse;
@@ -311,7 +312,7 @@ public void run() {
311312
} catch (Exception e) {
312313
messageProcessorMetrics.failedMessageCounter.inc();
313314
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
314-
boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException;
315+
boolean retriesExhausted = hasExhaustedRetries(e, retryCount);
315316
if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
316317
logDroppedMessage(shardUpdateMessage);
317318
shardUpdateMessage = null;
@@ -336,6 +337,15 @@ private void waitBeforeRetry() {
336337
}
337338
}
338339

340+
private boolean hasExhaustedRetries(Exception e, int retryCount) {
341+
if (retryCount >= MIN_RETRY_COUNT) {
342+
return true;
343+
}
344+
345+
// Don't retry validation/parsing errors
346+
return e instanceof IllegalArgumentException || e instanceof MapperParsingException;
347+
}
348+
339349
public MessageProcessorMetrics getMessageProcessorMetrics() {
340350
return messageProcessorMetrics;
341351
}

0 commit comments

Comments
 (0)