From 83bf38e8a2a7c640d3519b275fb9971ff0fca42a Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Fri, 29 Sep 2017 15:11:20 -0700 Subject: [PATCH] Closes #32. Add the ability to specify workload-job-only configurations. --- .../java/com/linkedin/dynamometer/Client.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java index b2bd610e41..0b468bd443 100644 --- a/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java +++ b/dynamometer-infra/src/main/java/com/linkedin/dynamometer/Client.java @@ -7,6 +7,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.base.Splitter; import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper; @@ -138,6 +139,7 @@ public class Client extends Configured implements Tool { public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay"; public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor"; public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0"; + public static final String WORKLOAD_CONFIG_ARG = "workload_config"; private static final String START_SCRIPT_LOCATION = Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString(); @@ -193,6 +195,7 @@ public class Client extends Configured implements Tool { // The startup delay for the workload job. private long workloadStartDelayMs; private double workloadRateFactor = 0.0; + private Map workloadExtraConfigs; // Start time for client private final long clientStartTime = System.currentTimeMillis(); @@ -287,6 +290,9 @@ public Client(String appMasterJar) { opts.addOption(WORKLOAD_RATE_FACTOR_ARG, true, "Rate factor (multiplicative speed factor) to apply to workload replay (Default " + WORKLOAD_RATE_FACTOR_DEFAULT + ")"); + opts.addOption(WORKLOAD_CONFIG_ARG, true, "Additional configurations to pass only to the workload job. " + + "This can be used multiple times and should be specified as a key=value pair, e.g. '-" + + WORKLOAD_CONFIG_ARG + " conf.one=val1 -" + WORKLOAD_CONFIG_ARG + " conf.two=val2'"); } /** @@ -388,6 +394,13 @@ public boolean accept(Path path) { String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT))); workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG, WORKLOAD_RATE_FACTOR_DEFAULT)); + workloadExtraConfigs = new HashMap<>(); + if (cliParser.getOptionValues(WORKLOAD_CONFIG_ARG) != null) { + for (String opt : cliParser.getOptionValues(WORKLOAD_CONFIG_ARG)) { + List kvPair = Splitter.on("=").trimResults().splitToList(opt); + workloadExtraConfigs.put(kvPair.get(0), kvPair.get(1)); + } + } String delayString = cliParser.getOptionValue(WORKLOAD_START_DELAY_ARG, WorkloadDriver.START_TIME_OFFSET_DEFAULT); // Store a temporary config to leverage Configuration's time duration parsing. getConf().set("___temp___", delayString); @@ -838,6 +851,9 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) { workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath); workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper); workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor); + for (Map.Entry configPair : workloadExtraConfigs.entrySet()) { + workloadConf.set(configPair.getKey(), configPair.getValue()); + } workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(), workloadStartTime, AuditReplayMapper.class); workloadJob.submit();