Skip to content

Commit

Permalink
Issue #30 - Improved ETC calculation by using last 10 (default number)
Browse files Browse the repository at this point in the history
tps values instead of total average and takes pausing into account.
  • Loading branch information
bbandlamudi committed Jul 25, 2016
1 parent 82a77c8 commit 657d46e
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 35 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Option | Description
**DISK-QUEUE** | Boolean value indicating whether the CoRB job should spill to disk when a maximum number of URIs have been loaded in memory, in order to control memory consumption and avoid Out of Memory exceptions for extremely large sets of URIs.
**DISK-QUEUE-MAX-IN-MEMORY-SIZE** | The maximum number of URIs to hold in memory before spilling over to disk. Default is 1000.
**DISK-QUEUE-TEMP-DIR** | The directory where the URIs queue can write to disk when the maximum in-memory items has been exceeded. Default behavior is to use java.io.tmpdir.
**NUM_TPS_FOR_ETC** | Default is 10. Number of recent transactions per second (tps) values used to calculate estimated completion time (ETC).

### Alternate XCC connection configuration
Option | Description
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/marklogic/developer/corb/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static com.marklogic.developer.corb.Options.INSTALL;
import static com.marklogic.developer.corb.Options.MODULES_DATABASE;
import static com.marklogic.developer.corb.Options.MODULE_ROOT;
import static com.marklogic.developer.corb.Options.NUM_TPS_FOR_ETC;
import static com.marklogic.developer.corb.Options.OPTIONS_FILE;
import static com.marklogic.developer.corb.Options.POST_BATCH_MODULE;
import static com.marklogic.developer.corb.Options.POST_BATCH_TASK;
Expand Down Expand Up @@ -234,6 +235,8 @@ protected void initOptions(String[] args) throws ClassNotFoundException, Instant
options.setUseDiskQueue(stringToBoolean(getOption(DISK_QUEUE)));
String diskQueueMaxInMemorySize = getOption(DISK_QUEUE_MAX_IN_MEMORY_SIZE);
String diskQueueTempDir = getOption(DISK_QUEUE_TEMP_DIR);

String numTpsForETC = getOption(NUM_TPS_FOR_ETC);

//Check legacy properties keys, for backwards compatability
if (processModule == null) {
Expand Down Expand Up @@ -275,6 +278,9 @@ protected void initOptions(String[] args) throws ClassNotFoundException, Instant
if (diskQueueMaxInMemorySize != null) {
options.setDiskQueueMaxInMemorySize(Integer.parseInt(diskQueueMaxInMemorySize));
}
if (numTpsForETC != null) {
options.setNumTpsForETC(Integer.parseInt(numTpsForETC));
}
if (!this.properties.containsKey(EXPORT_FILE_DIR) && exportFileDir != null) {
this.properties.put(EXPORT_FILE_DIR, exportFileDir);
}
Expand Down Expand Up @@ -800,6 +806,10 @@ public void pause() {
pool.pause();
}
}

public boolean isPaused(){
return pool != null && pool.isPaused();
}

/**
* Resume pool execution (if paused).
Expand Down
59 changes: 46 additions & 13 deletions src/main/java/com/marklogic/developer/corb/Monitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -39,6 +40,8 @@
public class Monitor implements Runnable {

protected static final Logger LOG = Logger.getLogger(Monitor.class.getName());

protected static final int DEFAULT_NUM_TPS_FOR_ETC = 10;

private final CompletionService<String[]> cs;
private long lastProgress = 0;
Expand All @@ -51,6 +54,9 @@ public class Monitor implements Runnable {
protected long completed = 0;
private long prevCompleted = 0;
private long prevMillis = 0;

private final ArrayList<Double> tpsForETCList;
private final int numTpsForEtc;

/**
* @param pool
Expand All @@ -61,6 +67,9 @@ public Monitor(PausableThreadPoolExecutor pool, CompletionService<String[]> cs,
this.pool = pool;
this.cs = cs;
this.manager = manager;

this.numTpsForEtc = manager.getOptions() != null ? manager.getOptions().getNumTpsForETC() : DEFAULT_NUM_TPS_FOR_ETC;
this.tpsForETCList = new ArrayList<Double>(this.numTpsForEtc);
}

/*
Expand Down Expand Up @@ -154,48 +163,72 @@ public long getCompletedCount() {

private String getProgressMessage(long completed) {
long curMillis = System.currentTimeMillis();
double tps = calculateThreadsPerSecond(completed, curMillis, startMillis);
double tps = calculateTransactionsPerSecond(completed, curMillis, startMillis);
double curTps = tps;
if (prevMillis > 0) {
curTps = calculateThreadsPerSecond(completed, prevCompleted, curMillis, prevMillis);
curTps = calculateTransactionsPerSecond(completed, prevCompleted, curMillis, prevMillis);
}
prevCompleted = completed;
prevMillis = curMillis;

return getProgressMessage(completed, taskCount, tps, curTps, pool.getActiveCount());

boolean isPaused = manager.isPaused();
double tpsForETC = calculateTpsForETC(curTps,isPaused);
return getProgressMessage(completed, taskCount, tps, curTps, tpsForETC, pool.getActiveCount(), isPaused);
}

protected double calculateTpsForETC(double curTps, boolean isPaused){
if(curTps == 0 && isPaused){
this.tpsForETCList.clear();
}else {
if(this.tpsForETCList.size() >= this.numTpsForEtc){
this.tpsForETCList.remove(0);
}
this.tpsForETCList.add(curTps);
}

double tpsForETC = 0;
double sum = 0;
for(Double next:this.tpsForETCList){
sum += next;
}
if(this.tpsForETCList.size() > 0){
tpsForETC = sum / this.tpsForETCList.size();
}
return tpsForETC;
}

protected static double calculateThreadsPerSecond(long amountCompleted, long currentMillis, long previousMillis) {
return calculateThreadsPerSecond(amountCompleted, 0, currentMillis, previousMillis);
static protected double calculateTransactionsPerSecond(long amountCompleted, long currentMillis, long previousMillis) {
return calculateTransactionsPerSecond(amountCompleted, 0, currentMillis, previousMillis);
}

protected static double calculateThreadsPerSecond(long amountCompleted, long previouslyCompleted, long currentMillis, long previousMillis) {
static protected double calculateTransactionsPerSecond(long amountCompleted, long previouslyCompleted, long currentMillis, long previousMillis) {
return (amountCompleted - previouslyCompleted) * 1000d / (currentMillis - previousMillis);
}

protected static String getProgressMessage(long completed, long taskCount, double tps, double curTps, int threads) {
String etc = getEstimatedTimeCompletion(taskCount, completed, tps);
static protected String getProgressMessage(long completed, long taskCount, double tps, double curTps, double tpsForETC, int threads, boolean isPaused) {
String etc = getEstimatedTimeCompletion(taskCount, completed, tpsForETC, isPaused);
return completed + "/" + taskCount + ", " +
formatTransactionsPerSecond(tps) + " tps(avg), " +
formatTransactionsPerSecond(curTps) + " tps(cur), " +
"ETC " + etc + ", " +
threads + " active threads.";
}

protected static String getEstimatedTimeCompletion(double taskCount, double completed, double tps) {
double ets = (tps != 0) ? (taskCount - completed) / tps : -1;
static protected String getEstimatedTimeCompletion(double taskCount, double completed, double tpsForETC, boolean isPaused) {
double ets = (tpsForETC != 0) ? (taskCount - completed) / tpsForETC : -1;
int hours = (int) ets / 3600;
int minutes = (int) (ets % 3600) / 60;
int seconds = (int) ets % 60;
return String.format("%02d:%02d:%02d", hours, minutes, seconds);
return String.format("%02d:%02d:%02d", hours, minutes, seconds)
+ (isPaused ? " (paused)":"");
}
/**
* Returns a string representation of the number.
* Returns a decimal number to two places if the value is less than 1.
* @param n
* @return
*/
protected static String formatTransactionsPerSecond(Number n) {
static protected String formatTransactionsPerSecond(Number n) {
NumberFormat format = DecimalFormat.getInstance();
format.setRoundingMode(RoundingMode.HALF_UP);
format.setMinimumFractionDigits(0);
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/marklogic/developer/corb/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ public class Options {
@Usage(description = "Default is '/'.")
public static final String MODULE_ROOT = "MODULE-ROOT";

/**
* Default is 10. Max number of recent tps (transaction per second) values used to calculate ETC (estimated time to completion)
*/
@Usage(description = "Default is 10. Max number of recent tps values used to calculate ETC")
public static final String NUM_TPS_FOR_ETC="NUM_TPS_FOR_ETC";

/**
* A properties file containing any of the CoRB2 options. Relative and full
* file system paths are supported.
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/marklogic/developer/corb/TransformOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class TransformOptions {
private int diskQueueMaxInMemorySize = 1000;
private File diskQueueTempDir;
private boolean doInstall;

private int numTpsForETC = 10;

private boolean failOnError = true;

Expand Down Expand Up @@ -353,4 +355,14 @@ public void setDiskQueueTempDir(File directory) {
public File getDiskQueueTempDir() {
return this.diskQueueTempDir;
}

public void setNumTpsForETC(int numTpsForETC){
if(numTpsForETC > 0){
this.numTpsForETC = numTpsForETC;
}
}

public int getNumTpsForETC(){
return this.numTpsForETC;
}
}
47 changes: 25 additions & 22 deletions src/test/java/com/marklogic/developer/corb/MonitorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ public void testRun_whenPaused() {
* Test of calculateThreadsPerSecond method, of class Monitor.
*/
@Test
public void testCalculateThreadsPerSecond_3args() {
public void testCalculateTransactionsPerSecond_3args() {
long amountCompleted = 10L;
long previousMillis = 1000L;
long currentMillis = 2000L;
double expResult = Monitor.calculateThreadsPerSecond(amountCompleted, 0, currentMillis, previousMillis);
double result = Monitor.calculateThreadsPerSecond(amountCompleted, currentMillis, previousMillis);
double expResult = Monitor.calculateTransactionsPerSecond(amountCompleted, 0, currentMillis, previousMillis);
double result = Monitor.calculateTransactionsPerSecond(amountCompleted, currentMillis, previousMillis);
assertEquals(expResult, result, DOUBLE_DELTA);
}

Expand All @@ -84,7 +84,7 @@ public void testCalculateThreadsPerSecond_4args() {
long currentMillis = 2000L;
long previousMillis = 1000L;
double expResult = 100.0;
double result = Monitor.calculateThreadsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis);
double result = Monitor.calculateTransactionsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis);
assertEquals(expResult, result, DOUBLE_DELTA);
}

Expand All @@ -98,39 +98,42 @@ public void testCalculateThreadsPerSecond_fractional() {
long currentMillis = 3000L;
long previousMillis = 1000L;
double expResult = 0.5;
double result = Monitor.calculateThreadsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis);
double result = Monitor.calculateTransactionsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis);
assertEquals(expResult, result, DOUBLE_DELTA);
}

@Test
public void testGetProgressMessage() {
assertEquals("10/100, 4 tps(avg), 3 tps(cur), ETC 00:00:22, 2 active threads.", getProgressMessage(10, 100, 4, 3, 2));
assertEquals("10/100, 0.4 tps(avg), 3 tps(cur), ETC 00:03:45, 2 active threads.", getProgressMessage(10, 100, 0.4, 3, 2));
assertEquals("10/100, 0.49 tps(avg), 3 tps(cur), ETC 00:03:03, 2 active threads.", getProgressMessage(10, 100, 0.49, 3, 2));
assertEquals("10/100, 0.45 tps(avg), 3 tps(cur), ETC 00:03:20, 2 active threads.", getProgressMessage(10, 100, 0.449, 3, 2));
assertEquals("10/100, 0.04 tps(avg), 3 tps(cur), ETC 00:34:05, 2 active threads.", getProgressMessage(10, 100, 0.044, 3, 2));
assertEquals("10/100, 0 tps(avg), 3 tps(cur), ETC 06:15:00, 2 active threads.", getProgressMessage(10, 100, 0.004, 3, 2));
assertEquals("10/100, 4 tps(avg), 3 tps(cur), ETC 00:00:11, 2 active threads.", getProgressMessage(10, 100, 4, 3, 8, 2, false));
assertEquals("10/100, 0.4 tps(avg), 3 tps(cur), ETC 00:07:30 (paused), 2 active threads.", getProgressMessage(10, 100, 0.4, 3, 0.2, 2, true));
assertEquals("10/100, 0.49 tps(avg), 3 tps(cur), ETC 00:03:03, 2 active threads.", getProgressMessage(10, 100, 0.49, 3, 0.49, 2, false));
assertEquals("10/100, 0.45 tps(avg), 3 tps(cur), ETC 00:03:20, 2 active threads.", getProgressMessage(10, 100, 0.449, 3, 0.449, 2, false));
assertEquals("10/100, 0.04 tps(avg), 3 tps(cur), ETC 00:34:05, 2 active threads.", getProgressMessage(10, 100, 0.044, 3, 0.044, 2, false));
assertEquals("10/100, 0 tps(avg), 3 tps(cur), ETC 06:15:00 (paused), 2 active threads.", getProgressMessage(10, 100, 0.004, 3, 0.004, 2, true));
}

@Test
public void testGetEstimatedTimeCompletion_zero() {
assertEquals("00:00:-1", Monitor.getEstimatedTimeCompletion(100, 50, 0));
assertEquals("00:00:-1", Monitor.getEstimatedTimeCompletion(100, 50, 0, false));
assertEquals("00:00:-1 (paused)", Monitor.getEstimatedTimeCompletion(100, 50, 0, true));
}

@Test
public void testGetEstimatedTimeCompletion() {
String fourtyFiveSeconds = "00:00:45";
assertEquals("02:38:20", Monitor.getEstimatedTimeCompletion(100, 5, 0.01));
assertEquals("01:23:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.01));
assertEquals("00:08:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.1));
assertEquals("00:00:50", Monitor.getEstimatedTimeCompletion(100, 50, 1.0));
assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.1));
assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.111));
assertEquals("00:00:44", Monitor.getEstimatedTimeCompletion(100, 50, 1.12345));
assertEquals("00:00:01", Monitor.getEstimatedTimeCompletion(100, 50, 49));
assertEquals("00:00:00", Monitor.getEstimatedTimeCompletion(100, 50, 60));
assertEquals("02:38:20", Monitor.getEstimatedTimeCompletion(100, 5, 0.01, false));
assertEquals("01:23:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.01, false));
assertEquals("00:08:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.1, false));
assertEquals("00:00:50", Monitor.getEstimatedTimeCompletion(100, 50, 1.0, false));
assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.1, false));
assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.111, false));
assertEquals("00:00:44", Monitor.getEstimatedTimeCompletion(100, 50, 1.12345, false));
assertEquals("00:00:01", Monitor.getEstimatedTimeCompletion(100, 50, 49, false));
assertEquals("00:00:00", Monitor.getEstimatedTimeCompletion(100, 50, 60, false));

assertEquals("2777:38:20", Monitor.getEstimatedTimeCompletion(100000d, 5d, 0.01d));
assertEquals("2777:38:20", Monitor.getEstimatedTimeCompletion(100000d, 5d, 0.01d, false));

assertEquals("02:38:20 (paused)", Monitor.getEstimatedTimeCompletion(100, 5, 0.01, true));
}

@Test
Expand Down

0 comments on commit 657d46e

Please sign in to comment.