From 93a31bb37e39a693b21805267146cbc294579c5f Mon Sep 17 00:00:00 2001
From: Patrick Schrottenbacher
Date: Wed, 10 Apr 2024 16:25:11 +0200
Subject: [PATCH 1/3] feat: added document locking

---
 .../io/reader/DUUISegmentationReader.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
index 6e4185de..f5813677 100644
--- a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
@@ -263,8 +263,9 @@ public boolean getNextCas(JCas pCas, String toolUUID, int pipelinePosition) {
         Bson notFinishedFilter = Filters.ne("finished", true);
         Bson sizeCondition = Filters.expr(new Document("$gte", Arrays.asList(new Document("$size", "$metadata.duui_status_tools"), pipelinePosition)));
         Bson toolExistsCondition = Filters.ne("metadata.duui_status_tools", toolUUID);
-        Bson query = Filters.and(notFinishedFilter, sizeCondition, toolExistsCondition);
-        Document next = (Document) mongoCollection.find(query).first();
+        Bson notLockedCondition = Filters.ne("metadata.duui_locked", true);
+        Bson query = Filters.and(notFinishedFilter, sizeCondition, toolExistsCondition, notLockedCondition);
+        Document next = (Document) mongoCollection.findOneAndUpdate(query, new Document("$set", new Document("metadata.duui_locked", true)));
         // TODO error handling!
         if (next == null) {
             return false;
@@ -308,6 +309,8 @@ public void updateCas(JCas pCas, String toolUUID, boolean status, List<String> p
         boolean finished = new HashSet<>(toolUUIDs).containsAll(pipelineUUIDs);
         docMeta.append("duui_status_finished", finished);
 
+        docMeta.append("duui_locked", false);
+
         GridFSUploadOptions options = new GridFSUploadOptions()
                 .metadata(docMeta);
 
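Note on the locking above: findOneAndUpdate matches the filter and sets the lock in one atomic step, so two workers can never claim the same segment; a find() followed by a separate update would leave a race window. A minimal sketch of the pattern, assuming a MongoCollection<Document> like the reader's (the helper name is hypothetical):

    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import org.bson.Document;
    import org.bson.conversions.Bson;

    static Document claimNextSegment(MongoCollection<Document> files) {
        Bson query = Filters.and(
                Filters.ne("finished", true),
                Filters.ne("metadata.duui_locked", true));
        // Atomic claim: returns the matched document and marks it locked in one
        // operation, so no second worker can read it while still unlocked.
        return files.findOneAndUpdate(query,
                new Document("$set", new Document("metadata.duui_locked", true)));
    }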
From fe2775ed27357c38317ba8a39dd64d92e1aa4ba8 Mon Sep 17 00:00:00 2001
From: Patrick Schrottenbacher
Date: Thu, 11 Apr 2024 17:10:03 +0000
Subject: [PATCH 2/3] feat: added merging and basic benchmark test

---
 pom.xml                                    |   2 +-
 .../DUUIComposer.java                      |   5 +-
 .../io/DUUICollectionDBReader.java         |   2 +
 .../io/reader/DUUISegmentationReader.java  | 273 ++++++++++++++++--
 .../io/transport/GZIPLocalFile.java        |  21 ++
 .../TestSegmentationReader.java            | 107 ++++++-
 6 files changed, 371 insertions(+), 39 deletions(-)
 create mode 100644 src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/transport/GZIPLocalFile.java

diff --git a/pom.xml b/pom.xml
index b4e31c06..2e09a0d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -294,7 +294,7 @@
         <dependency>
             <groupId>com.github.texttechnologylab</groupId>
             <artifactId>UIMATypeSystem</artifactId>
-            <version>74a8489af5</version>
+            <version>1.9.5</version>
         </dependency>

diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/DUUIComposer.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/DUUIComposer.java
index 97caa331..4e7e55e9 100644
--- a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/DUUIComposer.java
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/DUUIComposer.java
@@ -736,7 +736,7 @@ public void runSegmented(DUUICollectionDBReader collectionReader, String name) t
         Instant starttime = Instant.now();
         while(!collectionReader.finishedLoading() || collectionReader.getDone() < collectionReader.getSize()) {
             System.out.println(collectionReader.getProgress());
-            Thread.sleep(10L);
+            Thread.sleep(500L);
         }
         System.out.println("[Composer] All documents have been processed. Signaling threads to shut down now...");
@@ -747,6 +747,9 @@ public void runSegmented(DUUICollectionDBReader collectionReader, String name) t
             System.out.printf("[Composer] Thread %d returned.\n", i);
         }
 
+        System.out.println("[Composer] Merging documents...");
+        collectionReader.merge();
+
         if(_storage != null) {
             _storage.finalizeRun(name, starttime, Instant.now());
         }

diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/DUUICollectionDBReader.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/DUUICollectionDBReader.java
index 00405b54..1a07758d 100644
--- a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/DUUICollectionDBReader.java
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/DUUICollectionDBReader.java
@@ -18,4 +18,6 @@ public interface DUUICollectionDBReader extends DUUICollectionReader {
     void updateCas(JCas pCas, String toolUUID, boolean status, List<String> pipelineUUIDs);
 
     boolean finishedLoading();
+
+    void merge();
 }
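Note on the interface change: merge() extends the lifecycle that runSegmented() drives above: poll until loading and processing are done, join the tool threads, then merge the per-segment CASes back into whole documents. A condensed sketch of that call order (reader construction elided, not additional API):

    DUUICollectionDBReader reader = /* e.g. a DUUISegmentationReader */ null;
    while (!reader.finishedLoading() || reader.getDone() < reader.getSize()) {
        System.out.println(reader.getProgress());
        Thread.sleep(500L);          // poll interval the composer now uses
    }
    // ...tool worker threads have been joined at this point...
    reader.merge();                  // write one merged output per source document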
diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
index f5813677..859562c6 100644
--- a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
@@ -10,6 +10,7 @@
 import com.mongodb.client.gridfs.model.GridFSUploadOptions;
 import com.mongodb.client.model.Filters;
 import de.tudarmstadt.ukp.dkpro.core.api.metadata.type.DocumentMetaData;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.uima.UIMAException;
 import org.apache.uima.cas.impl.XmiCasDeserializer;
@@ -19,23 +20,28 @@
 import org.apache.uima.jcas.JCas;
 import org.bson.Document;
 import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.mongodb.MongoDBConfig;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.mongodb.MongoDBConnectionHandler;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUICollectionDBReader;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.io.ProgressMeter;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.io.format.IDUUIFormat;
-import org.texttechnologylab.DockerUnifiedUIMAInterface.io.format.TxtLoader;
+import org.texttechnologylab.DockerUnifiedUIMAInterface.io.format.XmiLoader;
+import org.texttechnologylab.DockerUnifiedUIMAInterface.io.transport.GZIPLocalFile;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.io.transport.IDUUITransport;
-import org.texttechnologylab.DockerUnifiedUIMAInterface.io.transport.LocalFile;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategy;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategyNone;
 import org.texttechnologylab.annotation.AnnotationComment;
 import org.xml.sax.SAXException;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -49,12 +55,15 @@ public class DUUISegmentationReader implements DUUICollectionDBReader {
     private static final String MONGO_BUCKET_NAME_FILES = MONGO_BUCKET_NAME + ".files";
 
     private static final String DUUI_SEGMENTATION_READER_SEGMENT_ID = "__duui_segmentation_reader_segment_id__";
-
-    private final MongoCollection mongoCollection;
+    private final MongoDBConfig mongoDBConfig;
+    private final MongoCollection<Document> mongoCollection;
     private final GridFSBucket mongoBucket;
     private final DUUISegmentationStrategy segmentationStrategy;
     private final ProgressMeter progress;
     private final AtomicBoolean loadingFinished;
+    private final Path outPath;
+    private final int workers;
+    private final int capacity;
 
     protected static String getGridId(String docId, long segmentIndex) {
         return docId + "_" + segmentIndex;
@@ -68,6 +77,16 @@ public PathMessage(Path path) {
         }
     }
 
+    static class MergeMessage {
+        public String id;
+        public List<ObjectId> ids;
+
+        public MergeMessage(String id, List<ObjectId> ids) {
+            this.id = id;
+            this.ids = ids;
+        }
+    }
+
     static class Segmenter implements Runnable {
         private final BlockingQueue<PathMessage> queue;
         private final GridFSBucket mongoBucket;
@@ -78,6 +97,7 @@ public Segmenter(BlockingQueue<PathMessage> queue, MongoDBConfig mongoConfig, DU
 
             // TODO use single connection (pool?) for all threads?
             MongoDBConnectionHandler mongoConnectionHandler = new MongoDBConnectionHandler(mongoConfig);
+
             this.mongoBucket = GridFSBuckets.create(mongoConnectionHandler.getDatabase(), MONGO_BUCKET_NAME);
 
             this.segmentationStrategy = segmentationStrategy;
@@ -111,8 +131,8 @@ public void run() {
                     System.out.println(Thread.currentThread().getId() + " - start " + path);
 
                     try {
-                        IDUUITransport transport = new LocalFile(path);
-                        IDUUIFormat format = new TxtLoader("de");
+                        IDUUITransport transport = new GZIPLocalFile(path);
+                        IDUUIFormat format = new XmiLoader(true);
                         format.load(transport.load(), jCas);
 
                         String docId = UUID.randomUUID().toString();
@@ -161,7 +181,7 @@ private void store(JCas jCas, String docId, long segmentIndex) {
             );
 
             String segmentId = getGridId(docId, segmentIndex);
-            try(GridFSUploadStream upload = this.mongoBucket.openUploadStream(segmentId, options)) {
+            try (GridFSUploadStream upload = this.mongoBucket.openUploadStream(segmentId, options)) {
 
                 // Add metainfo
                 // TODO use special annotation type?
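Note on worker shutdown: the Segmenter above and the Merger added below share one queue idiom: workers block on poll() with a timeout, the producer enqueues one "poison pill" per worker (PathMessage(null), MergeMessage(null, null)), and a worker exits once the payload it dequeues is null. A generic sketch of the idiom, reusing the PathMessage defined above (handle() is hypothetical):

    BlockingQueue<PathMessage> queue = new LinkedBlockingDeque<>();

    Runnable worker = () -> {
        while (true) {
            PathMessage message;
            try {
                message = queue.poll(10, TimeUnit.SECONDS);  // same timeout as the patch
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (message != null) {
                if (message.path == null) break;             // poison pill: stop this worker
                handle(message.path);                        // hypothetical per-file work
            }
        }
    };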
@@ -180,15 +200,107 @@ private void store(JCas jCas, String docId, long segmentIndex) {
         }
     }
 
-    public DUUISegmentationReader(Path sourcePath, MongoDBConfig mongoConfig, DUUISegmentationStrategy segmentationStrategy, int workers) throws IOException, InterruptedException {
-        this(sourcePath, mongoConfig, segmentationStrategy, workers, Integer.MAX_VALUE);
+    static class Merger implements Runnable {
+        private final BlockingQueue<MergeMessage> queue;
+        private final GridFSBucket mongoBucket;
+        private final DUUISegmentationStrategy segmentationStrategy;
+        private final Path outPath;
+
+        public Merger(BlockingQueue<MergeMessage> queue, Path outPath, MongoDBConfig mongoConfig, DUUISegmentationStrategy segmentationStrategy) {
+            this.queue = queue;
+            this.outPath = outPath;
+            // TODO use single connection (pool?) for all threads?
+            MongoDBConnectionHandler mongoConnectionHandler = new MongoDBConnectionHandler(mongoConfig);
+
+            this.mongoBucket = GridFSBuckets.create(mongoConnectionHandler.getDatabase(), MONGO_BUCKET_NAME);
+
+            this.segmentationStrategy = segmentationStrategy;
+        }
+
+        @Override
+        public void run() {
+            JCas jCas;
+            try {
+                jCas = JCasFactory.createJCas();
+            } catch (UIMAException e) {
+                throw new RuntimeException(e);
+            }
+
+            while (true) {
+                MergeMessage message;
+                try {
+                    message = queue.poll(10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+
+                if (message != null) {
+                    if (message.id == null)
+                        break;
+                    try {
+                        boolean first = true;
+
+                        for (ObjectId id : message.ids) {
+                            // Bit ugly but it's fine for now...
+                            if (first) {
+                                fetch(id, jCas);
+                                segmentationStrategy.initialize(jCas);
+                                first = false;
+                            } else {
+                                JCas currentCas = JCasFactory.createJCas();
+                                fetch(id, currentCas);
+                                segmentationStrategy.merge(currentCas);
+                            }
+                        }
+                    } catch (UIMAException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    String dir = String.format("%s/%s", this.outPath.toString(), getRelativePath(jCas, true, false));
+                    Path path = Paths.get(dir + ".xmi.gz");
+                    try {
+                        Files.createDirectories(Paths.get(dir.substring(0, dir.lastIndexOf("/"))));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    try (GZIPOutputStream stream = new GZIPOutputStream(Files.newOutputStream(path))) {
+                        XmiCasSerializer.serialize(jCas.getCas(), stream);
+                    } catch (SAXException | IOException e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    jCas.reset();
+                }
+            }
+        }
+
+        private void fetch(ObjectId id, JCas cas) {
+            cas.reset();
+            try (GridFSDownloadStream download = this.mongoBucket.openDownloadStream(id)) {
+
+                // TODO different compressions using io/transport/format
+                try (GZIPInputStream gzip = new GZIPInputStream(download)) {
+                    XmiCasDeserializer.deserialize(gzip, cas.getCas());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public DUUISegmentationReader(Path sourcePath, Path outPath, MongoDBConfig mongoConfig, DUUISegmentationStrategy segmentationStrategy, int workers) throws IOException, InterruptedException {
+        this(sourcePath, outPath, mongoConfig, segmentationStrategy, workers, Integer.MAX_VALUE);
     }
 
-    public DUUISegmentationReader(Path sourcePath, MongoDBConfig mongoConfig, DUUISegmentationStrategy segmentationStrategy, int workers, int capacity) {
+    public DUUISegmentationReader(Path sourcePath, Path outPath, MongoDBConfig mongoConfig, DUUISegmentationStrategy segmentationStrategy, int workers, int capacity) {
         this.segmentationStrategy = segmentationStrategy;
-
+        this.mongoDBConfig = mongoConfig;
+        this.outPath = outPath;
+        this.workers = workers;
+        this.capacity = capacity;
         MongoDBConnectionHandler mongoConnectionHandler = new MongoDBConnectionHandler(mongoConfig);
         this.mongoCollection = mongoConnectionHandler.getCollection(MONGO_BUCKET_NAME_FILES);
+        // TODO: Make this configurable
+        GridFSBuckets.create(mongoConnectionHandler.getDatabase(), MONGO_BUCKET_NAME).drop();
         this.mongoBucket = GridFSBuckets.create(mongoConnectionHandler.getDatabase(), MONGO_BUCKET_NAME);
 
         this.progress = new ProgressMeter(getSize());
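Note on store()/fetch(): segments travel through GridFS as gzipped XMI; store() wraps the upload stream in a GZIPOutputStream and fetch() reverses this with XmiCasDeserializer. The same round trip against a local file instead of GridFS (path hypothetical, checked exceptions propagated via throws):

    JCas jCas = JCasFactory.createJCas();
    jCas.setDocumentText("Ein kurzer Testtext.");

    Path file = Paths.get("/tmp/segment.xmi.gz");
    try (GZIPOutputStream out = new GZIPOutputStream(Files.newOutputStream(file))) {
        XmiCasSerializer.serialize(jCas.getCas(), out);         // CAS -> gzipped XMI
    }

    JCas restored = JCasFactory.createJCas();
    try (GZIPInputStream in = new GZIPInputStream(Files.newInputStream(file))) {
        XmiCasDeserializer.deserialize(in, restored.getCas());  // gzipped XMI -> CAS
    }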
@@ -202,20 +314,13 @@ public DUUISegmentationReader(Path sourcePath, MongoDBConfig mongoConfig, DUUISe
         System.out.println("Starting " + workers + " workers to segment files...");
         List<Thread> segmenterThreads = new ArrayList<>();
         for (int i = 0; i < workers; i++) {
-            Thread thread = new Thread(new Segmenter(queue, mongoConfig, SerializationUtils.clone(segmentationStrategy)));
+            Thread thread = new Thread(new Segmenter(queue, this.mongoDBConfig, SerializationUtils.clone(segmentationStrategy)));
             thread.start();
             segmenterThreads.add(thread);
         }
 
         System.out.println("Collecting files from " + sourcePath + "...");
-        long counter = 0;
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourcePath)) {
-            for (Path entry : stream) {
-                //System.out.println("Adding " + entry);
-                queue.put(new PathMessage(entry));
-                counter++;
-            }
-        }
+        long counter = addFilesInFolder(sourcePath, queue);
         System.out.println("Added " + counter + " files to the queue.");
 
         System.out.println("Waiting for workers to finish...");
@@ -226,17 +331,32 @@ public DUUISegmentationReader(Path sourcePath, MongoDBConfig mongoConfig, DUUISe
                 thread.join();
             }
             System.out.println("All workers finished.");
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             e.printStackTrace();
             System.err.println("CollectionReader failed!");
-        }
-        finally {
+        } finally {
             loadingFinished.set(true);
         }
     }).start();
     }
 
+    private int addFilesInFolder(Path sourcePath, BlockingQueue<PathMessage> queue) {
+        int counter = 0;
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourcePath)) {
+            for (Path entry : stream) {
+                if (entry.toFile().isDirectory()) {
+                    counter += addFilesInFolder(entry, queue);
+                } else {
+                    queue.put(new PathMessage(entry));
+                    counter++;
+                }
+            }
+        } catch (InterruptedException | IOException e) {
+            throw new RuntimeException(e);
+        }
+        return counter;
+    }
+
     @Override
     public boolean finishedLoading() {
         return loadingFinished.get();
@@ -257,15 +377,13 @@ public void getNextCas(JCas pCas) {
     public boolean getNextCas(JCas pCas, String toolUUID, int pipelinePosition) {
         pCas.reset();
 
-        // TODO document locking
-
         // Not finished, has n tools processed, and not own tool
         Bson notFinishedFilter = Filters.ne("finished", true);
         Bson sizeCondition = Filters.expr(new Document("$gte", Arrays.asList(new Document("$size", "$metadata.duui_status_tools"), pipelinePosition)));
         Bson toolExistsCondition = Filters.ne("metadata.duui_status_tools", toolUUID);
         Bson notLockedCondition = Filters.ne("metadata.duui_locked", true);
         Bson query = Filters.and(notFinishedFilter, sizeCondition, toolExistsCondition, notLockedCondition);
-        Document next = (Document) mongoCollection.findOneAndUpdate(query, new Document("$set", new Document("metadata.duui_locked", true)));
+        Document next = mongoCollection.findOneAndUpdate(query, new Document("$set", new Document("metadata.duui_locked", true)));
         // TODO error handling!
         if (next == null) {
            return false;
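Note on the claim query: a worked instance of the filter above, for the second tool in a pipeline (pipelinePosition = 1, toolUUID = "toolB"); field names as in the patch:

    Bson notFinished = Filters.ne("finished", true);
    // at least `pipelinePosition` tools have already touched this segment
    Bson enoughToolsDone = Filters.expr(new Document("$gte",
            Arrays.asList(new Document("$size", "$metadata.duui_status_tools"), 1)));
    Bson notThisTool = Filters.ne("metadata.duui_status_tools", "toolB"); // but not toolB itself
    Bson notLocked = Filters.ne("metadata.duui_locked", true);            // and nobody holds it
    Bson claimable = Filters.and(notFinished, enoughToolsDone, notThisTool, notLocked);
    // matches: { metadata: { duui_status_tools: ["toolA"], duui_locked: false } }
    // skips:   { metadata: { duui_status_tools: [] } }                 (tool 1 not done yet)
    // skips:   { metadata: { duui_status_tools: ["toolA", "toolB"] } } (toolB already ran)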
@@ -273,7 +391,7 @@ public boolean getNextCas(JCas pCas, String toolUUID, int pipelinePosition) {
 
         String segmentId = next.getString("filename");
         GridFSDownloadOptions options = new GridFSDownloadOptions();
-        try(GridFSDownloadStream download = mongoBucket.openDownloadStream(segmentId, options)) {
+        try (GridFSDownloadStream download = mongoBucket.openDownloadStream(segmentId, options)) {
             try (GZIPInputStream gzip = new GZIPInputStream(download)) {
                 XmiCasDeserializer.deserialize(gzip, pCas.getCas());
             }
@@ -314,7 +432,7 @@ public void updateCas(JCas pCas, String toolUUID, boolean status, List<String> p
         GridFSUploadOptions options = new GridFSUploadOptions()
                 .metadata(docMeta);
 
-        try(GridFSUploadStream upload = this.mongoBucket.openUploadStream(segmentId, options)) {
+        try (GridFSUploadStream upload = this.mongoBucket.openUploadStream(segmentId, options)) {
             try (GZIPOutputStream gzip = new GZIPOutputStream(upload)) {
                 XmiCasSerializer.serialize(pCas.getCas(), gzip);
             }
@@ -323,6 +441,41 @@ public void updateCas(JCas pCas, String toolUUID, boolean status, List<String> p
         }
     }
 
+    // TODO: Create a separate thread that merges files
+    public void merge() {
+        Bson sort = new Document("metadata.duui_document_id", 1).append("metadata.duui_segment_index", 1);
+        Bson group = new Document("_id", "$metadata.duui_document_id").append("ids", new Document("$push", "$_id"));
+
+        try {
+            // TODO: Capacity
+            BlockingQueue<MergeMessage> queue = new LinkedBlockingDeque<>(this.capacity);
+
+            System.out.println("Starting " + this.workers + " workers to merge files...");
+            List<Thread> mergerThreads = new ArrayList<>();
+            for (int i = 0; i < this.workers; i++) {
+                Thread thread = new Thread(new Merger(queue, this.outPath, this.mongoDBConfig, SerializationUtils.clone(segmentationStrategy)));
+                thread.start();
+                mergerThreads.add(thread);
+            }
+
+            for (Document doc : mongoCollection.aggregate(List.of(new Document("$sort", sort), new Document("$group", group)))) {
+                queue.put(new MergeMessage(doc.getString("_id"), doc.getList("ids", ObjectId.class)));
+            }
+
+            // Add termination messages
+            for (Thread ignored : mergerThreads) {
+                queue.put(new MergeMessage(null, null));
+            }
+
+            System.out.println("Waiting for workers to finish...");
+            for (Thread thread : mergerThreads) {
+                thread.join();
+            }
+            System.out.println("All workers finished.");
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.err.println("CollectionReader failed!");
+        }
+
+    }
+
     @Override
     public boolean hasNext() {
         return mongoCollection.countDocuments(new Document("metadata.duui_status_finished", false)) > 0;
     }
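Note on the merge aggregation: merge() builds its work list with a two-stage pipeline: $sort by document id and segment index so that $push collects the GridFS ids in segment order, then $group per source document. The same pipeline in isolation:

    List<Document> pipeline = List.of(
            new Document("$sort", new Document("metadata.duui_document_id", 1)
                    .append("metadata.duui_segment_index", 1)),
            new Document("$group", new Document("_id", "$metadata.duui_document_id")
                    .append("ids", new Document("$push", "$_id"))));

    for (Document doc : mongoCollection.aggregate(pipeline)) {
        String documentId = doc.getString("_id");                       // one source document
        List<ObjectId> segmentIds = doc.getList("ids", ObjectId.class); // its segments, in order
    }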
@@ -337,4 +490,66 @@ public long getSize() {
     public long getDone() {
         return mongoCollection.countDocuments(new Document("metadata.duui_status_finished", true));
     }
+
+    protected static String getRelativePath(JCas aJCas, boolean stripExtension, boolean escapeFilename) {
+        DocumentMetaData meta = DocumentMetaData.get(aJCas);
+        String baseUri = meta.getDocumentBaseUri();
+        String docUri = meta.getDocumentUri();
+
+        if (baseUri != null && baseUri.length() != 0) {
+            // In some cases, the baseUri may not end with a slash - if so, we add one
+            if (baseUri.length() > 0 && !baseUri.endsWith("/")) {
+                baseUri += '/';
+            }
+
+            if ((docUri == null) || !docUri.startsWith(baseUri)) {
+                throw new IllegalStateException("Base URI [" + baseUri
+                        + "] is not a prefix of document URI [" + docUri + "]");
+            }
+            String relativeDocumentPath = docUri.substring(baseUri.length());
+            if (stripExtension) {
+                relativeDocumentPath = FilenameUtils.removeExtension(relativeDocumentPath);
+            }
+
+            // relativeDocumentPath must not start with a slash - if there are any, remove them
+            while (relativeDocumentPath.startsWith("/")) {
+                relativeDocumentPath = relativeDocumentPath.substring(1);
+            }
+
+            if (!escapeFilename) {
+                try {
+                    relativeDocumentPath = URLDecoder.decode(relativeDocumentPath, "UTF-8");
+                } catch (UnsupportedEncodingException e) {
+                    // UTF-8 must be supported on all Java platforms per specification. This should
+                    // not happen.
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            return relativeDocumentPath;
+        } else {
+            if (meta.getDocumentId() == null) {
+                throw new IllegalStateException(
+                        "Neither base URI/document URI nor document ID set");
+            }
+
+            String relativeDocumentPath = meta.getDocumentId();
+
+            if (stripExtension) {
+                relativeDocumentPath = FilenameUtils.removeExtension(relativeDocumentPath);
+            }
+
+            if (escapeFilename) {
+                try {
+                    relativeDocumentPath = URLEncoder.encode(relativeDocumentPath, "UTF-8");
+                } catch (UnsupportedEncodingException e) {
+                    // UTF-8 must be supported on all Java platforms per specification. This should
+                    // not happen.
+                    throw new IllegalStateException(e);
+                }
+            }
+
+            return relativeDocumentPath;
+        }
+    }
 }

diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/transport/GZIPLocalFile.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/transport/GZIPLocalFile.java
new file mode 100644
index 00000000..7fcd85ee
--- /dev/null
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/transport/GZIPLocalFile.java
@@ -0,0 +1,21 @@
+package org.texttechnologylab.DockerUnifiedUIMAInterface.io.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.zip.GZIPInputStream;
+
+public class GZIPLocalFile implements IDUUITransport {
+    private final Path path;
+
+    public GZIPLocalFile(Path path) {
+        this.path = path;
+    }
+
+    @Override
+    public InputStream load() throws IOException {
+        return new GZIPInputStream(Files.newInputStream(path, StandardOpenOption.READ));
+    }
+}
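Note on the new transport: GZIPLocalFile decompresses on load, so it plugs into any IDUUIFormat; the Segmenter above pairs it with XmiLoader to read gzipped XMI input. Usage sketch (the path is hypothetical; the boolean flag simply mirrors the Segmenter's call):

    JCas jCas = JCasFactory.createJCas();
    IDUUITransport transport = new GZIPLocalFile(Paths.get("/data/doc.xmi.gz"));
    IDUUIFormat format = new XmiLoader(true);    // same flag as in the Segmenter
    format.load(transport.load(), jCas);         // stream -> populated CAS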
diff --git a/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java b/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
index 17fa6e70..e246a6e2 100644
--- a/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
+++ b/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
@@ -3,37 +3,126 @@
 import org.junit.jupiter.api.Test;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.mongodb.MongoDBConfig;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.driver.DUUIRemoteDriver;
+import org.texttechnologylab.DockerUnifiedUIMAInterface.driver.DUUISwarmDriver;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.io.reader.DUUISegmentationReader;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.lua.DUUILuaContext;
+import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.sqlite.DUUISqliteStorageBackend;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategy;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategyByDelemiter;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.List;
 
 public class TestSegmentationReader {
+
+    private enum Tasks {
+        FLAIR,
+        SENTIMENT
+    }
+
     @Test
-    void testSegmentationReader() throws Exception {
-        int toolWorkers = 3;
+    public void testAll() throws Exception {
+        List<Integer> delimiterRange = List.of(200, 500, 1000, 2000, 5000, 10000);
+        for (int delimiter : delimiterRange) {
+            testBase("gerparcor_sample1000_RANDOM_100", delimiter, Tasks.SENTIMENT);
+            System.gc();
+        }
+    }
 
-        int segmentationWorkers = 2;
+    public void testBase(String corpus, int delemeterSize, Tasks task) throws Exception {
+        int toolWorkers = 1;
+
+        int segmentationWorkers = 1;
         int segmentationQueueSize = Integer.MAX_VALUE;
 
+        Path out = Paths.get("/opt/sample/out/" + corpus + "_" + delemeterSize + "_" + task.name());
+        Files.createDirectory(out);
+
+        DUUISqliteStorageBackend sqlite = new DUUISqliteStorageBackend("./benchmark.db")
+                .withConnectionPoolSize(toolWorkers);
+
         DUUIComposer composer = new DUUIComposer()
                 .withSkipVerification(true)
                 .withLuaContext(new DUUILuaContext().withJsonLibrary())
-                .withWorkers(toolWorkers);
+                .withWorkers(toolWorkers)
+                .withStorageBackend(sqlite);
 
+        DUUISwarmDriver swarmDriver = new DUUISwarmDriver();
         DUUIRemoteDriver remoteDriver = new DUUIRemoteDriver();
+        composer.addDriver(swarmDriver);
+        composer.addDriver(remoteDriver);
+
+        MongoDBConfig mongoConfig = new MongoDBConfig("segmentation_mongo.properties");
+
+        DUUISegmentationStrategy segmentationStrategy = new DUUISegmentationStrategyByDelemiter()
+                .withLength(delemeterSize)
+                .withDelemiter(".");
+
+        DUUISegmentationReader reader = new DUUISegmentationReader(
+                Paths.get("/opt/sample/" + corpus),
+                out,
+                mongoConfig,
+                segmentationStrategy,
+                segmentationWorkers,
+                segmentationQueueSize
+        );
+
+        System.out.println("Size: " + reader.getSize());
+        System.out.println("Done: " + reader.getDone());
+        System.out.println("Progress: " + reader.getProgress());
+
+
+        switch (task) {
+            case FLAIR:
+                composer.add(
+                        new DUUIRemoteDriver
+                                .Component("http://isengart.hucompute.org:8100")
+                                .withScale(toolWorkers)
+                );
+                break;
+            case SENTIMENT:
+                composer.add(
+                        new DUUISwarmDriver
+                                .Component("docker.texttechnologylab.org/textimager-duui-transformers-sentiment:latest")
+                                .withScale(toolWorkers)
+                                .withParameter("model_name", "cardiffnlp/twitter-roberta-base-sentiment")
+                                .withParameter("selection", "text")
+                                .withConstraintHost("isengart")
+                );
+                break;
+        }
+
+        composer.runSegmented(reader, corpus + "_" + delemeterSize + "_" + task.name());
+        composer.shutdown();
+    }
+
+    @Test
+    void testSegmentationReader() throws Exception {
+        int toolWorkers = 1;
+
+        int segmentationWorkers = 1;
+        int segmentationQueueSize = Integer.MAX_VALUE;
+
+        DUUISqliteStorageBackend sqlite = new DUUISqliteStorageBackend("./test.db")
+                .withConnectionPoolSize(toolWorkers);
+
+        DUUIComposer composer = new DUUIComposer()
+                .withSkipVerification(true)
+                .withLuaContext(new DUUILuaContext().withJsonLibrary())
+                .withWorkers(toolWorkers)
+                .withStorageBackend(sqlite);
+
+        DUUISwarmDriver remoteDriver = new DUUISwarmDriver();
         composer.addDriver(remoteDriver);
 
         MongoDBConfig mongoConfig = new MongoDBConfig("segmentation_mongo.properties");
 
         DUUISegmentationStrategy segmentationStrategy = new DUUISegmentationStrategyByDelemiter()
-                .withLength(50)
+                .withLength(500)
                 .withDelemiter(".");
 
         DUUISegmentationReader reader = new DUUISegmentationReader(
-                Paths.get("src/test/resources/org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation/segmentation/txt"),
+                Paths.get("/opt/sample/gerparcor_sample1000_SMALLEST_100/Bayern"),
+                Paths.get("/opt/sample/out"),
                 mongoConfig,
                 segmentationStrategy,
                 segmentationWorkers,
@@ -45,11 +134,13 @@ void testSegmentationReader() throws Exception {
         System.out.println("Progress: " + reader.getProgress());
 
         composer.add(
-                new DUUIRemoteDriver
-                        .Component("http://127.0.0.1:9714")
+                new DUUISwarmDriver
+                        .Component("docker.texttechnologylab.org/duui-spacy-de_core_news_sm:0.4.1")
                         .withScale(toolWorkers)
+                        .withConstraintHost("isengart")
         );
 
         composer.runSegmented(reader, "test1");
+        composer.shutdown();
     }
 }

From 8784587ab73bac27203fa552e80835256cfad08a Mon Sep 17 00:00:00 2001
From: Patrick Schrottenbacher
Date: Fri, 12 Apr 2024 15:30:07 +0000
Subject: [PATCH 3/3] fix: fixed issues with typesystem, added workaround for
 issues with xml files not being deserializable

---
 pom.xml                                    | 2 +-
 .../io/reader/DUUISegmentationReader.java  | 4 ++--
 .../TestSegmentationReader.java            | 7 +++++--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2e09a0d8..4067af02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -294,7 +294,7 @@
         <dependency>
             <groupId>com.github.texttechnologylab</groupId>
             <artifactId>UIMATypeSystem</artifactId>
-            <version>1.9.5</version>
+            <version>6a3e0729b4</version>
         </dependency>

diff --git a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
index 859562c6..672dd12c 100644
--- a/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
+++ b/src/main/java/org/texttechnologylab/DockerUnifiedUIMAInterface/io/reader/DUUISegmentationReader.java
@@ -149,8 +149,8 @@ public void run() {
                         segmentIndex++;
                     }
                 }
-            } catch (UIMAException | IOException e) {
-                throw new RuntimeException(e);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
 
             jCas.reset();

diff --git a/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java b/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
index e246a6e2..ab4caa8b 100644
--- a/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
+++ b/src/test/java/org/texttechnologylab/DockerUnifiedUIMAInterface/TestSegmentationReader.java
@@ -1,5 +1,7 @@
 package org.texttechnologylab.DockerUnifiedUIMAInterface;
 
+import de.tudarmstadt.ukp.dkpro.core.api.metadata.type.DocumentMetaData;
+import de.tudarmstadt.ukp.dkpro.core.api.metadata.type.DocumentMetaData_Type;
 import org.junit.jupiter.api.Test;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.mongodb.MongoDBConfig;
 import org.texttechnologylab.DockerUnifiedUIMAInterface.driver.DUUIRemoteDriver;
@@ -24,7 +26,7 @@ private enum Tasks {
 
     @Test
     public void testAll() throws Exception {
-        List<Integer> delimiterRange = List.of(200, 500, 1000, 2000, 5000, 10000);
+        List<Integer> delimiterRange = List.of(5000, 200, 500, 1000, 10_000, 20_000, 50_000);
         for (int delimiter : delimiterRange) {
             testBase("gerparcor_sample1000_RANDOM_100", delimiter, Tasks.SENTIMENT);
             System.gc();
@@ -87,7 +89,7 @@ public void testBase(String corpus, int delemeterSize, Tasks task) throws Except
                                 .Component("docker.texttechnologylab.org/textimager-duui-transformers-sentiment:latest")
                                 .withScale(toolWorkers)
                                 .withParameter("model_name", "cardiffnlp/twitter-roberta-base-sentiment")
-                                .withParameter("selection", "text")
+                                .withParameter("selection", "de.tudarmstadt.ukp.dkpro.core.api.segmentation.type.Sentence")
                                 .withConstraintHost("isengart")
                 );
                 break;
@@ -95,6 +97,7 @@ public void testBase(String corpus, int delemeterSize, Tasks task) throws Except
         composer.runSegmented(reader, corpus + "_" + delemeterSize + "_" + task.name());
         composer.shutdown();
     }
+
     @Test
     void testSegmentationReader() throws Exception {
         int toolWorkers = 1;