Skip to content

Commit

Permalink
Closes linkedin#32. Add the ability to specify workload-job-only conf…
Browse files Browse the repository at this point in the history
…igurations.
  • Loading branch information
xkrogen committed Apr 13, 2018
1 parent 6506a13 commit 83bf38e
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.linkedin.dynamometer.workloadgenerator.audit.AuditReplayMapper;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class Client extends Configured implements Tool {
public static final String WORKLOAD_START_DELAY_ARG = "workload_start_delay";
public static final String WORKLOAD_RATE_FACTOR_ARG = "workload_rate_factor";
public static final String WORKLOAD_RATE_FACTOR_DEFAULT = "1.0";
public static final String WORKLOAD_CONFIG_ARG = "workload_config";

private static final String START_SCRIPT_LOCATION =
Client.class.getClassLoader().getResource(DynoConstants.START_SCRIPT.getResourcePath()).toString();
Expand Down Expand Up @@ -193,6 +195,7 @@ public class Client extends Configured implements Tool {
// The startup delay for the workload job.
private long workloadStartDelayMs;
private double workloadRateFactor = 0.0;
private Map<String, String> workloadExtraConfigs;

// Start time for client
private final long clientStartTime = System.currentTimeMillis();
Expand Down Expand Up @@ -287,6 +290,9 @@ public Client(String appMasterJar) {
opts.addOption(WORKLOAD_RATE_FACTOR_ARG, true,
"Rate factor (multiplicative speed factor) to apply to workload replay (Default " +
WORKLOAD_RATE_FACTOR_DEFAULT + ")");
opts.addOption(WORKLOAD_CONFIG_ARG, true, "Additional configurations to pass only to the workload job. " +
"This can be used multiple times and should be specified as a key=value pair, e.g. '-" +
WORKLOAD_CONFIG_ARG + " conf.one=val1 -" + WORKLOAD_CONFIG_ARG + " conf.two=val2'");
}

/**
Expand Down Expand Up @@ -388,6 +394,13 @@ public boolean accept(Path path) {
String.valueOf(AuditReplayMapper.NUM_THREADS_DEFAULT)));
workloadRateFactor = Double.parseDouble(cliParser.getOptionValue(WORKLOAD_RATE_FACTOR_ARG,
WORKLOAD_RATE_FACTOR_DEFAULT));
workloadExtraConfigs = new HashMap<>();
if (cliParser.getOptionValues(WORKLOAD_CONFIG_ARG) != null) {
for (String opt : cliParser.getOptionValues(WORKLOAD_CONFIG_ARG)) {
List<String> kvPair = Splitter.on("=").trimResults().splitToList(opt);
workloadExtraConfigs.put(kvPair.get(0), kvPair.get(1));
}
}
String delayString = cliParser.getOptionValue(WORKLOAD_START_DELAY_ARG, WorkloadDriver.START_TIME_OFFSET_DEFAULT);
// Store a temporary config to leverage Configuration's time duration parsing.
getConf().set("___temp___", delayString);
Expand Down Expand Up @@ -838,6 +851,9 @@ private void launchAndMonitorWorkloadDriver(Properties nameNodeProperties) {
workloadConf.set(AuditReplayMapper.INPUT_PATH_KEY, workloadInputPath);
workloadConf.setInt(AuditReplayMapper.NUM_THREADS_KEY, workloadThreadsPerMapper);
workloadConf.setDouble(AuditReplayMapper.RATE_FACTOR_KEY, workloadRateFactor);
for (Map.Entry<String, String> configPair : workloadExtraConfigs.entrySet()) {
workloadConf.set(configPair.getKey(), configPair.getValue());
}
workloadJob = WorkloadDriver.getJobForSubmission(workloadConf, nameNodeURI.toString(),
workloadStartTime, AuditReplayMapper.class);
workloadJob.submit();
Expand Down

0 comments on commit 83bf38e

Please sign in to comment.