Parallelize the search indexing process (#18445)
* Parallelize the indexing process

* Parallelize indexing
harshach committed Nov 3, 2024
1 parent 0b8b0c6 commit 45975e3
Showing 1 changed file with 130 additions and 66 deletions.
@@ -14,6 +14,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -112,7 +117,9 @@ public class SearchIndexApp extends AbstractNativeApplication {
private Sink searchIndexSink;

@Getter EventPublisherJob jobData;
private final Object jobDataLock = new Object(); // Dedicated final lock object
private volatile boolean stopped = false;
@Getter private volatile ExecutorService executorService;

public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
@@ -136,19 +143,12 @@ public void startApp(JobExecutionContext jobExecutionContext) {
try {
initializeJob();
LOG.info("Executing Reindexing Job with JobData : {}", jobData);
// Update Job Status
jobData.setStatus(EventPublisherJob.Status.RUNNING);

// Make recreate as false for onDemand
String runType =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");

// Schedule Run has re-create set to false
if (!runType.equals(ON_DEMAND_JOB)) {
jobData.setRecreateIndex(false);
}

// Run ReIndexing
performReindex(jobExecutionContext);
} catch (Exception ex) {
IndexingError indexingError =
@@ -162,7 +162,6 @@ public void startApp(JobExecutionContext jobExecutionContext) {
jobData.setStatus(EventPublisherJob.Status.RUNNING);
jobData.setFailure(indexingError);
} finally {
// Send update
sendUpdates(jobExecutionContext);
}
}
@@ -179,11 +178,8 @@ private void cleanUpStaleJobsFromRuns() {

private void initializeJob() {
List<Source> paginatedEntityTimeSeriesSources = new ArrayList<>();

// Remove any Stale Jobs
cleanUpStaleJobsFromRuns();

// Initialize New Job
int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
this.jobData.setStats(
new Stats()
@@ -214,7 +210,7 @@ private void initializeJob() {
paginatedEntityTimeSeriesSources.add(source);
}
});
// Add Time Series Sources at the End of the List to Process them last

paginatedSources.addAll(paginatedEntityTimeSeriesSources);
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
@@ -232,16 +228,11 @@ private void initializeJob() {
public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
AppRunRecord appRecord = getJobRecord(jobExecutionContext);

// Update Run Record with Status
appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));

// Update Error
if (jobData.getFailure() != null) {
appRecord.setFailureContext(
new FailureContext().withAdditionalProperty("failure", jobData.getFailure()));
}

// Update Stats
if (jobData.getStats() != null) {
appRecord.setSuccessContext(
new SuccessContext().withAdditionalProperty("stats", jobData.getStats()));
@@ -251,48 +242,109 @@ public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
}

private void performReindex(JobExecutionContext jobExecutionContext) {
Map<String, Object> contextData = new HashMap<>();
if (jobData.getStats() == null) {
jobData.setStats(new Stats());
}

if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) {
int numThreads =
Math.min(paginatedSources.size(), Runtime.getRuntime().availableProcessors());
this.executorService = Executors.newFixedThreadPool(numThreads);
LOG.debug("Initialized new ExecutorService with {} threads.", numThreads);
}
List<Future<?>> futures = new ArrayList<>();

for (Source paginatedSource : paginatedSources) {
List<String> entityName;
reCreateIndexes(paginatedSource.getEntityType());
contextData.put(ENTITY_TYPE_KEY, paginatedSource.getEntityType());
Object resultList;
while (!isJobInterrupted && !stopped && !paginatedSource.isDone()) {
try {
resultList = paginatedSource.readNext(null);
if (!TIME_SERIES_ENTITIES.contains(paginatedSource.getEntityType())) {
entityName =
getEntityNameFromEntity(
(ResultList<? extends EntityInterface>) resultList,
paginatedSource.getEntityType());
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
processEntity(
(ResultList<? extends EntityInterface>) resultList, contextData, paginatedSource);
} else {
entityName =
getEntityNameFromEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
paginatedSource.getEntityType());
contextData.put(ENTITY_NAME_LIST_KEY, entityName);
processEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
contextData,
paginatedSource);
}

} catch (SearchIndexException rx) {
jobData.setStatus(EventPublisherJob.Status.RUNNING);
jobData.setFailure(rx.getIndexingError());
paginatedSource.updateStats(
rx.getIndexingError().getSuccessCount(), rx.getIndexingError().getFailedCount());
} finally {
if (isJobInterrupted) {
LOG.info("Search Indexing will now return since the Job has been interrupted.");
jobData.setStatus(EventPublisherJob.Status.STOPPED);
}
updateStats(paginatedSource.getEntityType(), paginatedSource.getStats());
sendUpdates(jobExecutionContext);
}
Future<?> future =
executorService.submit(
() -> {
String entityType = paginatedSource.getEntityType();
Map<String, Object> contextData = new HashMap<>();
contextData.put(ENTITY_TYPE_KEY, entityType);
try {
reCreateIndexes(entityType);
contextData.put(ENTITY_TYPE_KEY, entityType);

while (!paginatedSource.isDone()) {
Object resultList = paginatedSource.readNext(null);
if (resultList == null) {
break;
}

if (!TIME_SERIES_ENTITIES.contains(entityType)) {
List<String> entityNames =
getEntityNameFromEntity(
(ResultList<? extends EntityInterface>) resultList, entityType);
contextData.put(ENTITY_NAME_LIST_KEY, entityNames);
processEntity(
(ResultList<? extends EntityInterface>) resultList,
contextData,
paginatedSource);
} else {
List<String> entityNames =
getEntityNameFromEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
entityType);
contextData.put(ENTITY_NAME_LIST_KEY, entityNames);
processEntityTimeSeries(
(ResultList<? extends EntityTimeSeriesInterface>) resultList,
contextData,
paginatedSource);
}
synchronized (jobDataLock) {
updateStats(entityType, paginatedSource.getStats());
}
sendUpdates(jobExecutionContext);
}

} catch (SearchIndexException e) {
synchronized (jobDataLock) {
jobData.setStatus(EventPublisherJob.Status.RUNNING);
jobData.setFailure(e.getIndexingError());
paginatedSource.updateStats(
e.getIndexingError().getSuccessCount(),
e.getIndexingError().getFailedCount());
updateStats(entityType, paginatedSource.getStats());
}
sendUpdates(jobExecutionContext);
} catch (Exception e) {
synchronized (jobDataLock) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(e.getMessage()));
}
sendUpdates(jobExecutionContext);
LOG.error("Unexpected error during reindexing for entity {}", entityType, e);
}
});

futures.add(future);
}

executorService.shutdown();

try {
boolean allTasksCompleted = executorService.awaitTermination(1, TimeUnit.HOURS);
if (!allTasksCompleted) {
LOG.warn("Reindexing tasks did not complete within the expected time.");
executorService.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Reindexing was interrupted", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}

for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
LOG.error("Task was interrupted", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Exception in reindexing task", e.getCause());
}
}
}
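
Stripped of the indexing specifics, the new performReindex follows a standard fan-out/await shape: size a fixed pool to min(sources, cores), submit one task per source, shut the pool down, await termination with a deadline, then drain the futures to surface worker exceptions. The following is a minimal, self-contained sketch of that pattern; the class name, source strings, and println bodies are placeholders for illustration, not OpenMetadata APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FanOutSketch {
  public static void main(String[] args) throws InterruptedException {
    List<String> sources = List.of("table", "topic", "dashboard"); // placeholder entity types
    // One thread per source, capped at the core count, as in the diff.
    int numThreads = Math.min(sources.size(), Runtime.getRuntime().availableProcessors());
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    List<Future<?>> futures = new ArrayList<>();

    for (String source : sources) {
      // Each source drains its own pagination loop on a worker thread.
      futures.add(executorService.submit(() -> System.out.println("indexing " + source)));
    }

    executorService.shutdown(); // Accept no new tasks; submitted ones keep running.
    if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
      executorService.shutdownNow(); // Timed out: interrupt whatever is still running.
    }
    for (Future<?> future : futures) {
      try {
        future.get(); // Rethrows anything a worker threw, wrapped in ExecutionException.
      } catch (ExecutionException e) {
        System.err.println("Reindexing task failed: " + e.getCause());
      }
    }
  }
}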
@@ -360,9 +412,7 @@ private void processEntityTimeSeries(

private void sendUpdates(JobExecutionContext jobExecutionContext) {
try {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
if (WebSocketManager.getInstance() != null) {
WebSocketManager.getInstance()
@@ -375,18 +425,15 @@ private void sendUpdates(JobExecutionContext jobExecutionContext) {
}

public void updateStats(String entityType, StepStats currentEntityStats) {
// Job Level Stats
Stats jobDataStats = jobData.getStats();

// Update Entity Level Stats
StepStats entityLevelStats = jobDataStats.getEntityStats();
if (entityLevelStats == null) {
entityLevelStats =
new StepStats().withTotalRecords(null).withFailedRecords(null).withSuccessRecords(null);
}
entityLevelStats.withAdditionalProperty(entityType, currentEntityStats);

// Total Stats
StepStats stats = jobData.getStats().getJobStats();
if (stats == null) {
stats =
@@ -405,7 +452,6 @@ public void updateStats(String entityType, StepStats currentEntityStats) {
.mapToInt(StepStats::getFailedRecords)
.sum());

// Update for the Job
jobDataStats.setJobStats(stats);
jobDataStats.setEntityStats(entityLevelStats);

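
Because every worker now funnels results through updateStats on the shared jobData, the diff wraps those calls in synchronized (jobDataLock): the per-entity stats and the job-level totals must change together. Below is a toy sketch of that locking discipline, with hypothetical integer counters standing in for StepStats; the class and field names are invented for illustration.

import java.util.HashMap;
import java.util.Map;

public class StatsSketch {
  private final Object lock = new Object(); // plays the role of jobDataLock
  private final Map<String, Integer> successByEntity = new HashMap<>(); // stand-in for entity stats
  private int totalSuccess; // stand-in for the job-level stats

  public void record(String entityType, int successCount) {
    synchronized (lock) {
      // The per-entity counter and the rolled-up total mutate together,
      // so one dedicated lock keeps them mutually consistent across workers.
      successByEntity.merge(entityType, successCount, Integer::sum);
      totalSuccess += successCount;
    }
  }

  public int total() {
    synchronized (lock) {
      return totalSuccess;
    }
  }
}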
@@ -418,13 +464,31 @@ private void reCreateIndexes(String entityType) {
}

IndexMapping indexType = searchRepository.getIndexMapping(entityType);
// Delete index
searchRepository.deleteIndex(indexType);
// Create index
searchRepository.createIndex(indexType);
}

public void stopJob() {
LOG.info("Stopping reindexing job.");
stopped = true;
if (executorService != null && !executorService.isShutdown()) {
List<Runnable> awaitingTasks = executorService.shutdownNow();
LOG.info(
"ExecutorService has been shutdown. Awaiting termination. {} tasks were awaiting execution.",
awaitingTasks.size());

try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOG.warn("ExecutorService did not terminate within the specified timeout.");
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for ExecutorService to terminate.", e);
List<Runnable> stillAwaitingTasks = executorService.shutdownNow(); // Force shutdown
LOG.info(
"Forced shutdown initiated due to interruption. {} tasks were awaiting execution.",
stillAwaitingTasks.size());
Thread.currentThread().interrupt(); // Restore interrupt status
}
}
}
}
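
The stop sequence in stopJob — interrupt in-flight work via shutdownNow, wait with a bounded awaitTermination, and restore the thread's interrupt status if the wait itself is interrupted — is the standard executor teardown idiom. A compact sketch of the same sequence, with the class and method names invented for illustration:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class ShutdownSketch {
  public static void stop(ExecutorService executorService) {
    List<Runnable> queued = executorService.shutdownNow(); // interrupts running tasks
    System.out.println(queued.size() + " tasks were still awaiting execution.");
    try {
      if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        System.err.println("Executor did not terminate within the timeout.");
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      Thread.currentThread().interrupt(); // restore interrupt status for callers upstream
    }
  }
}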
