Skip to content

Commit 08522b5

Browse files
Add ingestion error metrics and make internal queue size configurable
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 8d3386c commit 08522b5

File tree

15 files changed

+334
-80
lines changed

15 files changed

+334
-80
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
1414
- Enabled default throttling for all tasks submitted to cluster manager ([#17711](https://github.com/opensearch-project/OpenSearch/pull/17711))
1515
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
16+
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
1617

1718
### Changed
1819
- Change the default max header size from 8KB to 16KB. ([#18024](https://github.com/opensearch-project/OpenSearch/pull/18024))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.index.query.BoolQueryBuilder;
2323
import org.opensearch.index.query.RangeQueryBuilder;
2424
import org.opensearch.index.query.TermQueryBuilder;
25+
import org.opensearch.indices.pollingingest.PollingIngestStats;
2526
import org.opensearch.test.InternalTestCluster;
2627
import org.opensearch.test.OpenSearchIntegTestCase;
2728
import org.opensearch.transport.client.Requests;
@@ -135,6 +136,8 @@ public void testErrorStrategy() throws Exception {
135136
// malformed message
136137
produceData("2", "", "");
137138
produceData("3", "name3", "25");
139+
produceData("{\"_op_type\":\"invalid\",\"_source\":{\"name\":\"name4\", \"age\": 25}}");
140+
produceData("5", "name5", "25");
138141

139142
internalCluster().startClusterManagerOnlyNode();
140143
final String node = internalCluster().startDataOnlyNode();
@@ -147,6 +150,7 @@ public void testErrorStrategy() throws Exception {
147150
.put("ingestion_source.type", "kafka")
148151
.put("ingestion_source.error_strategy", "block")
149152
.put("ingestion_source.pointer.init.reset", "earliest")
153+
.put("ingestion_source.internal_queue_size", "1000")
150154
.put("ingestion_source.param.topic", topicName)
151155
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
152156
.put("index.replication.type", "SEGMENT")
@@ -165,7 +169,15 @@ public void testErrorStrategy() throws Exception {
165169
.get();
166170
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
167171
resumeIngestion(indexName);
168-
waitForSearchableDocs(2, Arrays.asList(node));
172+
waitForSearchableDocs(3, Arrays.asList(node));
173+
174+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
175+
.getPollingIngestStats();
176+
assertNotNull(stats);
177+
assertThat(stats.getMessageProcessorStats().totalFailedCount(), is(1L));
178+
assertThat(stats.getMessageProcessorStats().totalFailuresDroppedCount(), is(1L));
179+
assertThat(stats.getConsumerStats().totalConsumerErrorCount(), is(0L));
180+
assertThat(stats.getConsumerStats().totalPollerMessageDroppedCount(), is(1L));
169181
}
170182

171183
public void testPauseAndResumeIngestion() throws Exception {
@@ -372,6 +384,13 @@ public void testExternalVersioning() throws Exception {
372384
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
373385
return true;
374386
});
387+
388+
// validate processor stats
389+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
390+
.getPollingIngestStats();
391+
assertNotNull(stats);
392+
assertThat(stats.getMessageProcessorStats().totalProcessedCount(), is(11L));
393+
assertThat(stats.getMessageProcessorStats().totalVersionConflictsCount(), is(3L));
375394
}
376395

377396
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,18 @@ public Iterator<Setting<?>> settings() {
843843
Setting.Property.Final
844844
);
845845

846+
/**
847+
* Defines the internal blocking queue size that is used to decouple poller and processor in pull-based ingestion.
848+
*/
849+
public static final String SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE = "index.ingestion_source.internal_queue_size";
850+
public static final Setting<Integer> INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING = Setting.intSetting(
851+
SETTING_INGESTION_SOURCE_INTERNAL_QUEUE_SIZE,
852+
100,
853+
100,
854+
Property.IndexScope,
855+
Setting.Property.Final
856+
);
857+
846858
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
847859
"index.ingestion_source.param.",
848860
key -> new Setting<>(key, "", (value) -> {
@@ -1086,13 +1098,15 @@ public IngestionSource getIngestionSource() {
10861098
final long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.get(settings);
10871099
final int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.get(settings);
10881100
final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings);
1101+
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
10891102

10901103
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
10911104
.setPointerInitReset(pointerInitReset)
10921105
.setErrorStrategy(errorStrategy)
10931106
.setMaxPollSize(maxPollSize)
10941107
.setPollTimeout(pollTimeout)
10951108
.setNumProcessorThreads(numProcessorThreads)
1109+
.setBlockingQueueSize(blockingQueueSize)
10961110
.build();
10971111
}
10981112
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Map;
1818
import java.util.Objects;
1919

20+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
2021
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
2122
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
2223
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT;
@@ -33,6 +34,7 @@ public class IngestionSource {
3334
private final long maxPollSize;
3435
private final int pollTimeout;
3536
private int numProcessorThreads;
37+
private int blockingQueueSize;
3638

3739
private IngestionSource(
3840
String type,
@@ -41,7 +43,8 @@ private IngestionSource(
4143
Map<String, Object> params,
4244
long maxPollSize,
4345
int pollTimeout,
44-
int numProcessorThreads
46+
int numProcessorThreads,
47+
int blockingQueueSize
4548
) {
4649
this.type = type;
4750
this.pointerInitReset = pointerInitReset;
@@ -50,6 +53,7 @@ private IngestionSource(
5053
this.maxPollSize = maxPollSize;
5154
this.pollTimeout = pollTimeout;
5255
this.numProcessorThreads = numProcessorThreads;
56+
this.blockingQueueSize = blockingQueueSize;
5357
}
5458

5559
public String getType() {
@@ -80,6 +84,10 @@ public int getNumProcessorThreads() {
8084
return numProcessorThreads;
8185
}
8286

87+
public int getBlockingQueueSize() {
88+
return blockingQueueSize;
89+
}
90+
8391
@Override
8492
public boolean equals(Object o) {
8593
if (this == o) return true;
@@ -91,12 +99,22 @@ public boolean equals(Object o) {
9199
&& Objects.equals(params, ingestionSource.params)
92100
&& Objects.equals(maxPollSize, ingestionSource.maxPollSize)
93101
&& Objects.equals(pollTimeout, ingestionSource.pollTimeout)
94-
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads);
102+
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
103+
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize);
95104
}
96105

97106
@Override
98107
public int hashCode() {
99-
return Objects.hash(type, pointerInitReset, params, errorStrategy, maxPollSize, pollTimeout, numProcessorThreads);
108+
return Objects.hash(
109+
type,
110+
pointerInitReset,
111+
params,
112+
errorStrategy,
113+
maxPollSize,
114+
pollTimeout,
115+
numProcessorThreads,
116+
blockingQueueSize
117+
);
100118
}
101119

102120
@Override
@@ -119,6 +137,8 @@ public String toString() {
119137
+ pollTimeout
120138
+ ", numProcessorThreads="
121139
+ numProcessorThreads
140+
+ ", blockingQueueSize="
141+
+ blockingQueueSize
122142
+ '}';
123143
}
124144

@@ -175,6 +195,7 @@ public static class Builder {
175195
private long maxPollSize = INGESTION_SOURCE_MAX_POLL_SIZE.getDefault(Settings.EMPTY);
176196
private int pollTimeout = INGESTION_SOURCE_POLL_TIMEOUT.getDefault(Settings.EMPTY);
177197
private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY);
198+
private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY);
178199

179200
public Builder(String type) {
180201
this.type = type;
@@ -186,6 +207,7 @@ public Builder(IngestionSource ingestionSource) {
186207
this.pointerInitReset = ingestionSource.pointerInitReset;
187208
this.errorStrategy = ingestionSource.errorStrategy;
188209
this.params = ingestionSource.params;
210+
this.blockingQueueSize = ingestionSource.blockingQueueSize;
189211
}
190212

191213
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -223,8 +245,22 @@ public Builder setNumProcessorThreads(int numProcessorThreads) {
223245
return this;
224246
}
225247

248+
public Builder setBlockingQueueSize(int blockingQueueSize) {
249+
this.blockingQueueSize = blockingQueueSize;
250+
return this;
251+
}
252+
226253
public IngestionSource build() {
227-
return new IngestionSource(type, pointerInitReset, errorStrategy, params, maxPollSize, pollTimeout, numProcessorThreads);
254+
return new IngestionSource(
255+
type,
256+
pointerInitReset,
257+
errorStrategy,
258+
params,
259+
maxPollSize,
260+
pollTimeout,
261+
numProcessorThreads,
262+
blockingQueueSize
263+
);
228264
}
229265

230266
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
273273
IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE,
274274
IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT,
275275
IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING,
276+
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
276277

277278
// Settings for search replica
278279
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ public void start() {
129129
initialPollerState,
130130
ingestionSource.getMaxPollSize(),
131131
ingestionSource.getPollTimeout(),
132-
ingestionSource.getNumProcessorThreads()
132+
ingestionSource.getNumProcessorThreads(),
133+
ingestionSource.getBlockingQueueSize()
133134
);
134135
streamPoller.start();
135136
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class DefaultStreamPoller implements StreamPoller {
5858
private Set<IngestionShardPointer> persistedPointers;
5959

6060
private final CounterMetric totalPolledCount = new CounterMetric();
61+
private final CounterMetric totalConsumerErrorCount = new CounterMetric();
62+
private final CounterMetric totalPollerMessageFailureCount = new CounterMetric();
63+
private final CounterMetric totalPollerMessageDroppedCount = new CounterMetric();
6164

6265
// A pointer to the max persisted pointer for optimizing the check
6366
@Nullable
@@ -76,13 +79,20 @@ public DefaultStreamPoller(
7679
State initialState,
7780
long maxPollSize,
7881
int pollTimeout,
79-
int numProcessorThreads
82+
int numProcessorThreads,
83+
int blockingQueueSize
8084
) {
8185
this(
8286
startPointer,
8387
persistedPointers,
8488
consumer,
85-
new PartitionedBlockingQueueContainer(numProcessorThreads, consumer.getShardId(), ingestionEngine, errorStrategy),
89+
new PartitionedBlockingQueueContainer(
90+
numProcessorThreads,
91+
consumer.getShardId(),
92+
ingestionEngine,
93+
errorStrategy,
94+
blockingQueueSize
95+
),
8696
resetState,
8797
resetValue,
8898
errorStrategy,
@@ -227,6 +237,7 @@ protected void startPoll() {
227237
// The user will have the option to manually update the offset and resume ingestion.
228238
// todo: support retry?
229239
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", consumer.getShardId(), e);
240+
totalConsumerErrorCount.inc();
230241
pause();
231242
}
232243
}
@@ -263,12 +274,15 @@ private IngestionShardPointer processRecords(
263274
e
264275
);
265276
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
277+
totalPollerMessageFailureCount.inc();
266278

267-
if (!errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
279+
if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING) == false) {
268280
// Blocking error encountered. Pause poller to stop processing remaining updates.
269281
pause();
270282
failedShardPointer = result.getPointer();
271283
break;
284+
} else {
285+
totalPollerMessageDroppedCount.inc();
272286
}
273287
}
274288
}
@@ -364,10 +378,20 @@ public IngestionShardPointer getBatchStartPointer() {
364378

365379
@Override
366380
public PollingIngestStats getStats() {
381+
MessageProcessorRunnable.MessageProcessorMetrics processorMetrics = blockingQueueContainer.getMessageProcessorMetrics();
367382
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
383+
// set processor stats
384+
builder.setTotalProcessedCount(processorMetrics.processedCounter().count());
385+
builder.setTotalInvalidMessageCount(processorMetrics.invalidMessageCounter().count());
386+
builder.setTotalProcessorVersionConflictsCount(processorMetrics.versionConflictCounter().count());
387+
builder.setTotalProcessorFailedCount(processorMetrics.failedMessageCounter().count());
388+
builder.setTotalProcessorFailuresDroppedCount(processorMetrics.failedMessageDroppedCounter().count());
389+
builder.setTotalProcessorThreadInterruptCount(processorMetrics.processorThreadInterruptCounter().count());
390+
// set consumer stats
368391
builder.setTotalPolledCount(totalPolledCount.count());
369-
builder.setTotalProcessedCount(blockingQueueContainer.getTotalProcessedCount());
370-
builder.setTotalSkippedCount(blockingQueueContainer.getTotalSkippedCount());
392+
builder.setTotalConsumerErrorCount(totalConsumerErrorCount.count());
393+
builder.setTotalPollerMessageFailureCount(totalPollerMessageFailureCount.count());
394+
builder.setTotalPollerMessageDroppedCount(totalPollerMessageDroppedCount.count());
371395
builder.setLagInMillis(computeLag());
372396
return builder.build();
373397
}

0 commit comments

Comments
 (0)