Skip to content

Commit

Permalink
Reuse syncAfterWrite in other ES tests
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Aug 25, 2021
1 parent 3541a43 commit 4a88273
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata;

import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

public class ElasticSearchTestUtils {

private ElasticSearchTestUtils() {
}

public static void syncAfterWrite(RestHighLevelClient searchClient) throws Exception {
// flush changes in ES to disk
FlushResponse fResponse = searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT);
if (fResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards");
}

// wait for all indices to be refreshed
RefreshResponse rResponse = searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT);
if (rResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards");
}
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package com.linkedin.metadata.graph;

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.ElasticSearchTestUtils;
import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO;
import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -92,16 +88,6 @@ public void tearDown() {

@Override
protected void syncAfterWrite() throws Exception {
// flush changes in ES to disk
FlushResponse fResponse = _searchClient.indices().flush(new FlushRequest(), RequestOptions.DEFAULT);
if (fResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards");
}

// wait for all indices to be refreshed
RefreshResponse rResponse = _searchClient.indices().refresh(new RefreshRequest(), RequestOptions.DEFAULT);
if (rResponse.getFailedShards() > 0) {
throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards");
}
ElasticSearchTestUtils.syncAfterWrite(_searchClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static org.testng.Assert.assertEquals;

import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite;

public class ElasticSearchServiceTest {

Expand Down Expand Up @@ -84,7 +85,7 @@ public void tearDown() {
}

@Test
public void testElasticSearchService() throws InterruptedException {
public void testElasticSearchService() throws Exception {
_elasticSearchService.configure();

SearchResult searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10);
Expand All @@ -100,7 +101,8 @@ public void testElasticSearchService() throws InterruptedException {
document.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride"));
document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c"));
_elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString());
TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);
TimeUnit.SECONDS.sleep(1); // for some reason we still need to wait here though we 'syncAfterWrite'
searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10);
assertEquals(searchResult.getNumEntities().intValue(), 1);
assertEquals(searchResult.getEntities().get(0), urn);
Expand All @@ -110,7 +112,7 @@ public void testElasticSearchService() throws InterruptedException {
assertEquals(_elasticSearchService.docCount(ENTITY_NAME), 1);

_elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString());
TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);
searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10);
assertEquals(searchResult.getNumEntities().intValue(), 0);
browseResult = _elasticSearchService.browse(ENTITY_NAME, "", null, 0, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.mxe.SystemMetadata;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
Expand All @@ -20,6 +18,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite;
import static org.testng.Assert.*;

public class ElasticSearchSystemMetadataServiceTest {
Expand All @@ -32,11 +31,6 @@ public class ElasticSearchSystemMetadataServiceTest {
private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.9.3";
private static final int HTTP_PORT = 9200;

@BeforeMethod
public void wipe() throws URISyntaxException {
_client.clear();
}

@BeforeTest
public void setup() {
_indexConvention = new IndexConventionImpl(null);
Expand All @@ -47,6 +41,12 @@ public void setup() {
_client.configure();
}

@BeforeMethod
public void wipe() throws Exception {
_client.clear();
syncAfterWrite(_searchClient);
}

@Nonnull
private RestHighLevelClient buildRestClient() {
final RestClientBuilder builder =
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testListRuns() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down Expand Up @@ -117,7 +117,7 @@ public void testOverwriteRuns() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down Expand Up @@ -146,7 +146,7 @@ public void testFindByRunId() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

List<AspectRowSummary> rows = _client.findByRunId("abc-456");

Expand Down Expand Up @@ -174,11 +174,11 @@ public void testDelete() throws Exception {
_client.insert(metadata2, "urn:li:chart:2", "chartKey");
_client.insert(metadata2, "urn:li:chart:2", "Ownership");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

_client.deleteUrn("urn:li:chart:1");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

List<AspectRowSummary> rows = _client.findByRunId("abc-456");

Expand All @@ -190,7 +190,7 @@ public void testDelete() throws Exception {
public void testInsertNullData() throws Exception {
_client.insert(null, "urn:li:chart:1", "chartKey");

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);

List<IngestionRunSummary> runs = _client.listRuns(0, 20);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,6 +34,7 @@

import static org.testng.Assert.*;

import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite;

public class ElasticSearchTimeseriesAspectServiceTest {

Expand Down Expand Up @@ -122,7 +122,7 @@ private void validateAspectValues(List<EnvelopedAspect> aspects, long numResults
}

@Test(groups = "upsert")
public void testUpsertProfiles() throws InterruptedException {
public void testUpsertProfiles() throws Exception {
// Create the testEntity profiles that we would like to use for testing.
_startTime = Calendar.getInstance().getTimeInMillis();

Expand All @@ -146,7 +146,7 @@ public void testUpsertProfiles() throws InterruptedException {
}
});

TimeUnit.SECONDS.sleep(5);
syncAfterWrite(_searchClient);
}

@Test(groups = "query", dependsOnGroups = "upsert")
Expand Down

0 comments on commit 4a88273

Please sign in to comment.