Skip to content

Commit

Permalink
Small tweaks to logprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
Corey-Dean Arthur committed Apr 7, 2020
1 parent 88f4647 commit 9ffd8c1
Showing 1 changed file with 25 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public LogProcessor(LoggerPlusPlus loggerPlusPlus, LogTableController logTableCo
this.proxyIdToUUIDMap = new ConcurrentHashMap<>();
this.entriesPendingProcessing = new ConcurrentHashMap<>();
this.entryProcessingFutures = new ConcurrentHashMap<>();
this.entryProcessExecutor = new PausableThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("LPP-LogManager"));
this.entryImportExecutor = new PausableThreadPoolExecutor(10, 10, 0L,
this.entryProcessExecutor = new PausableThreadPoolExecutor(0, 2147483647,
30L, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("LPP-LogManager"));
this.entryImportExecutor = new PausableThreadPoolExecutor(0, 10, 60L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("LPP-Import"));

//Create incomplete request cleanup thread so map doesn't get too big.
this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LPP-LogManager-Cleanup"));
this.cleanupExecutor.scheduleAtFixedRate(new AbandonedRequestCleanupRunnable(),30000L, 30000L, TimeUnit.MILLISECONDS);
this.cleanupExecutor.scheduleAtFixedRate(new AbandonedRequestCleanupRunnable(),30, 30, TimeUnit.SECONDS);

LoggerPlusPlus.callbacks.registerHttpListener(this);
LoggerPlusPlus.callbacks.registerProxyListener(this);
Expand Down Expand Up @@ -219,35 +219,24 @@ LogEntry processEntry(final LogEntry logEntry){
private RunnableFuture<LogEntry> createEntryUpdateRunnable(final Future<LogEntry> processingFuture,
final IHttpRequestResponse requestResponse,
final Date arrivalTime){
return new SwingWorker<LogEntry, Void>(){
@Override
protected LogEntry doInBackground() throws Exception {
//Block until initial processing is complete.
LogEntry logEntry = processingFuture.get();
if(logEntry == null){
return null; //Request to an ignored host. Stop processing.
}
logEntry.addResponse(requestResponse, arrivalTime);
processEntry(logEntry);

if(logEntry.getStatus() == Status.PROCESSED) {
//If the entry was fully processed, remove it from the processing list.
entryProcessingFutures.remove(logEntry.getIdentifier());
}

return logEntry;
return new FutureTask<>(() -> {
//Block until initial processing is complete.
LogEntry logEntry = processingFuture.get();
if (logEntry == null) {
return null; //Request to an ignored host. Stop processing.
}
logEntry.addResponse(requestResponse, arrivalTime);
processEntry(logEntry);

@Override
protected void done() {
try{
LogEntry logEntry = get();
logTableController.getLogTableModel().updateEntry(logEntry);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
if (logEntry.getStatus() == Status.PROCESSED) {
//If the entry was fully processed, remove it from the processing list.
entryProcessingFutures.remove(logEntry.getIdentifier());
}
};

updateExistingEntry(logEntry);

return logEntry;
});
}

public EntryImportWorker.Builder createEntryImportBuilder(){
Expand Down Expand Up @@ -296,6 +285,12 @@ void addProcessedEntry(LogEntry logEntry){
});
}

void updateExistingEntry(LogEntry logEntry){
SwingUtilities.invokeLater(() -> {
logTableController.getLogTableModel().updateEntry(logEntry);
});
}

PausableThreadPoolExecutor getEntryImportExecutor() {
return entryImportExecutor;
}
Expand Down

0 comments on commit 9ffd8c1

Please sign in to comment.