diff --git a/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java b/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java index 7573c3c60..dc1efc199 100644 --- a/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java +++ b/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java @@ -235,6 +235,7 @@ public void start() throws ElasticsearchClientException { ElasticsearchBulkRequest::new) .setBulkActions(settings.getElasticsearch().getBulkSize()) .setFlushInterval(settings.getElasticsearch().getFlushInterval()) + .setByteSize(settings.getElasticsearch().getByteSize()) .build(); } diff --git a/framework/src/main/java/fr/pilato/elasticsearch/crawler/fs/framework/bulk/FsCrawlerBulkProcessor.java b/framework/src/main/java/fr/pilato/elasticsearch/crawler/fs/framework/bulk/FsCrawlerBulkProcessor.java index 57c0ce086..f97c7916f 100644 --- a/framework/src/main/java/fr/pilato/elasticsearch/crawler/fs/framework/bulk/FsCrawlerBulkProcessor.java +++ b/framework/src/main/java/fr/pilato/elasticsearch/crawler/fs/framework/bulk/FsCrawlerBulkProcessor.java @@ -19,6 +19,7 @@ package fr.pilato.elasticsearch.crawler.fs.framework.bulk; +import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue; import fr.pilato.elasticsearch.crawler.fs.framework.TimeValue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,7 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; - +import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.serialize; /** * Bulk processor */ @@ -43,6 +44,7 @@ public class FsCrawlerBulkProcessor< private static final Logger logger = LogManager.getLogger(FsCrawlerBulkProcessor.class); private final int bulkActions; + private final ByteSizeValue byteSize; private final Listener listener; private final Engine engine; private Req bulkRequest; @@ -50,15 +52,18 @@ public class FsCrawlerBulkProcessor< private final ScheduledExecutorService executor; private volatile boolean closed = false; private final AtomicLong executionIdGen = new AtomicLong(); - + private int totalByteSize = 0; + private int lastJsonSize = 0; public FsCrawlerBulkProcessor(Engine engine, Listener listener, int bulkActions, TimeValue flushInterval, + ByteSizeValue byteSize, Supplier requestSupplier) { this.engine = engine; this.listener = listener; this.bulkActions = bulkActions; + this.byteSize = byteSize; this.requestSupplier = requestSupplier; this.bulkRequest = requestSupplier.get(); this.listener.setBulkProcessor(this); @@ -108,10 +113,11 @@ private void internalClose() throws InterruptedException { * @return this so we can link methods. */ public synchronized FsCrawlerBulkProcessor add(O request) { - ensureOpen(); - bulkRequest.add(request); - executeIfNeeded(); - return this; + ensureOpen(); + String jsonValue = serialize(request); + addingByteSize(jsonValue); + executeIfNeededWithByteCheck(request); + return this; } private void ensureOpen() { @@ -119,7 +125,25 @@ private void ensureOpen() { throw new IllegalStateException("bulk process already closed"); } } - + + private void executeIfNeededWithByteCheck(O request) { + ensureOpen(); + if (compareWithByteSize(byteSize.getBytes(), totalByteSize)) { + logger.trace("crossed byte size limit of {}", byteSize); + execute(); + totalByteSize = lastJsonSize; + bulkRequest.add(request); + } + else if (bulkSizeLimitCheck()) { + bulkRequest.add(request); + logger.trace("crossed bulk size limit of {}", bulkActions); + execute(); + totalByteSize = 0; + } else { + bulkRequest.add(request); + executeIfNeeded(); + } + } private void executeIfNeeded() { ensureOpen(); if (isOverTheLimit()) { @@ -156,7 +180,21 @@ private void execute() { private boolean isOverTheLimit() { return (bulkActions != -1) && (bulkRequest.numberOfActions() >= bulkActions); } - + + private boolean bulkSizeLimitCheck() { + return (bulkActions != -1) && (bulkRequest.numberOfActions() >= (bulkActions - 1)); + } + + private void addingByteSize(String jsonObj) { + byte[] bytes = jsonObj.getBytes(); + lastJsonSize = bytes.length; + totalByteSize += bytes.length; + } + + private boolean compareWithByteSize(long limit, int totalSize) { + return (totalSize >= limit); + } + public Listener getListener() { return listener; } @@ -171,6 +209,7 @@ public static class Builder, private int bulkActions; private TimeValue flushInterval; + private ByteSizeValue byteSize; private final Engine engine; private final Listener listener; private final Supplier requestSupplier; @@ -190,9 +229,14 @@ public Builder setFlushInterval(TimeValue flushInterval) { this.flushInterval = flushInterval; return this; } - + + public Builder setByteSize(ByteSizeValue byteSizeValue) { + this.byteSize = byteSizeValue; + return this; + } + public FsCrawlerBulkProcessor build() { - return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, requestSupplier); + return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, byteSize, requestSupplier); } }