Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Indexing using multiple threads #5367

Merged
merged 12 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,11 @@ public enum ParameterCore implements ParameterInterface {
/*
* Elasticsearch properties
*/
ELASTICSEARCH_BATCH(new Parameter<>("elasticsearch.batch", 500)),
ELASTICSEARCH_INDEXLIMIT(new Parameter<>("elasticsearch.indexLimit", 5000)),
ELASTICSEARCH_BATCH(new Parameter<>("elasticsearch.batch", 500)),
ELASTICSEARCH_ATTEMPTS(new Parameter<>("elasticsearch.attempts", 10)),
ELASTICSEARCH_TIME_BETWEEN_ATTEMPTS(new Parameter<>("elasticsearch.timeBetweenAttempts", 2000)),
ELASTICSEARCH_THREADS(new Parameter<>("elasticsearch.threads", 4)),

/*
* Security properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public void callIndexing(ObjectType type) {
indexingStartedTime = LocalDateTime.now();
indexingStartedUser = ServiceManager.getUserService().getAuthenticatedUser().getFullName();
try {
ServiceManager.getIndexingService().startIndexing(type, pollingChannel);
} catch (DataException | CustomResponseException e) {
ServiceManager.getIndexingService().startIndexing(pollingChannel, type);
} catch (IllegalStateException e) {
Helper.setErrorMessage(e.getLocalizedMessage(), logger, e);
}
}
Expand All @@ -156,7 +156,11 @@ public void callIndexing(ObjectType type) {
public void callIndexingRemaining(ObjectType type) {
indexingStartedTime = LocalDateTime.now();
indexingStartedUser = ServiceManager.getUserService().getAuthenticatedUser().getFullName();
ServiceManager.getIndexingService().startIndexingRemaining(type, pollingChannel);
try {
ServiceManager.getIndexingService().startIndexingRemaining(pollingChannel, type);
} catch (IllegalStateException e) {
Helper.setErrorMessage(e.getLocalizedMessage(), logger, e);
}
}

/**
Expand Down
131 changes: 46 additions & 85 deletions Kitodo/src/main/java/org/kitodo/production/helper/IndexWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.HibernateException;
import org.kitodo.config.ConfigCore;
import org.kitodo.config.enums.ParameterCore;
import org.kitodo.data.database.exceptions.DAOException;
import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
import org.kitodo.production.enums.ObjectType;
import org.kitodo.production.services.data.base.SearchService;

public class IndexWorker implements Runnable {

private int indexedObjects = 0;
private int startIndexing;
private boolean indexAllObjects = true;
private SearchService searchService;
private static final Logger logger = LogManager.getLogger(IndexWorker.class);

private final boolean indexAllObjects;
private final ObjectType objectType;
private final SearchService searchService;
private final IndexWorkerStatus indexWorkerStatus;

/**
* Constructor initializing an IndexWorker object with the given SearchService
Expand All @@ -38,103 +39,63 @@ public class IndexWorker implements Runnable {
* @param searchService
* SearchService instance used for indexing
*/
public IndexWorker(SearchService searchService) {
public IndexWorker(SearchService searchService, ObjectType objectType, IndexWorkerStatus indexWorkerStatus, boolean indexAllObjects) {
this.searchService = searchService;
this.startIndexing = 0;
}

/**
* Constructor initializing an IndexWorker object with the given SearchService
* and list of objects that will be indexed.
*
* @param searchService
* SearchService instance used for indexing
*/
public IndexWorker(SearchService searchService, int startIndexing) {
this.searchService = searchService;
this.startIndexing = startIndexing;
this.indexWorkerStatus = indexWorkerStatus;
this.indexAllObjects = indexAllObjects;
this.objectType = objectType;
}

@Override
@SuppressWarnings("unchecked")
public void run() {
this.indexedObjects = 0;
int maxAttempts = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_ATTEMPTS);
int batchSize = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_BATCH);
int indexLimit = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_INDEXLIMIT);
try {
int amountToIndex = getAmountToIndex();

if (amountToIndex < batchSize) {
if (indexAllObjects) {
indexObjects(searchService.getAll(this.startIndexing, amountToIndex));
} else {
indexObjects(searchService.getAllNotIndexed(this.startIndexing, amountToIndex));
}
} else {
if (amountToIndex > indexLimit) {
amountToIndex = indexLimit;
}
while (this.indexedObjects < amountToIndex) {
indexChunks(batchSize);
int timeBetweenAttempts = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_TIME_BETWEEN_ATTEMPTS);
int maxBatch = indexWorkerStatus.getMaxBatch();
boolean failed = false;

int nextBatch = indexWorkerStatus.getAndIncrementNextBatch();
while (!failed && nextBatch < maxBatch) {
// nextBatch is a valid batch that needs to be processed

int attempt = 1;
while (attempt < maxAttempts) {
try {
int offset = nextBatch * batchSize;
logger.info("index " + objectType.toString() + " with offset " + offset + " and attempt "
+ attempt + "/" + maxAttempts);

if (indexAllObjects) {
indexObjects(searchService.getAll(offset, batchSize));
} else {
indexObjects(searchService.getAllNotIndexed(offset, batchSize));
}

break;
} catch (Exception e) {
logger.error(e.getMessage(), e);
attempt += 1;
try {
Thread.sleep(timeBetweenAttempts);
} catch (InterruptedException e2) {
logger.trace("Index worker sleep is interrupted while waiting for next indexing attempt");
}
}
}
} catch (CustomResponseException | DAOException | HibernateException | IOException e) {
logger.error(e.getMessage(), e);
}
}

private int getAmountToIndex() throws DAOException {
if (indexAllObjects) {
return searchService.countDatabaseRows().intValue() - this.startIndexing;
} else {
return searchService.countNotIndexedDatabaseRows().intValue() - this.startIndexing;
}
}

@SuppressWarnings("unchecked")
private void indexChunks(int batchSize) throws CustomResponseException, DAOException, IOException {
List<Object> objectsToIndex;
int indexLimit = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.ELASTICSEARCH_INDEXLIMIT);
while (this.indexedObjects < indexLimit) {
int offset = this.indexedObjects + this.startIndexing;

if (indexAllObjects) {
objectsToIndex = searchService.getAll(offset, batchSize);
if (attempt >= maxAttempts) {
logger.error("stop indexing after maximum amount of attempts");
failed = true;
} else {
objectsToIndex = searchService.getAllNotIndexed(offset, batchSize);
}
if (objectsToIndex.isEmpty()) {
break;
// find next batch that can be indexed
nextBatch = indexWorkerStatus.getAndIncrementNextBatch();
}

indexObjects(objectsToIndex);
}
}

@SuppressWarnings("unchecked")
private void indexObjects(List<Object> objectsToIndex) throws CustomResponseException, DAOException, IOException {
this.searchService.addAllObjectsToIndex(objectsToIndex);
this.indexedObjects = this.indexedObjects + objectsToIndex.size();
}

/**
* Return the number of objects that have already been indexed during the
* current indexing process.
*
* @return int the number of objects indexed during the current indexing run
*/
public int getIndexedObjects() {
return indexedObjects + startIndexing;
}

/**
* Set value for indexAllObjects. If true, it indexes all objects, if false it
* indexes only objects with flag IndexAction.INDEX.
*
* @param indexAllObjects
* as boolean
*/
public void setIndexAllObjects(boolean indexAllObjects) {
this.indexAllObjects = indexAllObjects;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
*
* This file is part of the Kitodo project.
*
* It is licensed under GNU General Public License version 3 or later.
*
* For the full copyright and license information, please read the
* GPL3-License.txt file that was distributed with this source code.
*/

package org.kitodo.production.helper;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Shared bookkeeping for a group of index worker threads.
 *
 * <p>All workers of one indexing run share a single instance and use it to
 * claim batch numbers. Claiming is backed by an {@link AtomicInteger}, so
 * every call hands out a distinct batch number even when several threads
 * call concurrently.</p>
 */
public class IndexWorkerStatus {

    /**
     * Stores the number of batches that need to be indexed.
     *
     * <p>Batch numbers start at 0, so this value is an exclusive upper bound:
     * a worker thread that receives a batch number equal to or higher than
     * this number has no further batch to process and stops.</p>
     */
    private final int maxBatch;

    /**
     * Stores the number of the next batch that needs to be indexed.
     */
    private final AtomicInteger nextBatch = new AtomicInteger(0);

    /**
     * Initialize index worker status.
     *
     * @param maxBatch
     *            the maximum number of batches to be processed, must not be
     *            {@code null}
     * @throws NullPointerException
     *             if {@code maxBatch} is {@code null}
     */
    public IndexWorkerStatus(Integer maxBatch) {
        // Fail at construction time with a clear message instead of failing
        // later while unboxing inside getMaxBatch().
        if (maxBatch == null) {
            throw new NullPointerException("maxBatch must not be null");
        }
        this.maxBatch = maxBatch;
    }

    /**
     * Return the number of batches to be processed.
     *
     * @return the exclusive upper bound for valid batch numbers
     */
    public int getMaxBatch() {
        return this.maxBatch;
    }

    /**
     * Claim the next batch number and advance the shared counter.
     *
     * <p>The returned number is not checked against {@link #getMaxBatch()};
     * callers must compare it themselves to find out whether it still
     * denotes a batch that needs to be indexed.</p>
     *
     * @return the batch number claimed by the caller, starting at 0
     */
    public int getAndIncrementNextBatch() {
        return this.nextBatch.getAndIncrement();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void saveToIndex(T baseIndexedBean, boolean forceRefresh)
public void addAllObjectsToIndex(List<T> baseIndexedBeans) throws CustomResponseException, DAOException, IOException {
indexer.setMethod(HttpMethod.PUT);
if (!baseIndexedBeans.isEmpty()) {
indexer.performMultipleRequests(baseIndexedBeans, type, true);
indexer.performMultipleRequests(baseIndexedBeans, type, false);
saveAsIndexed(baseIndexedBeans);
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* (c) Kitodo. Key to digital objects e. V. <contact@kitodo.org>
*
* This file is part of the Kitodo project.
*
* It is licensed under GNU General Public License version 3 or later.
*
* For the full copyright and license information, please read the
* GPL3-License.txt file that was distributed with this source code.
*/

package org.kitodo.production.services.index;

import java.util.Objects;

import javax.faces.push.PushContext;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.kitodo.data.database.exceptions.DAOException;
import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
import org.kitodo.data.exceptions.DataException;
import org.kitodo.production.enums.ObjectType;
import org.kitodo.production.helper.Helper;

/**
 * Background thread that runs indexing for one or all indexable object types
 * and signals the UI push channel once it is done.
 */
public class IndexManagmentThread extends Thread {

    private static final Logger logger = LogManager.getLogger(IndexManagmentThread.class);

    private final PushContext context;
    private final IndexingService indexingService;
    private final boolean indexAllObjects;
    private final ObjectType objectType;

    /**
     * Initialize indexing management thread.
     *
     * @param pushContext
     *            the UI channel used to trigger refresh
     * @param service
     *            the service class for indexing
     * @param objectType
     *            optional objectType (if null, all types are indexed,
     *            otherwise only that one)
     * @param indexAllObjects
     *            whether all objects are indexed or only remaining ones
     */
    IndexManagmentThread(PushContext pushContext, IndexingService service, ObjectType objectType,
            boolean indexAllObjects) {
        context = pushContext;
        indexingService = service;
        this.indexAllObjects = indexAllObjects;
        this.objectType = objectType;
    }

    /**
     * Run indexing for every selected object type, one type after the other.
     *
     * <p>A failure while indexing one type is logged and reported via
     * {@link Helper#setErrorMessage}, then the loop continues with the next
     * type. Regardless of the outcome, the index state is reset and the
     * finished message is sent to the push channel.</p>
     */
    @Override
    public void run() {
        try {
            indexingService.setIndexingAll(true);

            for (ObjectType currentType : ObjectType.getIndexableObjectTypes()) {
                // no type selected means: index every indexable type
                if (Objects.isNull(this.objectType) || currentType.equals(objectType)) {
                    try {
                        indexingService.runIndexing(currentType, context, indexAllObjects);
                    } catch (DataException | CustomResponseException | DAOException | RuntimeException e) {
                        // pass the exception as throwable so the stack trace is logged
                        logger.error(e.getMessage(), e);
                        Helper.setErrorMessage(e.getLocalizedMessage(), IndexingService.getLogger(), e);
                    }
                }
            }
            try {
                sleep(IndexingService.PAUSE);
            } catch (InterruptedException e) {
                // an interrupt only shortens the pause; the cleanup below still runs
                logger.trace("Index management sleep interrupted while waiting to finish indexing");
            }
        } finally {
            indexingService.resetCurrentIndexState();
            indexingService.setIndexingAll(false);
            context.send(IndexingService.INDEXING_FINISHED_MESSAGE);
        }
    }
}
Loading