Skip to content

Commit

Permalink
WIP runSegmented
Browse files Browse the repository at this point in the history
  • Loading branch information
Myyyvothrr committed Apr 5, 2024
1 parent 4ef4241 commit de2f0b7
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
import org.apache.uima.util.InvalidXMLException;
import org.luaj.vm2.Globals;
import org.luaj.vm2.lib.jse.JsePlatform;
import org.texttechnologylab.DockerUnifiedUIMAInterface.composer.DUUISegmentedWorker;
import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.IDUUIConnectionHandler;
import org.texttechnologylab.DockerUnifiedUIMAInterface.driver.*;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.AsyncCollectionReader;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUIAsynchronousProcessor;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUICollectionReader;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUICollectionDBReader;
import org.texttechnologylab.DockerUnifiedUIMAInterface.lua.DUUILuaContext;
import org.texttechnologylab.DockerUnifiedUIMAInterface.monitoring.DUUIMonitor;
import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.DUUIPipelineDocumentPerformance;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.lang.String.format;

Expand Down Expand Up @@ -695,87 +697,69 @@ public void run(DUUIAsynchronousProcessor collectionReader, String name) throws
}
}

public void runSegmented(DUUICollectionReader collectionReader, String name) throws Exception {
ConcurrentLinkedQueue<JCas> emptyCasDocuments = new ConcurrentLinkedQueue<>();
AtomicInteger aliveThreads = new AtomicInteger(0);

_shutdownAtomic.set(false);

System.out.printf("[Composer] Running in segmented mode, %d threads at most!\n", _workers);

public void runSegmented(DUUICollectionDBReader collectionReader, String name) throws Exception {
try {
if(_storage!=null) {
_storage.addNewRun(name,this);
_shutdownAtomic.set(false);

if(_storage != null) {
_storage.addNewRun(name, this);
}

TypeSystemDescription desc = instantiate_pipeline();
if (_cas_poolsize == null) {
_cas_poolsize = (int)Math.ceil(_workers*1.5);
System.out.printf("[Composer] Calculated CAS poolsize of %d!\n", _cas_poolsize);
} else {
if (_cas_poolsize < _workers) {
System.err.println("[Composer] WARNING: Pool size is smaller than the available threads, this is likely a bottleneck.");
}
}

for(int i = 0; i < _cas_poolsize; i++) {
emptyCasDocuments.add(JCasFactory.createJCas(desc));
List<String> pipelineUUIDs = _instantiatedPipeline.stream().map(PipelinePart::getUUID).collect(Collectors.toList());

int threadsPerTool = _workers / _instantiatedPipeline.size();
System.out.printf("[Composer] Running in segmented mode, %d threads with %d threads per tool!\n", _workers, threadsPerTool);

List<Thread> threads = new ArrayList<>();
int tId = 0;
for (PipelinePart part : _instantiatedPipeline) {
for (int i = 0; i < threadsPerTool; i++) {
System.out.printf("[Composer] Starting worker thread for pipeline part %s [%d/%d]\n", part.getUUID(), tId+1, _workers);
Thread thread = new Thread(new DUUISegmentedWorker(
tId,
_shutdownAtomic,
part,
collectionReader,
desc,
_storage,
name,
pipelineUUIDs
));
thread.start();
threads.add(thread);
tId += 1;
}
}

Thread []arr = new Thread[_workers];
for(int i = 0; i < _workers; i++) {
System.out.printf("[Composer] Starting worker thread [%d/%d]\n",i+1,_workers);
arr[i] = new DUUIWorkerAsyncReader(_instantiatedPipeline,emptyCasDocuments.poll(),_shutdownAtomic,aliveThreads,_storage,name,collectionReader);
arr[i].start();
}
Instant starttime = Instant.now();
final int maxNumberOfFutures = 20;
CompletableFuture<Integer> []futures = new CompletableFuture[maxNumberOfFutures];
boolean breakit = false;
while(!_shutdownAtomic.get()) {
if(collectionReader.getCachedSize() > collectionReader.getMaxMemory()) {
Thread.sleep(50);
continue;
}
for(int i = 0; i < maxNumberOfFutures; i++) {
futures[i] = collectionReader.getAsyncNextByteArray();
}
CompletableFuture.allOf(futures).join();
for(int i = 0; i < maxNumberOfFutures; i++) {
if(futures[i].join() != 0) {
breakit=true;
}
}
if(breakit) break;
while(!collectionReader.finishedLoading() || collectionReader.getDone() < collectionReader.getSize()) {
System.out.println(collectionReader.getProgress());
Thread.sleep(10L);
}

AtomicInteger waitCount = new AtomicInteger();
waitCount.set(0);
// Wartet, bis die Dokumente fertig verarbeitet wurden.
while(emptyCasDocuments.size() != _cas_poolsize && !collectionReader.isEmpty()) {
if (waitCount.incrementAndGet() % 500 == 0) {
System.out.println("[Composer] Waiting for threads to finish document processing...");
}
Thread.sleep(1000*_workers); // to fast or in relation with threads?

}
System.out.println("[Composer] All documents have been processed. Signaling threads to shut down now...");
_shutdownAtomic.set(true);

for(int i = 0; i < arr.length; i++) {
System.out.printf("[Composer] Waiting for thread [%d/%d] to shut down\n",i+1,arr.length);
arr[i].join();
System.out.printf("[Composer] Thread %d returned.\n",i);
for(int i = 0; i < threads.size(); i++) {
System.out.printf("[Composer] Waiting for thread [%d/%d] to shut down\n", i+1, threads.size());
threads.get(i).join();
System.out.printf("[Composer] Thread %d returned.\n", i);
}
if(_storage!=null) {
_storage.finalizeRun(name,starttime,Instant.now());

if(_storage != null) {
_storage.finalizeRun(name, starttime, Instant.now());
}

System.out.println("[Composer] All threads returned.");
shutdown_pipeline();

} catch (Exception e) {
e.printStackTrace();

System.out.println("[Composer] Something went wrong, shutting down remaining components...");
//shutdown_pipeline();
shutdown_pipeline();

throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.texttechnologylab.DockerUnifiedUIMAInterface.composer;

import org.apache.uima.UIMAException;
import org.apache.uima.fit.factory.JCasFactory;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.metadata.TypeSystemDescription;
import org.texttechnologylab.DockerUnifiedUIMAInterface.DUUIComposer;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUICollectionDBReader;
import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.DUUIPipelineDocumentPerformance;
import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.IDUUIStorageBackend;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class DUUISegmentedWorker implements Runnable {
private final int threadIndex;
private final AtomicBoolean shutdown;
private final DUUIComposer.PipelinePart pipelinePart;
private final DUUICollectionDBReader collectionReader;
private final TypeSystemDescription typesystem;
private final IDUUIStorageBackend backend;
private final String name;
private final List<String> pipelineUUIDs;
private final int pipelinePosition;

public DUUISegmentedWorker(int threadIndex, AtomicBoolean shutdown, DUUIComposer.PipelinePart pipelinePart, DUUICollectionDBReader collectionReader, TypeSystemDescription typesystem, IDUUIStorageBackend backend, String name, List<String> pipelineUUIDs) {
this.threadIndex = threadIndex;
this.shutdown = shutdown;
this.pipelinePart = pipelinePart;
this.collectionReader = collectionReader;
this.typesystem = typesystem;
this.backend = backend;
this.name = name;
this.pipelineUUIDs = pipelineUUIDs;
this.pipelinePosition = pipelineUUIDs.indexOf(pipelinePart.getUUID());
}

@Override
public void run() {
JCas jCas;
try {
jCas = JCasFactory.createJCas(typesystem);
} catch (UIMAException e) {
throw new RuntimeException(e);
}

boolean trackErrorDocs = false;
if (backend != null) {
trackErrorDocs = backend.shouldTrackErrorDocs();
}

while (true) {
long waitTimeStart = System.nanoTime();
while (true) {
if (shutdown.get()) {
jCas.reset();
return;
}
try {
if (!collectionReader.getNextCas(jCas, pipelinePart.getUUID(), pipelinePosition)) {
Thread.sleep(300);
} else {
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
long waitTimeEnd = System.nanoTime();

boolean status = false;
try {
DUUIPipelineDocumentPerformance perf = new DUUIPipelineDocumentPerformance(name, waitTimeEnd - waitTimeStart, jCas, trackErrorDocs);

pipelinePart.getDriver().run(pipelinePart.getUUID(), jCas, perf);

if (backend != null) {
backend.addMetricsForDocument(perf);
}

status = true;

} catch (Exception e) {
status = false;
e.printStackTrace();
System.err.println(e.getMessage());
System.err.println("Error in pipeline part " + pipelinePart.getUUID() + ", continuing with next document!");
}
finally {
collectionReader.updateCas(jCas, pipelinePart.getUUID(), status, pipelineUUIDs);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.texttechnologylab.DockerUnifiedUIMAInterface.io;

import org.apache.uima.jcas.JCas;

import java.util.List;

public interface DUUICollectionDBReader extends DUUICollectionReader {
/**
* Fill and get the next JCas based on the tool
*
* @param pCas : JCas to be filled
* @param toolUUID : toolUUID
* @param pipelinePosition
* @return true if cas was filled, false if no cas was filled
*/
public boolean getNextCas(JCas pCas, String toolUUID, int pipelinePosition);

void updateCas(JCas pCas, String toolUUID, boolean status, List<String> pipelineUUIDs);

boolean finishedLoading();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.texttechnologylab.DockerUnifiedUIMAInterface.io;

import de.tudarmstadt.ukp.dkpro.core.api.io.ProgressMeter;
import org.apache.uima.jcas.JCas;

/**
Expand Down
Loading

0 comments on commit de2f0b7

Please sign in to comment.