diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 599a68dcfaf06..3403baf4d8279 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -18,7 +18,20 @@ */ package org.apache.pulsar.io.elasticsearch; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import eu.rekawek.toxiproxy.model.ToxicDirection; +import java.io.IOException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.LongAdder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.schema.GenericObject; @@ -34,20 +47,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.IOException; -import java.util.Optional; -import java.util.UUID; - -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; - @Slf4j public class ElasticSearchClientTests extends ElasticSearchTestBase { @@ -67,8 +66,16 @@ public static void closeAfterClass() { } static class MockRecord implements Record { - int acked = 0; - int failed = 0; + LongAdder acked = new LongAdder(); + LongAdder failed = new LongAdder(); + + public int getAcked() { + return acked.intValue(); + } + + public int getFailed() { + return failed.intValue(); + } @Override public T getValue() { @@ -77,12 +84,12 @@ public T getValue() { @Override public void ack() { - acked++; + acked.increment(); } @Override public void fail() { - failed++; + failed.increment(); } } @@ -166,13 +173,13 @@ public void testIndexDelete() throws Exception { try { MockRecord mockRecord = new MockRecord<>(); client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}")); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 1); client.deleteDocument(mockRecord, "1"); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 0); } finally { client.delete(index); @@ -229,11 +236,11 @@ public void testMalformedDocFails() throws Exception { client.flush(); assertNotNull(client.irrecoverableError.get()); assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception")); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 1); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 1); assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"))); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 2); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 2); } } @@ -251,8 +258,8 @@ public void testMalformedDocIgnore() throws Exception { client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}")); client.flush(); assertNull(client.irrecoverableError.get()); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 1); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 1); } } @@ -278,8 +285,8 @@ public void testBulkRetry() throws Exception { MockRecord mockRecord = new MockRecord<>(); client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}")); client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 2); log.info("starting the toxic"); @@ -288,13 +295,13 @@ public void testBulkRetry() throws Exception { toxiproxy.removeToxicAfterDelay("elasticpause", 15000); client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 2); client.flush(); - assertEquals(mockRecord.acked, 3); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 3); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 3); } finally { client.delete(index); @@ -328,14 +335,14 @@ public void testBulkBlocking() throws Exception { } Awaitility.await().untilAsserted(() -> { - assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4)); - assertEquals(mockRecord.failed, 0); + assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4)); + assertEquals(mockRecord.getFailed(), 0); assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L)); }); client.flush(); Awaitility.await().untilAsserted(() -> { - assertEquals(mockRecord.failed, 0); - assertEquals(mockRecord.acked, 5); + assertEquals(mockRecord.getFailed(), 0); + assertEquals(mockRecord.getAcked(), 5); assertEquals(client.totalHits(index), 5); }); @@ -355,10 +362,10 @@ public void testBulkBlocking() throws Exception { log.info("elapsed = {}", elapsed); assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy - Thread.sleep(3000L); - assertEquals(mockRecord.acked, 15); - assertEquals(mockRecord.failed, 0); - assertEquals(client.records.size(), 0); + Awaitility.await().untilAsserted(() -> { + assertEquals(mockRecord.getAcked(), 15); + assertEquals(mockRecord.getFailed(), 0); + }); } finally { client.delete(index);