Skip to content

Commit

Permalink
[fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc authored Aug 9, 2022
1 parent fd8ebaa commit f02679d
Showing 1 changed file with 50 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,20 @@
*/
package org.apache.pulsar.io.elasticsearch;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand All @@ -34,20 +47,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

@Slf4j
public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
public final static String INDEX = "myindex";
Expand Down Expand Up @@ -78,8 +77,16 @@ public static void closeAfterClass() {
}

static class MockRecord<T> implements Record<T> {
int acked = 0;
int failed = 0;
LongAdder acked = new LongAdder();
LongAdder failed = new LongAdder();

public int getAcked() {
return acked.intValue();
}

public int getFailed() {
return failed.intValue();
}

@Override
public T getValue() {
Expand All @@ -88,12 +95,12 @@ public T getValue() {

@Override
public void ack() {
acked++;
acked.increment();
}

@Override
public void fail() {
failed++;
failed.increment();
}
}

Expand Down Expand Up @@ -152,13 +159,13 @@ public void testIndexDelete() throws Exception {
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 1);

client.deleteDocument(mockRecord, "1");
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 0);
} finally {
client.getRestClient().deleteIndex(index);
Expand Down Expand Up @@ -216,11 +223,11 @@ public void testMalformedDocFails() throws Exception {
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 2);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 2);
}
}

Expand All @@ -239,8 +246,8 @@ public void testMalformedDocIgnore() throws Exception {
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
}
}

Expand All @@ -266,8 +273,8 @@ public void testBulkRetry() throws Exception {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 2);

log.info("starting the toxic");
Expand All @@ -276,13 +283,13 @@ public void testBulkRetry() throws Exception {
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 2);

client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 3);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 3);
} finally {
client.getRestClient().deleteIndex(index);
Expand Down Expand Up @@ -316,14 +323,14 @@ public void testBulkBlocking() throws Exception {
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4));
assertEquals(mockRecord.getFailed(), 0);
assertThat("totalHits", client.getRestClient().totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(mockRecord.getAcked(), 5);
assertEquals(client.getRestClient().totalHits(index), 5);
});

Expand All @@ -344,8 +351,8 @@ public void testBulkBlocking() throws Exception {
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 15);
assertEquals(mockRecord.getFailed(), 0);
});

} finally {
Expand All @@ -372,13 +379,13 @@ public void testBulkIndexAndDelete() throws Exception {
client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i + "}"));
client.bulkDelete(mockRecord, "key" + i);
}
assertEquals(mockRecord.acked, 10);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 10);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 0);
// no effect
client.flush();

assertEquals(mockRecord.acked, 10);
assertEquals(mockRecord.getAcked(), 10);
}
}

Expand Down

0 comments on commit f02679d

Please sign in to comment.