Skip to content

Commit cbaddd3

Browse files
authored
add metric of skipped messages in pull-based ingestion processing (#17938)
* add metric of skipped messages in pull-based ingestion processing Signed-off-by: Yupeng Fu <yupeng@uber.com> * fix plugin Signed-off-by: Yupeng Fu <yupeng@uber.com> --------- Signed-off-by: Yupeng Fu <yupeng@uber.com>
1 parent 3593def commit cbaddd3

File tree

8 files changed

+69
-91
lines changed

8 files changed

+69
-91
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public void testKafkaIngestion() {
6969
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
7070
.getPollingIngestStats();
7171
assertNotNull(stats);
72-
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
73-
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2L));
72+
assertThat(stats.getMessageProcessorStats().totalProcessedCount(), is(2L));
73+
assertThat(stats.getConsumerStats().totalPolledCount(), is(2L));
7474
});
7575
}
7676

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ public void testKinesisIngestion() {
9696
PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
9797
.getPollingIngestStats();
9898
assertNotNull(stats);
99-
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
100-
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2L));
99+
assertThat(stats.getMessageProcessorStats().totalProcessedCount(), is(2L));
100+
assertThat(stats.getConsumerStats().totalPolledCount(), is(2L));
101101
});
102102
}
103103

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ public IngestionShardPointer getBatchStartPointer() {
355355
public PollingIngestStats getStats() {
356356
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
357357
builder.setTotalPolledCount(totalPolledCount.count());
358-
builder.setTotalProcessedCount(processorRunnable.getStats().count());
358+
builder.setTotalProcessedCount(processorRunnable.getProcessedCounter().count());
359+
builder.setTotalSkippedCount(processorRunnable.getSkippedCounter().count());
359360
return builder.build();
360361
}
361362

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public class MessageProcessorRunnable implements Runnable {
6060
private volatile IngestionErrorStrategy errorStrategy;
6161
private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
6262
private final MessageProcessor messageProcessor;
63-
private final CounterMetric stats = new CounterMetric();
63+
private final CounterMetric processedCounter = new CounterMetric();
64+
private final CounterMetric skippedCounter = new CounterMetric();
6465

6566
// tracks the most recent pointer that is being processed
6667
@Nullable
@@ -121,12 +122,13 @@ static class MessageProcessor {
121122
*
122123
* @param message the message to process
123124
* @param pointer the pointer to the message
125+
* @param skippedCounter the counter for skipped messages
124126
*/
125-
protected void process(Message message, IngestionShardPointer pointer) {
127+
protected void process(Message message, IngestionShardPointer pointer, CounterMetric skippedCounter) {
126128
byte[] payload = (byte[]) message.getPayload();
127129

128130
try {
129-
Engine.Operation operation = getOperation(payload, pointer);
131+
Engine.Operation operation = getOperation(payload, pointer, skippedCounter);
130132
switch (operation.operationType()) {
131133
case INDEX:
132134
engine.indexInternal((Engine.Index) operation);
@@ -149,13 +151,15 @@ protected void process(Message message, IngestionShardPointer pointer) {
149151
* Visible for testing. Get the engine operation from the message.
150152
* @param payload the payload of the message
151153
* @param pointer the pointer to the message
154+
* @param skippedCounter the counter for skipped messages
152155
* @return the engine operation
153156
*/
154-
protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer pointer) throws IOException {
157+
protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer pointer, CounterMetric skippedCounter)
158+
throws IOException {
155159
Map<String, Object> payloadMap = getParsedPayloadMap(payload);
156160

157161
if (payloadMap.containsKey(OP_TYPE) && !(payloadMap.get(OP_TYPE) instanceof String)) {
158-
// TODO: add metric
162+
skippedCounter.inc();
159163
logger.error("_op_type field is of type {} but not string, skipping the message", payloadMap.get(OP_TYPE).getClass());
160164
return null;
161165
}
@@ -185,12 +189,12 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
185189
switch (opType) {
186190
case INDEX:
187191
if (!payloadMap.containsKey(SOURCE)) {
188-
// TODO: add metric
192+
skippedCounter.inc();
189193
logger.error("missing _source field, skipping the message");
190194
return null;
191195
}
192196
if (!(payloadMap.get(SOURCE) instanceof Map)) {
193-
// TODO: add metric
197+
skippedCounter.inc();
194198
logger.error("_source field does not contain a map, skipping the message");
195199
return null;
196200
}
@@ -289,9 +293,9 @@ public void run() {
289293
}
290294
if (readResult != null) {
291295
try {
292-
stats.inc();
296+
processedCounter.inc();
293297
currentShardPointer = readResult.getPointer();
294-
messageProcessor.process(readResult.getMessage(), readResult.getPointer());
298+
messageProcessor.process(readResult.getMessage(), readResult.getPointer(), skippedCounter);
295299
readResult = null;
296300
} catch (VersionConflictEngineException e) {
297301
// Messages with version conflicts will be dropped. This should not have any impact to data
@@ -320,8 +324,12 @@ private void waitBeforeRetry() {
320324
}
321325
}
322326

323-
public CounterMetric getStats() {
324-
return stats;
327+
public CounterMetric getProcessedCounter() {
328+
return processedCounter;
329+
}
330+
331+
public CounterMetric getSkippedCounter() {
332+
return skippedCounter;
325333
}
326334

327335
public IngestionErrorStrategy getErrorStrategy() {

server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,28 @@ public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerS
3434

3535
public PollingIngestStats(StreamInput in) throws IOException {
3636
long totalProcessedCount = in.readLong();
37-
this.messageProcessorStats = new MessageProcessorStats(totalProcessedCount);
37+
long totalSkippedCount = in.readLong();
38+
this.messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
3839
long totalPolledCount = in.readLong();
3940
this.consumerStats = new ConsumerStats(totalPolledCount);
4041
}
4142

4243
@Override
4344
public void writeTo(StreamOutput out) throws IOException {
44-
out.writeLong(messageProcessorStats.getTotalProcessedCount());
45-
out.writeLong(consumerStats.getTotalPolledCount());
45+
out.writeLong(messageProcessorStats.totalProcessedCount);
46+
out.writeLong(messageProcessorStats.totalSkippedCount);
47+
out.writeLong(consumerStats.totalPolledCount);
4648
}
4749

4850
@Override
4951
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5052
builder.startObject("polling_ingest_stats");
5153
builder.startObject("message_processor_stats");
52-
builder.field("total_processed_count", messageProcessorStats.getTotalProcessedCount());
54+
builder.field("total_processed_count", messageProcessorStats.totalProcessedCount);
55+
builder.field("total_skipped_count", messageProcessorStats.totalSkippedCount);
5356
builder.endObject();
5457
builder.startObject("consumer_stats");
55-
builder.field("total_polled_count", consumerStats.getTotalPolledCount());
58+
builder.field("total_polled_count", consumerStats.totalPolledCount);
5659
builder.endObject();
5760
builder.endObject();
5861
return builder;
@@ -83,58 +86,14 @@ public int hashCode() {
8386
* Stats for message processor
8487
*/
8588
@ExperimentalApi
86-
public static class MessageProcessorStats {
87-
private final long totalProcessedCount;
88-
89-
public MessageProcessorStats(long totalProcessedCount) {
90-
this.totalProcessedCount = totalProcessedCount;
91-
}
92-
93-
public long getTotalProcessedCount() {
94-
return totalProcessedCount;
95-
}
96-
97-
@Override
98-
public boolean equals(Object o) {
99-
if (this == o) return true;
100-
if (!(o instanceof MessageProcessorStats)) return false;
101-
MessageProcessorStats that = (MessageProcessorStats) o;
102-
return totalProcessedCount == that.totalProcessedCount;
103-
}
104-
105-
@Override
106-
public int hashCode() {
107-
return Objects.hash(totalProcessedCount);
108-
}
89+
public record MessageProcessorStats(long totalProcessedCount, long totalSkippedCount) {
10990
}
11091

11192
/**
11293
* Stats for consumer (poller)
11394
*/
11495
@ExperimentalApi
115-
public static class ConsumerStats {
116-
private final long totalPolledCount;
117-
118-
public ConsumerStats(long totalPolledCount) {
119-
this.totalPolledCount = totalPolledCount;
120-
}
121-
122-
public long getTotalPolledCount() {
123-
return totalPolledCount;
124-
}
125-
126-
@Override
127-
public boolean equals(Object o) {
128-
if (this == o) return true;
129-
if (!(o instanceof ConsumerStats)) return false;
130-
ConsumerStats that = (ConsumerStats) o;
131-
return totalPolledCount == that.totalPolledCount;
132-
}
133-
134-
@Override
135-
public int hashCode() {
136-
return Objects.hash(totalPolledCount);
137-
}
96+
public record ConsumerStats(long totalPolledCount) {
13897
}
13998

14099
/**
@@ -143,6 +102,7 @@ public int hashCode() {
143102
@ExperimentalApi
144103
public static class Builder {
145104
private long totalProcessedCount;
105+
private long totalSkippedCount;
146106
private long totalPolledCount;
147107

148108
public Builder() {}
@@ -157,8 +117,13 @@ public Builder setTotalPolledCount(long totalPolledCount) {
157117
return this;
158118
}
159119

120+
public Builder setTotalSkippedCount(long totalSkippedCount) {
121+
this.totalSkippedCount = totalSkippedCount;
122+
return this;
123+
}
124+
160125
public PollingIngestStats build() {
161-
MessageProcessorStats messageProcessorStats = new MessageProcessorStats(totalProcessedCount);
126+
MessageProcessorStats messageProcessorStats = new MessageProcessorStats(totalProcessedCount, totalSkippedCount);
162127
ConsumerStats consumerStats = new ConsumerStats(totalPolledCount);
163128
return new PollingIngestStats(messageProcessorStats, consumerStats);
164129
}

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testPauseAndResume() throws InterruptedException {
8888
doAnswer(invocation -> {
8989
pauseLatch.countDown();
9090
return null;
91-
}).when(processor).process(any(), any());
91+
}).when(processor).process(any(), any(), any());
9292

9393
poller.pause();
9494
poller.start();
@@ -99,19 +99,19 @@ public void testPauseAndResume() throws InterruptedException {
9999
assertFalse("Messages should not be processed while paused", processedWhilePaused);
100100
assertEquals(DefaultStreamPoller.State.PAUSED, poller.getState());
101101
assertTrue(poller.isPaused());
102-
verify(processor, never()).process(any(), any());
102+
verify(processor, never()).process(any(), any(), any());
103103

104104
CountDownLatch resumeLatch = new CountDownLatch(2);
105105
doAnswer(invocation -> {
106106
resumeLatch.countDown();
107107
return null;
108-
}).when(processor).process(any(), any());
108+
}).when(processor).process(any(), any(), any());
109109

110110
poller.resume();
111111
resumeLatch.await();
112112
assertFalse(poller.isPaused());
113113
// 2 messages are processed
114-
verify(processor, times(2)).process(any(), any());
114+
verify(processor, times(2)).process(any(), any(), any());
115115
}
116116

117117
public void testSkipProcessed() throws InterruptedException {
@@ -134,12 +134,12 @@ public void testSkipProcessed() throws InterruptedException {
134134
doAnswer(invocation -> {
135135
latch.countDown();
136136
return null;
137-
}).when(processor).process(any(), any());
137+
}).when(processor).process(any(), any(), any());
138138

139139
poller.start();
140140
latch.await();
141141
// 2 messages are processed, 2 messages are skipped
142-
verify(processor, times(2)).process(any(), any());
142+
verify(processor, times(2)).process(any(), any(), any());
143143
assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getMaxPersistedPointer());
144144
}
145145

@@ -171,13 +171,13 @@ public void testResetStateEarliest() throws InterruptedException {
171171
doAnswer(invocation -> {
172172
latch.countDown();
173173
return null;
174-
}).when(processor).process(any(), any());
174+
}).when(processor).process(any(), any(), any());
175175

176176
poller.start();
177177
latch.await();
178178

179179
// 2 messages are processed
180-
verify(processor, times(2)).process(any(), any());
180+
verify(processor, times(2)).process(any(), any(), any());
181181
}
182182

183183
public void testResetStateLatest() throws InterruptedException {
@@ -195,7 +195,7 @@ public void testResetStateLatest() throws InterruptedException {
195195
poller.start();
196196
waitUntil(() -> poller.getState() == DefaultStreamPoller.State.POLLING, awaitTime, TimeUnit.MILLISECONDS);
197197
// no messages processed
198-
verify(processor, never()).process(any(), any());
198+
verify(processor, never()).process(any(), any(), any());
199199
// reset to the latest
200200
assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getBatchStartPointer());
201201
}
@@ -215,12 +215,12 @@ public void testResetStateRewindByOffset() throws InterruptedException {
215215
doAnswer(invocation -> {
216216
latch.countDown();
217217
return null;
218-
}).when(processor).process(any(), any());
218+
}).when(processor).process(any(), any(), any());
219219

220220
poller.start();
221221
latch.await();
222222
// 1 message is processed
223-
verify(processor, times(1)).process(any(), any());
223+
verify(processor, times(1)).process(any(), any(), any());
224224
}
225225

226226
public void testStartPollWithoutStart() {
@@ -345,7 +345,7 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
345345
messages.add("{\"_id\":\"3\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8));
346346
messages.add("{\"_id\":\"4\",\"_source\":{\"name\":\"alice\", \"age\": 21}}".getBytes(StandardCharsets.UTF_8));
347347

348-
doThrow(new RuntimeException("Error processing update")).when(processor).process(any(), any());
348+
doThrow(new RuntimeException("Error processing update")).when(processor).process(any(), any(), any());
349349
BlockIngestionErrorStrategy mockErrorStrategy = spy(new BlockIngestionErrorStrategy("ingestion_source"));
350350
processorRunnable = new MessageProcessorRunnable(new ArrayBlockingQueue<>(5), processor, mockErrorStrategy);
351351

@@ -363,7 +363,7 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
363363
Thread.sleep(sleepTime);
364364

365365
verify(mockErrorStrategy, times(1)).handleError(any(), eq(IngestionErrorStrategy.ErrorStage.PROCESSING));
366-
verify(processor, times(1)).process(any(), any());
366+
verify(processor, times(1)).process(any(), any(), any());
367367
// poller will continue to poll if an error is encountered during message processing but will be blocked by
368368
// the write to blockingQueue
369369
assertEquals(DefaultStreamPoller.State.POLLING, poller.getState());
@@ -402,7 +402,7 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte
402402
// This test publishes 4 messages, so use blocking queue of size 3. This ensures the poller is blocked when adding the 4th message
403403
// for validation.
404404
IngestionErrorStrategy errorStrategy = spy(new BlockIngestionErrorStrategy("ingestion_source"));
405-
doThrow(new RuntimeException()).when(processor).process(any(), any());
405+
doThrow(new RuntimeException()).when(processor).process(any(), any(), any());
406406
processorRunnable = new MessageProcessorRunnable(new ArrayBlockingQueue<>(3), processor, errorStrategy);
407407

408408
IngestionShardConsumer mockConsumer = mock(IngestionShardConsumer.class);

0 commit comments

Comments
 (0)