Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modified code for byte_size parameter #1846

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public void start() throws ElasticsearchClientException {
ElasticsearchBulkRequest::new)
.setBulkActions(settings.getElasticsearch().getBulkSize())
.setFlushInterval(settings.getElasticsearch().getFlushInterval())
.setByteSize(settings.getElasticsearch().getByteSize())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package fr.pilato.elasticsearch.crawler.fs.framework.bulk;

import fr.pilato.elasticsearch.crawler.fs.framework.ByteSizeValue;
import fr.pilato.elasticsearch.crawler.fs.framework.TimeValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -30,7 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static fr.pilato.elasticsearch.crawler.fs.framework.JsonUtil.serialize;
/**
* Bulk processor
*/
Expand All @@ -43,22 +44,26 @@ public class FsCrawlerBulkProcessor<
private static final Logger logger = LogManager.getLogger(FsCrawlerBulkProcessor.class);

private final int bulkActions;
private final ByteSizeValue byteSize;
private final Listener<O, Req, Res> listener;
private final Engine<O, Req, Res> engine;
private Req bulkRequest;
private final Supplier<Req> requestSupplier;
private final ScheduledExecutorService executor;
private volatile boolean closed = false;
private final AtomicLong executionIdGen = new AtomicLong();

private int totalByteSize = 0;
private int lastJsonSize = 0;
public FsCrawlerBulkProcessor(Engine<O, Req, Res> engine,
Listener<O, Req, Res> listener,
int bulkActions,
TimeValue flushInterval,
ByteSizeValue byteSize,
Supplier<Req> requestSupplier) {
this.engine = engine;
this.listener = listener;
this.bulkActions = bulkActions;
this.byteSize = byteSize;
this.requestSupplier = requestSupplier;
this.bulkRequest = requestSupplier.get();
this.listener.setBulkProcessor(this);
Expand Down Expand Up @@ -108,18 +113,37 @@ private void internalClose() throws InterruptedException {
* @return this so we can link methods.
*/
public synchronized FsCrawlerBulkProcessor<O, Req, Res> add(O request) {
ensureOpen();
bulkRequest.add(request);
executeIfNeeded();
return this;
ensureOpen();
String jsonValue = serialize(request);
addingByteSize(jsonValue);
executeIfNeededWithByteCheck(request);
return this;
}

private void ensureOpen() {
if (closed) {
throw new IllegalStateException("bulk process already closed");
}
}


private void executeIfNeededWithByteCheck(O request) {
ensureOpen();
if (compareWithByteSize(byteSize.getBytes(), totalByteSize)) {
logger.trace("crossed byte size limit of {}", byteSize);
execute();
totalByteSize = lastJsonSize;
bulkRequest.add(request);
}
else if (bulkSizeLimitCheck()) {
bulkRequest.add(request);
logger.trace("crossed bulk size limit of {}", bulkActions);
execute();
totalByteSize = 0;
} else {
bulkRequest.add(request);
executeIfNeeded();
}
}
private void executeIfNeeded() {
ensureOpen();
if (isOverTheLimit()) {
Expand Down Expand Up @@ -156,7 +180,21 @@ private void execute() {
private boolean isOverTheLimit() {
return (bulkActions != -1) && (bulkRequest.numberOfActions() >= bulkActions);
}


private boolean bulkSizeLimitCheck() {
return (bulkActions != -1) && (bulkRequest.numberOfActions() >= (bulkActions - 1));
}

private void addingByteSize(String jsonObj) {
byte[] bytes = jsonObj.getBytes();
lastJsonSize = bytes.length;
totalByteSize += bytes.length;
}

private boolean compareWithByteSize(long limit, int totalSize) {
return (totalSize >= limit);
}

public Listener<O, Req, Res> getListener() {
return listener;
}
Expand All @@ -171,6 +209,7 @@ public static class Builder<O extends FsCrawlerOperation<O>,

private int bulkActions;
private TimeValue flushInterval;
private ByteSizeValue byteSize;
private final Engine<O, Req, Res> engine;
private final Listener<O, Req, Res> listener;
private final Supplier<Req> requestSupplier;
Expand All @@ -190,9 +229,14 @@ public Builder<O, Req, Res> setFlushInterval(TimeValue flushInterval) {
this.flushInterval = flushInterval;
return this;
}


public Builder<O, Req, Res> setByteSize(ByteSizeValue byteSizeValue) {
this.byteSize = byteSizeValue;
return this;
}

public FsCrawlerBulkProcessor<O, Req, Res> build() {
return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, requestSupplier);
return new FsCrawlerBulkProcessor<>(engine, listener, bulkActions, flushInterval, byteSize, requestSupplier);
}
}

Expand Down
Loading