Skip to content

Commit

Permalink
Add delay optimisations & log time taken for upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jan 16, 2023
1 parent b0d55c0 commit 07561b9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class AsyncIOProcessor<Item> {
final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
private final ThreadContext threadContext;
final Semaphore promiseSemaphore = new Semaphore(1);
long lastRunStartTimeInMs;

protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
this.logger = logger;
Expand Down Expand Up @@ -95,22 +96,28 @@ public void put(Item item, Consumer<Exception> listener) {
// no need to preserve context for listener since it runs in current thread.
candidates.add(new Tuple<>(item, listener));
}
// since we made the promise to process we gotta do it here at least once
process(candidates);
}
}

/**
 * Processes the given candidates and then keeps draining the queue for as long as it is non-empty and this
 * thread manages to re-acquire {@code promiseSemaphore}.
 * <p>
 * Every call to {@code drainAndProcessAndRelease} releases the semaphore, so the loop must win it back via
 * {@code tryAcquire()} before each further round; if another thread has taken the promise in the meantime the
 * loop stops and that thread continues the work.
 * NOTE(review): callers are expected to already hold the semaphore permit on entry - confirm at call sites.
 *
 * @param candidates mutable list used as the drain target; it is passed to every processing round
 */
protected void process(List<Tuple<Item, Consumer<Exception>>> candidates) {
    // since we made the promise to process we gotta do it here at least once
    drainAndProcessAndRelease(candidates);
    while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
        // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
        drainAndProcessAndRelease(candidates);
    }
}

void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
lastRunStartTimeInMs = System.currentTimeMillis();
Exception exception;
try {
queue.drainTo(candidates);
exception = processList(candidates);
} finally {
promiseSemaphore.release();
logger.info("step=drainAndProcessAndRelease timeTakenInMs={}", (System.currentTimeMillis() - lastRunStartTimeInMs));
}
notifyList(candidates, exception);
candidates.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* A variant of AsyncIOProcessor which will wait for interval before processing items
* processing happens in another thread from same threadpool after {@link #bufferInterval}
*
* <p>
* Requests are buffered till processor thread calls {@link #drainAndProcessAndRelease} after bufferInterval.
* If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread
* gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new
Expand Down Expand Up @@ -95,19 +94,11 @@ protected long getPutBlockingTimeoutMillis() {
}

private void scheduleRefresh() {
Runnable processor = () -> {
final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
// since we made the promise to process we gotta do it here at least once
drainAndProcessAndRelease(candidates);
while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
// yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
drainAndProcessAndRelease(candidates);
}
};
Runnable processor = () -> { process(new ArrayList<>()); };

if (promiseSemaphore.tryAcquire()) {
try {
threadpool.schedule(processor, this.bufferInterval, getBufferRefreshThreadPoolName());
threadpool.schedule(processor, getBufferInterval(), getBufferRefreshThreadPoolName());
} catch (Exception e) {
logger.error("failed to schedule refresh");
promiseSemaphore.release();
Expand All @@ -116,6 +107,14 @@ private void scheduleRefresh() {
}
}

/**
 * Computes the delay to use when scheduling the next processing run: the configured {@code bufferInterval}
 * minus the time already elapsed since {@code lastRunStartTimeInMs}.
 *
 * @return {@link TimeValue#ZERO} when at least {@code bufferInterval} has already elapsed, otherwise the
 *         remaining part of the interval; never more than {@code bufferInterval}
 */
private TimeValue getBufferInterval() {
    final long intervalMillis = bufferInterval.getMillis();
    // System.currentTimeMillis() is wall-clock time and may step backwards (e.g. clock adjustment). Clamp the
    // elapsed time at zero so the computed delay can never exceed the configured buffer interval.
    final long elapsedMillis = Math.max(0L, System.currentTimeMillis() - lastRunStartTimeInMs);
    if (elapsedMillis >= intervalMillis) {
        return TimeValue.ZERO;
    }
    return TimeValue.timeValueMillis(intervalMillis - elapsedMillis);
}

/**
 * Names the thread pool that is handed to the scheduler together with the buffered processor task.
 *
 * @return the {@code TRANSLOG_SYNC} thread pool name
 */
protected String getBufferRefreshThreadPoolName() {
    final String refreshPoolName = ThreadPool.Names.TRANSLOG_SYNC;
    return refreshPoolName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(Long primaryTerm, Long generation) throws IOException {
long uploadStartTime = System.currentTimeMillis();
boolean primaryMode = primaryModeSupplier.getAsBoolean();
if (primaryMode == false) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
Expand All @@ -227,11 +228,13 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
logUploadStats(uploadStartTime, true);
}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
transferReleasable.close();
logUploadStats(uploadStartTime, false);
closeFilesIfNoPendingRetentionLocks();
if (ex instanceof IOException) {
throw (IOException) ex;
Expand All @@ -244,6 +247,11 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro

}

/**
 * Logs the outcome of a translog upload together with the wall-clock time it took.
 *
 * @param uploadStartTime upload start as epoch milliseconds (a {@code System.currentTimeMillis()} reading)
 * @param uploadStatus    {@code true} when the upload completed, {@code false} when it failed
 */
private void logUploadStats(long uploadStartTime, boolean uploadStatus) {
    final long timeTakenInMs = System.currentTimeMillis() - uploadStartTime;
    // Key carries the unit so it matches the other timing log lines (timeTakenInMs).
    logger.info("Translog Upload status={} timeTakenInMs={}", uploadStatus, timeTakenInMs);
}

// Visible for testing
public Set<String> allUploaded() {
return fileTransferTracker.allUploaded();
Expand Down

0 comments on commit 07561b9

Please sign in to comment.