Skip to content

Commit

Permalink
[fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
Browse files Browse the repository at this point in the history
(cherry picked from commit f02679d)
  • Loading branch information
coderzc authored and congbobo184 committed Dec 7, 2022
1 parent 3ba07aa commit 79de91b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -262,7 +263,7 @@ protected void cleanup() throws Exception {
@Test
public void testAdmin() throws PulsarAdminException {
admin.tenants().createTenant("test-tenant-1",
TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build());
TenantInfo.builder().allowedClusters(Collections.singleton(configClusterName)).build());
admin.namespaces().createNamespace("test-tenant-1/test-namespace-1");
String partitionedTopic = UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(partitionedTopic,3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
*/
package org.apache.pulsar.io.elasticsearch;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand All @@ -32,20 +45,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

@Slf4j
public class ElasticSearchClientTests extends ElasticSearchTestBase {

Expand All @@ -63,8 +62,16 @@ public static void closeAfterClass() {
}

static class MockRecord<T> implements Record<T> {
int acked = 0;
int failed = 0;
LongAdder acked = new LongAdder();
LongAdder failed = new LongAdder();

public int getAcked() {
return acked.intValue();
}

public int getFailed() {
return failed.intValue();
}

@Override
public T getValue() {
Expand All @@ -73,12 +80,12 @@ public T getValue() {

@Override
public void ack() {
acked++;
acked.increment();
}

@Override
public void fail() {
failed++;
failed.increment();
}
}

Expand Down Expand Up @@ -130,13 +137,13 @@ public void testIndexDelete() throws Exception {
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 1);

client.deleteDocument(mockRecord, "1");
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 0);
} finally {
client.delete(index);
Expand Down Expand Up @@ -193,11 +200,11 @@ public void testMalformedDocFails() throws Exception {
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 2);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 2);
}
}

Expand All @@ -215,8 +222,8 @@ public void testMalformedDocIgnore() throws Exception {
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
}
}

Expand All @@ -239,29 +246,28 @@ public void testBulkRetry() throws Exception {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 2);

ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
chaosContainer.start();

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 2);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 2);

chaosContainer.stop();
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.getAcked(), 3);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
}
}
}

@Test
public void testBulkBlocking() throws Exception {
final String index = "indexblocking-" + UUID.randomUUID();
Expand All @@ -284,14 +290,14 @@ public void testBulkBlocking() throws Exception {
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4));
assertEquals(mockRecord.getFailed(), 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.failed, 0);
assertEquals(mockRecord.acked, 5);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(mockRecord.getAcked(), 5);
assertEquals(client.totalHits(index), 5);
});

Expand All @@ -310,9 +316,11 @@ public void testBulkBlocking() throws Exception {
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.getAcked(), 15);
assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.records.size(), 0);
});

chaosContainer.stop();
} finally {
Expand Down

0 comments on commit 79de91b

Please sign in to comment.