Skip to content

Commit

Permalink
[fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
Browse files Browse the repository at this point in the history
(cherry picked from commit f02679d)
  • Loading branch information
coderzc authored and Jason918 committed Sep 2, 2022
1 parent 6db6679 commit 3c25a1f
Showing 1 changed file with 49 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,20 @@
*/
package org.apache.pulsar.io.elasticsearch;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand All @@ -34,20 +47,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

@Slf4j
public class ElasticSearchClientTests extends ElasticSearchTestBase {

Expand All @@ -67,8 +66,16 @@ public static void closeAfterClass() {
}

static class MockRecord<T> implements Record<T> {
int acked = 0;
int failed = 0;
LongAdder acked = new LongAdder();
LongAdder failed = new LongAdder();

public int getAcked() {
return acked.intValue();
}

public int getFailed() {
return failed.intValue();
}

@Override
public T getValue() {
Expand All @@ -77,12 +84,12 @@ public T getValue() {

@Override
public void ack() {
acked++;
acked.increment();
}

@Override
public void fail() {
failed++;
failed.increment();
}
}

Expand Down Expand Up @@ -166,13 +173,13 @@ public void testIndexDelete() throws Exception {
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 1);

client.deleteDocument(mockRecord, "1");
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 0);
} finally {
client.delete(index);
Expand Down Expand Up @@ -229,11 +236,11 @@ public void testMalformedDocFails() throws Exception {
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 2);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 2);
}
}

Expand All @@ -251,8 +258,8 @@ public void testMalformedDocIgnore() throws Exception {
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
}
}

Expand All @@ -278,8 +285,8 @@ public void testBulkRetry() throws Exception {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 2);

log.info("starting the toxic");
Expand All @@ -288,13 +295,13 @@ public void testBulkRetry() throws Exception {
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 2);

client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 3);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
Expand Down Expand Up @@ -328,14 +335,14 @@ public void testBulkBlocking() throws Exception {
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4));
assertEquals(mockRecord.getFailed(), 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(mockRecord.getAcked(), 5);
assertEquals(client.totalHits(index), 5);
});

Expand All @@ -355,10 +362,10 @@ public void testBulkBlocking() throws Exception {
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(3000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.getAcked(), 15);
assertEquals(mockRecord.getFailed(), 0);
});

} finally {
client.delete(index);
Expand Down

0 comments on commit 3c25a1f

Please sign in to comment.