Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parallelization in SearchIndexApp #18556

Merged
merged 24 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
49ff73d
Improve parallelization in SearchIndexApp
harshach Nov 8, 2024
73cba82
Improve parallelization in SearchIndexApp
harshach Nov 8, 2024
f724ada
add countdown latch
harshach Nov 9, 2024
43bd882
typo
mohityadav766 Nov 8, 2024
856cc8d
Status issues - #1
mohityadav766 Nov 8, 2024
97de919
Improve search indexing
harshach Nov 10, 2024
f5574ab
cleanup the code
harshach Nov 10, 2024
afbc8c2
schemas
pmbrull Nov 10, 2024
9e7d941
Status Updates Via WebSockets
mohityadav766 Nov 10, 2024
985558f
Add Updates
mohityadav766 Nov 10, 2024
24914bf
Make Logs Debug
mohityadav766 Nov 10, 2024
afe2e6b
show duration in table
karanh37 Nov 10, 2024
a0fa2c9
add searchIndexApp params to openmetadata-ops.sh reindex commandline
harshach Nov 10, 2024
69327c0
Fix code style
harshach Nov 10, 2024
612ec61
Add Parallel Entity Level Reader
mohityadav766 Nov 11, 2024
12f47e5
Make code more readable
mohityadav766 Nov 11, 2024
3932bae
Send App Run Record instead of jobData
mohityadav766 Nov 11, 2024
179201f
Sned App Run Record WebSocket for Data Insights
mohityadav766 Nov 11, 2024
9fe9acf
ui: use socket connection to get the real-time updates of application…
Sachin-chaurasiya Nov 11, 2024
257f389
Send WebSocket Final Update
mohityadav766 Nov 11, 2024
b3c72f0
ui: remove failedRecords, totalRecords and successRecordsfrom entitie…
Sachin-chaurasiya Nov 11, 2024
39b5d4c
Invert Condition
mohityadav766 Nov 11, 2024
451b5eb
Merge remote-tracking branch 'origin/search_index_app' into search_in…
mohityadav766 Nov 11, 2024
3908513
Merge branch 'main' into search_index_app
mohityadav766 Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities;

import es.org.elasticsearch.client.RestClient;
import java.io.IOException;
Expand Down Expand Up @@ -377,7 +377,10 @@ public void updateStats(String entityType, StepStats currentEntityStats) {
if (stats == null) {
stats =
new StepStats()
.withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
.withTotalRecords(
getInitialStatsForEntities(jobData.getEntities())
.getJobStats()
.getTotalRecords());
}

stats.setTotalRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -19,6 +19,7 @@
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
Expand Down Expand Up @@ -97,7 +98,8 @@ public DataAssetsWorkflow(
}

private void initialize() {
int totalRecords = getTotalRequestToProcess(entityTypes, collectionDAO);
Stats stats = getInitialStatsForEntities(entityTypes);
int totalRecords = stats.getJobStats().getTotalRecords();

entityTypes.forEach(
entityType -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.END_TIMESTAMP_KEY;
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.START_TIMESTAMP_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getTotalRequestToProcess;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getInitialStatsForEntities;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -101,7 +101,8 @@ public String getIndexNameByType(String entityType) {
}

private void initialize() {
int totalRecords = getTotalRequestToProcess(Set.of(entityType), collectionDAO);
int totalRecords =
getInitialStatsForEntities(Set.of(entityType)).getJobStats().getTotalRecords();

List<String> fields = List.of("*");
PaginatedEntityTimeSeriesSource source =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.openmetadata.service.apps.bundles.searchIndex;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.openmetadata.schema.system.StepStats;

public interface BulkSink {
void write(List<?> entities, Map<String, Object> contextData) throws Exception;

void updateStats(int currentSuccess, int currentFailed);

StepStats getStats();

void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
package org.openmetadata.service.apps.bundles.searchIndex;

import static org.openmetadata.schema.system.IndexingError.ErrorSource.SINK;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getErrorsFromBulkResponse;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;

import es.org.elasticsearch.ElasticsearchException;
import es.org.elasticsearch.action.DocWriteRequest;
import es.org.elasticsearch.action.bulk.BulkRequest;
import es.org.elasticsearch.action.bulk.BulkResponse;
import es.org.elasticsearch.action.update.UpdateRequest;
import es.org.elasticsearch.client.RequestOptions;
import es.org.elasticsearch.rest.RestStatus;
import es.org.elasticsearch.xcontent.XContentType;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.JsonUtils;

@Slf4j
public class ElasticSearchIndexSink implements BulkSink, Closeable {
private final StepStats stats = new StepStats();
private final SearchClient client;
private final long maxPayloadSizeInBytes;
private final int maxRetries;
private final long initialBackoffMillis;
private final long maxBackoffMillis;
private final Semaphore semaphore;

public ElasticSearchIndexSink(
SearchClient client,
long maxPayloadSizeInBytes,
int maxConcurrentRequests,
int maxRetries,
long initialBackoffMillis,
long maxBackoffMillis) {
this.client = client;
this.maxPayloadSizeInBytes = maxPayloadSizeInBytes;
this.maxRetries = maxRetries;
this.initialBackoffMillis = initialBackoffMillis;
this.maxBackoffMillis = maxBackoffMillis;
this.semaphore = new Semaphore(maxConcurrentRequests);
}

@Override
public void write(List<?> entities, Map<String, Object> contextData) throws SearchIndexException {
String entityType = (String) contextData.get("entityType");
LOG.debug(
"[ElasticSearchIndexSink] Processing {} entities of type {}", entities.size(), entityType);

List<EntityError> entityErrorList = new ArrayList<>();
List<DocWriteRequest<?>> requests = new ArrayList<>();
long currentBatchSize = 0L;

for (Object entity : entities) {
try {
DocWriteRequest<?> request = convertEntityToRequest(entity, entityType);
long requestSize = estimateRequestSizeInBytes(request);

if (currentBatchSize + requestSize > maxPayloadSizeInBytes) {
// Flush current batch
sendBulkRequest(requests, entityErrorList);
requests.clear();
currentBatchSize = 0L;
}

requests.add(request);
currentBatchSize += requestSize;

} catch (Exception e) {
entityErrorList.add(
new EntityError()
.withMessage("Failed to convert entity to request: " + e.getMessage())
.withEntity(entity.toString()));
LOG.error("Error converting entity to request", e);
}
}

if (!requests.isEmpty()) {
sendBulkRequest(requests, entityErrorList);
}

int totalEntities = entities.size();
int failedEntities = entityErrorList.size();
int successfulEntities = totalEntities - failedEntities;
updateStats(successfulEntities, failedEntities);

if (!entityErrorList.isEmpty()) {
throw new SearchIndexException(
new IndexingError()
.withErrorSource(SINK)
.withSubmittedCount(totalEntities)
.withSuccessCount(successfulEntities)
.withFailedCount(failedEntities)
.withMessage(String.format("Issues in Sink to Elasticsearch: %s", entityErrorList))
.withFailedEntities(entityErrorList));
}
}

private void sendBulkRequest(List<DocWriteRequest<?>> requests, List<EntityError> entityErrorList)
throws SearchIndexException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(requests);

int attempt = 0;
long backoffMillis = initialBackoffMillis;

while (attempt <= maxRetries) {
try {
semaphore.acquire();
try {
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
entityErrorList.addAll(getErrorsFromBulkResponse(response));
break; // Success, exit retry loop
} finally {
semaphore.release();
}
} catch (IOException e) {
if (isRetriableException(e)) {
attempt++;
LOG.warn(
"Bulk request failed with retriable exception, retrying attempt {}/{}",
attempt,
maxRetries);
sleepWithBackoff(backoffMillis);
backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
} else {
LOG.error("Bulk request failed with non-retriable exception", e);
throw new SearchIndexException(createIndexingError(requests.size(), e));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Bulk request interrupted", e);
throw new SearchIndexException(createIndexingError(requests.size(), e));
} catch (ElasticsearchException e) {
if (isRetriableStatusCode(e.status())) {
attempt++;
LOG.warn(
"Bulk request failed with status {}, retrying attempt {}/{}",
e.status(),
attempt,
maxRetries);
sleepWithBackoff(backoffMillis);
backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
} else {
LOG.error("Bulk request failed with non-retriable status {}", e.status(), e);
throw new SearchIndexException(createIndexingError(requests.size(), e));
}
}
}

if (attempt > maxRetries) {
throw new SearchIndexException(
new IndexingError()
.withErrorSource(SINK)
.withSubmittedCount(requests.size())
.withSuccessCount(0)
.withFailedCount(requests.size())
.withMessage("Exceeded maximum retries for bulk request"));
}
}

private boolean isRetriableException(Exception e) {
return e instanceof IOException;
}

private boolean isRetriableStatusCode(RestStatus status) {
return status == RestStatus.TOO_MANY_REQUESTS || status == RestStatus.SERVICE_UNAVAILABLE;
}

private void sleepWithBackoff(long millis) {
try {
Thread.sleep(millis + ThreadLocalRandom.current().nextLong(0, millis));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Sleep interrupted during backoff", e);
}
}

private IndexingError createIndexingError(int requestCount, Exception e) {
return new IndexingError()
.withErrorSource(SINK)
.withSubmittedCount(requestCount)
.withSuccessCount(0)
.withFailedCount(requestCount)
.withMessage(String.format("Issue in Sink to Elasticsearch: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
}

private DocWriteRequest<?> convertEntityToRequest(Object entity, String entityType) {
if (entity instanceof EntityInterface) {
return getEntityInterfaceRequest(entityType, (EntityInterface) entity);
} else if (entity instanceof EntityTimeSeriesInterface) {
return getEntityTimeSeriesInterfaceReqeust(entityType, (EntityTimeSeriesInterface) entity);
} else {
throw new IllegalArgumentException("Unknown entity type: " + entity.getClass());
}
}

private UpdateRequest getEntityInterfaceRequest(String entityType, EntityInterface entity) {
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
UpdateRequest updateRequest =
new UpdateRequest(
indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()),
entity.getId().toString());
updateRequest.doc(
JsonUtils.pojoToJson(
Objects.requireNonNull(Entity.buildSearchIndex(entityType, entity))
.buildSearchIndexDoc()),
XContentType.JSON);
updateRequest.docAsUpsert(true);
return updateRequest;
}

private UpdateRequest getEntityTimeSeriesInterfaceReqeust(
String entityType, EntityTimeSeriesInterface entity) {
IndexMapping indexMapping = Entity.getSearchRepository().getIndexMapping(entityType);
UpdateRequest updateRequest =
new UpdateRequest(
indexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()),
entity.getId().toString());
updateRequest.doc(
JsonUtils.pojoToJson(
Objects.requireNonNull(
Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc())),
XContentType.JSON);
updateRequest.docAsUpsert(true);
return updateRequest;
}

private long estimateRequestSizeInBytes(DocWriteRequest<?> request) {
return new BulkRequest().add(request).estimatedSizeInBytes();
}

@Override
public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed);
}

@Override
public StepStats getStats() {
return stats;
}

@Override
public void close() throws IOException {
client.close();
}
}
Loading
Loading