diff --git a/README.md b/README.md index 6acf2391..f1925a02 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ Option | Description **DISK-QUEUE** | Boolean value indicating whether the CoRB job should spill to disk when a maximum number of URIs have been loaded in memory, in order to control memory consumption and avoid Out of Memory exceptions for extremely large sets of URIs. **DISK-QUEUE-MAX-IN-MEMORY-SIZE** | The maximum number of URIs to hold in memory before spilling over to disk. Default is 1000. **DISK-QUEUE-TEMP-DIR** | The directory where the URIs queue can write to disk when the maximum in-memory items has been exceeded. Default behavior is to use java.io.tmpdir. +**NUM_TPS_FOR_ETC** | Default is 10. Number of recent transactions per second (tps) values used to calculate estimated completion time (ETC). ### Alternate XCC connection configuration Option | Description diff --git a/src/main/java/com/marklogic/developer/corb/Manager.java b/src/main/java/com/marklogic/developer/corb/Manager.java index ee54fb8a..cca6783c 100644 --- a/src/main/java/com/marklogic/developer/corb/Manager.java +++ b/src/main/java/com/marklogic/developer/corb/Manager.java @@ -34,6 +34,7 @@ import static com.marklogic.developer.corb.Options.INSTALL; import static com.marklogic.developer.corb.Options.MODULES_DATABASE; import static com.marklogic.developer.corb.Options.MODULE_ROOT; +import static com.marklogic.developer.corb.Options.NUM_TPS_FOR_ETC; import static com.marklogic.developer.corb.Options.OPTIONS_FILE; import static com.marklogic.developer.corb.Options.POST_BATCH_MODULE; import static com.marklogic.developer.corb.Options.POST_BATCH_TASK; @@ -234,6 +235,8 @@ protected void initOptions(String[] args) throws ClassNotFoundException, Instant options.setUseDiskQueue(stringToBoolean(getOption(DISK_QUEUE))); String diskQueueMaxInMemorySize = getOption(DISK_QUEUE_MAX_IN_MEMORY_SIZE); String diskQueueTempDir = getOption(DISK_QUEUE_TEMP_DIR); + + String numTpsForETC = getOption(NUM_TPS_FOR_ETC); //Check legacy properties keys, for backwards compatability if (processModule == null) { @@ -275,6 +278,9 @@ protected void initOptions(String[] args) throws ClassNotFoundException, Instant if (diskQueueMaxInMemorySize != null) { options.setDiskQueueMaxInMemorySize(Integer.parseInt(diskQueueMaxInMemorySize)); } + if (numTpsForETC != null) { + options.setNumTpsForETC(Integer.parseInt(numTpsForETC)); + } if (!this.properties.containsKey(EXPORT_FILE_DIR) && exportFileDir != null) { this.properties.put(EXPORT_FILE_DIR, exportFileDir); } @@ -800,6 +806,10 @@ public void pause() { pool.pause(); } } + + public boolean isPaused(){ + return pool != null && pool.isPaused(); + } /** * Resume pool execution (if paused). diff --git a/src/main/java/com/marklogic/developer/corb/Monitor.java b/src/main/java/com/marklogic/developer/corb/Monitor.java index e52be96e..7b5fc22e 100644 --- a/src/main/java/com/marklogic/developer/corb/Monitor.java +++ b/src/main/java/com/marklogic/developer/corb/Monitor.java @@ -22,6 +22,7 @@ import java.math.RoundingMode; import java.text.DecimalFormat; import java.text.NumberFormat; +import java.util.ArrayList; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -39,6 +40,8 @@ public class Monitor implements Runnable { protected static final Logger LOG = Logger.getLogger(Monitor.class.getName()); + + protected static final int DEFAULT_NUM_TPS_FOR_ETC = 10; private final CompletionService cs; private long lastProgress = 0; @@ -51,6 +54,9 @@ public class Monitor implements Runnable { protected long completed = 0; private long prevCompleted = 0; private long prevMillis = 0; + + private final ArrayList tpsForETCList; + private final int numTpsForEtc; /** * @param pool @@ -61,6 +67,9 @@ public Monitor(PausableThreadPoolExecutor pool, CompletionService cs, this.pool = pool; this.cs = cs; this.manager = manager; + + this.numTpsForEtc = manager.getOptions() != null ? manager.getOptions().getNumTpsForETC() : DEFAULT_NUM_TPS_FOR_ETC; + this.tpsForETCList = new ArrayList(this.numTpsForEtc); } /* @@ -154,27 +163,50 @@ public long getCompletedCount() { private String getProgressMessage(long completed) { long curMillis = System.currentTimeMillis(); - double tps = calculateThreadsPerSecond(completed, curMillis, startMillis); + double tps = calculateTransactionsPerSecond(completed, curMillis, startMillis); double curTps = tps; if (prevMillis > 0) { - curTps = calculateThreadsPerSecond(completed, prevCompleted, curMillis, prevMillis); + curTps = calculateTransactionsPerSecond(completed, prevCompleted, curMillis, prevMillis); } prevCompleted = completed; prevMillis = curMillis; - - return getProgressMessage(completed, taskCount, tps, curTps, pool.getActiveCount()); + + boolean isPaused = manager.isPaused(); + double tpsForETC = calculateTpsForETC(curTps,isPaused); + return getProgressMessage(completed, taskCount, tps, curTps, tpsForETC, pool.getActiveCount(), isPaused); + } + + protected double calculateTpsForETC(double curTps, boolean isPaused){ + if(curTps == 0 && isPaused){ + this.tpsForETCList.clear(); + }else { + if(this.tpsForETCList.size() >= this.numTpsForEtc){ + this.tpsForETCList.remove(0); + } + this.tpsForETCList.add(curTps); + } + + double tpsForETC = 0; + double sum = 0; + for(Double next:this.tpsForETCList){ + sum += next; + } + if(this.tpsForETCList.size() > 0){ + tpsForETC = sum / this.tpsForETCList.size(); + } + return tpsForETC; } - protected static double calculateThreadsPerSecond(long amountCompleted, long currentMillis, long previousMillis) { - return calculateThreadsPerSecond(amountCompleted, 0, currentMillis, previousMillis); + static protected double calculateTransactionsPerSecond(long amountCompleted, long currentMillis, long previousMillis) { + return calculateTransactionsPerSecond(amountCompleted, 0, currentMillis, previousMillis); } - protected static double calculateThreadsPerSecond(long amountCompleted, long previouslyCompleted, long currentMillis, long previousMillis) { + static protected double calculateTransactionsPerSecond(long amountCompleted, long previouslyCompleted, long currentMillis, long previousMillis) { return (amountCompleted - previouslyCompleted) * 1000d / (currentMillis - previousMillis); } - protected static String getProgressMessage(long completed, long taskCount, double tps, double curTps, int threads) { - String etc = getEstimatedTimeCompletion(taskCount, completed, tps); + static protected String getProgressMessage(long completed, long taskCount, double tps, double curTps, double tpsForETC, int threads, boolean isPaused) { + String etc = getEstimatedTimeCompletion(taskCount, completed, tpsForETC, isPaused); return completed + "/" + taskCount + ", " + formatTransactionsPerSecond(tps) + " tps(avg), " + formatTransactionsPerSecond(curTps) + " tps(cur), " + @@ -182,12 +214,13 @@ protected static String getProgressMessage(long completed, long taskCount, doubl threads + " active threads."; } - protected static String getEstimatedTimeCompletion(double taskCount, double completed, double tps) { - double ets = (tps != 0) ? (taskCount - completed) / tps : -1; + static protected String getEstimatedTimeCompletion(double taskCount, double completed, double tpsForETC, boolean isPaused) { + double ets = (tpsForETC != 0) ? (taskCount - completed) / tpsForETC : -1; int hours = (int) ets / 3600; int minutes = (int) (ets % 3600) / 60; int seconds = (int) ets % 60; - return String.format("%02d:%02d:%02d", hours, minutes, seconds); + return String.format("%02d:%02d:%02d", hours, minutes, seconds) + + (isPaused ? " (paused)":""); } /** * Returns a string representation of the number. @@ -195,7 +228,7 @@ protected static String getEstimatedTimeCompletion(double taskCount, double comp * @param n * @return */ - protected static String formatTransactionsPerSecond(Number n) { + static protected String formatTransactionsPerSecond(Number n) { NumberFormat format = DecimalFormat.getInstance(); format.setRoundingMode(RoundingMode.HALF_UP); format.setMinimumFractionDigits(0); diff --git a/src/main/java/com/marklogic/developer/corb/Options.java b/src/main/java/com/marklogic/developer/corb/Options.java index 42f6043f..55034dde 100644 --- a/src/main/java/com/marklogic/developer/corb/Options.java +++ b/src/main/java/com/marklogic/developer/corb/Options.java @@ -410,6 +410,12 @@ public class Options { @Usage(description = "Default is '/'.") public static final String MODULE_ROOT = "MODULE-ROOT"; + /** + * Default is 10. Max number of recent tps (transaction per second) values used to calculate ETC (estimated time to completion) + */ + @Usage(description = "Default is 10. Max number of recent tps values used to calculate ETC") + public static final String NUM_TPS_FOR_ETC="NUM_TPS_FOR_ETC"; + /** * A properties file containing any of the CoRB2 options. Relative and full * file system paths are supported. diff --git a/src/main/java/com/marklogic/developer/corb/TransformOptions.java b/src/main/java/com/marklogic/developer/corb/TransformOptions.java index f349fbfa..f069c73e 100644 --- a/src/main/java/com/marklogic/developer/corb/TransformOptions.java +++ b/src/main/java/com/marklogic/developer/corb/TransformOptions.java @@ -63,6 +63,8 @@ public class TransformOptions { private int diskQueueMaxInMemorySize = 1000; private File diskQueueTempDir; private boolean doInstall; + + private int numTpsForETC = 10; private boolean failOnError = true; @@ -353,4 +355,14 @@ public void setDiskQueueTempDir(File directory) { public File getDiskQueueTempDir() { return this.diskQueueTempDir; } + + public void setNumTpsForETC(int numTpsForETC){ + if(numTpsForETC > 0){ + this.numTpsForETC = numTpsForETC; + } + } + + public int getNumTpsForETC(){ + return this.numTpsForETC; + } } diff --git a/src/test/java/com/marklogic/developer/corb/MonitorTest.java b/src/test/java/com/marklogic/developer/corb/MonitorTest.java index 1954f31a..4947c058 100644 --- a/src/test/java/com/marklogic/developer/corb/MonitorTest.java +++ b/src/test/java/com/marklogic/developer/corb/MonitorTest.java @@ -65,12 +65,12 @@ public void testRun_whenPaused() { * Test of calculateThreadsPerSecond method, of class Monitor. */ @Test - public void testCalculateThreadsPerSecond_3args() { + public void testCalculateTransactionsPerSecond_3args() { long amountCompleted = 10L; long previousMillis = 1000L; long currentMillis = 2000L; - double expResult = Monitor.calculateThreadsPerSecond(amountCompleted, 0, currentMillis, previousMillis); - double result = Monitor.calculateThreadsPerSecond(amountCompleted, currentMillis, previousMillis); + double expResult = Monitor.calculateTransactionsPerSecond(amountCompleted, 0, currentMillis, previousMillis); + double result = Monitor.calculateTransactionsPerSecond(amountCompleted, currentMillis, previousMillis); assertEquals(expResult, result, DOUBLE_DELTA); } @@ -84,7 +84,7 @@ public void testCalculateThreadsPerSecond_4args() { long currentMillis = 2000L; long previousMillis = 1000L; double expResult = 100.0; - double result = Monitor.calculateThreadsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis); + double result = Monitor.calculateTransactionsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis); assertEquals(expResult, result, DOUBLE_DELTA); } @@ -98,39 +98,42 @@ public void testCalculateThreadsPerSecond_fractional() { long currentMillis = 3000L; long previousMillis = 1000L; double expResult = 0.5; - double result = Monitor.calculateThreadsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis); + double result = Monitor.calculateTransactionsPerSecond(amountCompleted, previouslyCompleted, currentMillis, previousMillis); assertEquals(expResult, result, DOUBLE_DELTA); } @Test public void testGetProgressMessage() { - assertEquals("10/100, 4 tps(avg), 3 tps(cur), ETC 00:00:22, 2 active threads.", getProgressMessage(10, 100, 4, 3, 2)); - assertEquals("10/100, 0.4 tps(avg), 3 tps(cur), ETC 00:03:45, 2 active threads.", getProgressMessage(10, 100, 0.4, 3, 2)); - assertEquals("10/100, 0.49 tps(avg), 3 tps(cur), ETC 00:03:03, 2 active threads.", getProgressMessage(10, 100, 0.49, 3, 2)); - assertEquals("10/100, 0.45 tps(avg), 3 tps(cur), ETC 00:03:20, 2 active threads.", getProgressMessage(10, 100, 0.449, 3, 2)); - assertEquals("10/100, 0.04 tps(avg), 3 tps(cur), ETC 00:34:05, 2 active threads.", getProgressMessage(10, 100, 0.044, 3, 2)); - assertEquals("10/100, 0 tps(avg), 3 tps(cur), ETC 06:15:00, 2 active threads.", getProgressMessage(10, 100, 0.004, 3, 2)); + assertEquals("10/100, 4 tps(avg), 3 tps(cur), ETC 00:00:11, 2 active threads.", getProgressMessage(10, 100, 4, 3, 8, 2, false)); + assertEquals("10/100, 0.4 tps(avg), 3 tps(cur), ETC 00:07:30 (paused), 2 active threads.", getProgressMessage(10, 100, 0.4, 3, 0.2, 2, true)); + assertEquals("10/100, 0.49 tps(avg), 3 tps(cur), ETC 00:03:03, 2 active threads.", getProgressMessage(10, 100, 0.49, 3, 0.49, 2, false)); + assertEquals("10/100, 0.45 tps(avg), 3 tps(cur), ETC 00:03:20, 2 active threads.", getProgressMessage(10, 100, 0.449, 3, 0.449, 2, false)); + assertEquals("10/100, 0.04 tps(avg), 3 tps(cur), ETC 00:34:05, 2 active threads.", getProgressMessage(10, 100, 0.044, 3, 0.044, 2, false)); + assertEquals("10/100, 0 tps(avg), 3 tps(cur), ETC 06:15:00 (paused), 2 active threads.", getProgressMessage(10, 100, 0.004, 3, 0.004, 2, true)); } @Test public void testGetEstimatedTimeCompletion_zero() { - assertEquals("00:00:-1", Monitor.getEstimatedTimeCompletion(100, 50, 0)); + assertEquals("00:00:-1", Monitor.getEstimatedTimeCompletion(100, 50, 0, false)); + assertEquals("00:00:-1 (paused)", Monitor.getEstimatedTimeCompletion(100, 50, 0, true)); } @Test public void testGetEstimatedTimeCompletion() { String fourtyFiveSeconds = "00:00:45"; - assertEquals("02:38:20", Monitor.getEstimatedTimeCompletion(100, 5, 0.01)); - assertEquals("01:23:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.01)); - assertEquals("00:08:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.1)); - assertEquals("00:00:50", Monitor.getEstimatedTimeCompletion(100, 50, 1.0)); - assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.1)); - assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.111)); - assertEquals("00:00:44", Monitor.getEstimatedTimeCompletion(100, 50, 1.12345)); - assertEquals("00:00:01", Monitor.getEstimatedTimeCompletion(100, 50, 49)); - assertEquals("00:00:00", Monitor.getEstimatedTimeCompletion(100, 50, 60)); + assertEquals("02:38:20", Monitor.getEstimatedTimeCompletion(100, 5, 0.01, false)); + assertEquals("01:23:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.01, false)); + assertEquals("00:08:20", Monitor.getEstimatedTimeCompletion(100, 50, 0.1, false)); + assertEquals("00:00:50", Monitor.getEstimatedTimeCompletion(100, 50, 1.0, false)); + assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.1, false)); + assertEquals(fourtyFiveSeconds, Monitor.getEstimatedTimeCompletion(100, 50, 1.111, false)); + assertEquals("00:00:44", Monitor.getEstimatedTimeCompletion(100, 50, 1.12345, false)); + assertEquals("00:00:01", Monitor.getEstimatedTimeCompletion(100, 50, 49, false)); + assertEquals("00:00:00", Monitor.getEstimatedTimeCompletion(100, 50, 60, false)); - assertEquals("2777:38:20", Monitor.getEstimatedTimeCompletion(100000d, 5d, 0.01d)); + assertEquals("2777:38:20", Monitor.getEstimatedTimeCompletion(100000d, 5d, 0.01d, false)); + + assertEquals("02:38:20 (paused)", Monitor.getEstimatedTimeCompletion(100, 5, 0.01, true)); } @Test