Closes #14. Proxy in workload job to enable permission checking. (PR #18)
xkrogen authored Apr 5, 2018
1 parent 8f73360 commit aadf7c2
Showing 14 changed files with 168 additions and 40 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -126,6 +126,7 @@ The other supported format is `com.linkedin.dynamometer.workloadgenerator.audit.AuditLogHiveTableParser`. This parses
files in the format produced by a Hive query with output fields, in order:

* `relativeTimestamp`: event time offset, in milliseconds, from the start of the trace
* `ugi`: user information of the submitting user
* `command`: name of the command, e.g. 'open'
* `source`: source path
* `dest`: destination path
@@ -134,7 +135,7 @@ files in the format produced by a Hive query with output fields, in order:
Assuming your audit logs are available in Hive, this can be produced via a Hive query looking like:
```sql
INSERT OVERWRITE DIRECTORY '${outputPath}'
SELECT (timestamp - ${startTimestamp}) AS relativeTimestamp, command, source, dest, sourceIP
SELECT (timestamp - ${startTimestamp}) AS relativeTimestamp, ugi, command, source, dest, sourceIP
FROM '${auditLogTableLocation}'
WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp}
DISTRIBUTE BY source
dynamometer-infra/src/main/java/com/linkedin/dynamometer/AllowAllImpersonationProvider.java
@@ -0,0 +1,25 @@
/**
* Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.dynamometer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ImpersonationProvider;


/**
* An {@link ImpersonationProvider} that indiscriminately allows all users
* to proxy as any other user.
*/
public class AllowAllImpersonationProvider extends Configured implements ImpersonationProvider {

  @Override
  public void init(String configurationPrefix) {
    // Do nothing
  }

  @Override
  public void authorize(UserGroupInformation user, String remoteAddress) {
    // Do nothing
  }
}
5 changes: 3 additions & 2 deletions dynamometer-infra/src/main/resources/start-component.sh
@@ -247,7 +247,7 @@ EOF
ln -snf "`pwd`/VERSION" "$nameDir/current/VERSION"
chmod 700 "$nameDir/current/"*

# To be able to use the custom block placement policy
# To be able to use the custom block placement policy and the AllowAllImpersonationProvider
export HADOOP_CLASSPATH="`pwd`/dynamometer.jar:$HADOOP_CLASSPATH"

read -r -d '' namenodeConfigs <<EOF
@@ -266,9 +266,10 @@ EOF
-D dfs.namenode.kerberos.principal=
-D dfs.namenode.keytab.file=
-D dfs.namenode.safemode.threshold-pct=0.0f
-D dfs.permissions.enabled=false
-D dfs.permissions.enabled=true
-D dfs.cluster.administrators="*"
-D dfs.block.replicator.classname=com.linkedin.dynamometer.BlockPlacementPolicyAlwaysSatisfied
-D hadoop.security.impersonation.provider.class=com.linkedin.dynamometer.AllowAllImpersonationProvider
${configOverrides}
EOF

@@ -304,8 +304,8 @@ public Boolean get() {
fail("Workload job failed");
}
Counters counters = client.getWorkloadJob().getCounters();
assertEquals(3, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
assertEquals(0, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());
assertEquals(6, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS).getValue());
assertEquals(1, counters.findCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue());

LOG.info("Waiting for infra application to exit");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
13 changes: 13 additions & 0 deletions dynamometer-infra/src/test/resources/conf/etc/hadoop/core-site.xml
@@ -0,0 +1,13 @@
<?xml version='1.0' encoding='UTF-8'?>
<!--
Copyright 2017 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
See LICENSE in the project root for license information.
-->
<configuration>
  <property>
    <name>hadoop.security.impersonation.provider.class</name>
    <value>com.linkedin.dynamometer.AllowAllImpersonationProvider</value>
  </property>
</configuration>
dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java
@@ -32,6 +32,7 @@ public class AuditLogDirectParser implements AuditCommandParser {
private static final Pattern MESSAGE_ONLY_PATTERN = Pattern.compile("^([0-9-]+ [0-9:,]+) [^:]+: (.+)$");
private static final Splitter.MapSplitter AUDIT_SPLITTER =
Splitter.on("\t").trimResults().omitEmptyStrings().withKeyValueSeparator("=");
private static final Splitter SPACE_SPLITTER = Splitter.on(" ").trimResults().omitEmptyStrings();
private static final DateFormat AUDIT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
static {
AUDIT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -63,6 +64,8 @@ public AuditReplayCommand parse(Text inputLine, Function<Long, Long> relativeToAbsolute
String auditMessageSanitized = m.group(2).replace("(options=", "(options:");
Map<String, String> parameterMap = AUDIT_SPLITTER.split(auditMessageSanitized);
return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp),
// Split the UGI on space to remove the auth and proxy portions of it
SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(),
parameterMap.get("cmd").replace("(options:", "(options="),
parameterMap.get("src"), parameterMap.get("dst"), parameterMap.get("ip"));
}
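To illustrate the new UGI handling, here is a minimal, self-contained sketch (not part of the commit; the sample UGI value is made up) of how splitting on spaces keeps only the effective short user name and drops the auth and proxy portions:

```java
import com.google.common.base.Splitter;

public class UgiSplitExample {
  // Same splitter configuration as AuditLogDirectParser uses above.
  private static final Splitter SPACE_SPLITTER =
      Splitter.on(" ").trimResults().omitEmptyStrings();

  public static void main(String[] args) {
    // A proxy-user UGI as it can appear in an HDFS audit log:
    // effective user, auth method, "via", then the real (proxying) user.
    String ugi = "readerA (auth:TOKEN) via hdfs/nn.example.com@EXAMPLE.COM (auth:KERBEROS)";
    // The first space-separated token is the effective user, "readerA".
    System.out.println(SPACE_SPLITTER.split(ugi).iterator().next());
  }
}
```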
dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java
@@ -16,14 +16,14 @@
* which fields should be separated by the start-of-heading (U+0001) character.
* The fields available should be, in order:
* <pre>
* relativeTimestampMs,command,src,dest,sourceIP
* relativeTimestampMs,ugi,command,src,dest,sourceIP
* </pre>
* Where relativeTimestampMs represents the time elapsed between the start of
* the audit log and the occurrence of the audit event. Assuming your audit
* logs are available in Hive, this can be generated with a query looking like:
* <pre>
* INSERT OVERWRITE DIRECTORY '${outputPath}'
 * SELECT (timestamp - ${startTimestamp}) AS relativeTimestamp, cmd, src, dst, ip
 * SELECT (timestamp - ${startTimestamp}) AS relativeTimestamp, ugi, cmd, src, dst, ip
* FROM '${auditLogTableLocation}'
* WHERE timestamp >= ${startTimestamp} AND timestamp < ${endTimestamp}
* DISTRIBUTE BY src
@@ -45,7 +45,7 @@ public void initialize(Configuration conf) throws IOException {
public AuditReplayCommand parse(Text inputLine, Function<Long, Long> relativeToAbsolute) throws IOException {
String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
long absoluteTimestamp = relativeToAbsolute.apply(Long.parseLong(fields[0]));
return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4]);
return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], fields[3], fields[4], fields[5]);
}

}
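For reference, a sketch (not part of the commit; field values are made up) of the Hive-produced line format this parser now expects, with ugi as the second U+0001-separated field:

```java
public class HiveLineExample {
  public static void main(String[] args) {
    String sep = "\u0001"; // the start-of-heading field separator
    // relativeTimestampMs, ugi, command, src, dest, sourceIP
    String line = String.join(sep, "1000", "readerA", "open", "/tmp/f", "null", "127.0.0.1");
    String[] fields = line.split(sep);
    System.out.println(Long.parseLong(fields[0])); // 1000 ms into the trace
    System.out.println(fields[1]);                 // "readerA", passed to AuditReplayCommand as ugi
  }
}
```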
dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
@@ -4,8 +4,14 @@
*/
package com.linkedin.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -19,14 +25,19 @@
*/
class AuditReplayCommand implements Delayed {

private static final Logger LOG = LoggerFactory.getLogger(AuditReplayCommand.class);
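// Captures the leading short name from a full UGI string: everything before the
// first '/', '@', or space, e.g. "hdfs/host@REALM (auth:KERBEROS)" -> "hdfs".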
private static final Pattern SIMPLE_UGI_PATTERN = Pattern.compile("([^/@ ]*).*?");

private long absoluteTimestamp;
private String ugi;
private String command;
private String src;
private String dest;
private String sourceIP;

AuditReplayCommand(long absoluteTimestamp, String command, String src, String dest, String sourceIP) {
AuditReplayCommand(long absoluteTimestamp, String ugi, String command, String src, String dest, String sourceIP) {
this.absoluteTimestamp = absoluteTimestamp;
this.ugi = ugi;
this.command = command;
this.src = src;
this.dest = dest;
@@ -37,6 +48,20 @@ long getAbsoluteTimestamp() {
return absoluteTimestamp;
}

String getSimpleUgi() {
Matcher m = SIMPLE_UGI_PATTERN.matcher(ugi);
if (m.matches()) {
return m.group(1);
} else {
LOG.error("Error parsing simple UGI <{}>; falling back to current user", ugi);
try {
return UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ioe) {
return "";
}
}
}

String getCommand() {
return command;
}
@@ -79,7 +104,7 @@ boolean isPoison() {
private static class PoisonPillCommand extends AuditReplayCommand {

private PoisonPillCommand(long absoluteTimestamp) {
super(absoluteTimestamp, null, null, null, null);
super(absoluteTimestamp, null, null, null, null, null);
}

@Override
@@ -99,13 +124,13 @@ public boolean equals(Object other) {
return false;
}
AuditReplayCommand o = (AuditReplayCommand) other;
return absoluteTimestamp == o.absoluteTimestamp && command.equals(o.command) && src.equals(o.src) &&
dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi) && command.equals(o.command) &&
src.equals(o.src) && dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
}

@Override
public String toString() {
return String.format("AuditReplayCommand(absoluteTimestamp=%d, command=%s, src=%s, dest=%s, sourceIP=%s",
absoluteTimestamp, command, src, dest, sourceIP);
return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
absoluteTimestamp, ugi, command, src, dest, sourceIP);
}
}
dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
@@ -13,11 +13,14 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -187,9 +190,10 @@ public Long apply(Long input) {
LOG.info("Starting " + numThreads + " threads");

threads = new ArrayList<>();
ConcurrentMap<String, FileSystem> fsCache = new ConcurrentHashMap<>();
commandQueue = new DelayQueue<>();
for (int i = 0; i < numThreads; i++) {
AuditReplayThread thread = new AuditReplayThread(context, commandQueue);
AuditReplayThread thread = new AuditReplayThread(context, commandQueue, fsCache);
threads.add(thread);
thread.start();
}
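The map is shared by all replay threads so that each distinct user ends up with a single cached FileSystem. A sketch of the idea (not part of the commit; createFsAs is a hypothetical stand-in for the proxy-user FileSystem creation shown in AuditReplayThread below):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.FileSystem;

public class FsCacheSketch {
  private final ConcurrentMap<String, FileSystem> fsCache = new ConcurrentHashMap<>();

  FileSystem getOrCreate(String user) {
    FileSystem fs = fsCache.get(user);
    if (fs == null) {
      FileSystem created = createFsAs(user); // hypothetical per-user factory
      // putIfAbsent keeps the first instance if two threads race on the same user.
      FileSystem existing = fsCache.putIfAbsent(user, created);
      fs = (existing == null) ? created : existing;
    }
    return fs;
  }

  private FileSystem createFsAs(String user) {
    throw new UnsupportedOperationException("illustrative only");
  }
}
```

Note that the committed code uses a plain get/put pair, which can transiently create a duplicate FileSystem under contention; putIfAbsent is one way to avoid that.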
dynamometer-workload/src/main/java/com/linkedin/dynamometer/workloadgenerator/audit/AuditReplayThread.java
@@ -8,9 +8,11 @@
import com.linkedin.dynamometer.workloadgenerator.WorkloadDriver;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.Map;
@@ -21,7 +23,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.mapreduce.Counter;
@@ -49,28 +50,30 @@ public class AuditReplayThread extends Thread {
private static final Log LOG = LogFactory.getLog(AuditReplayThread.class);

private DelayQueue<AuditReplayCommand> commandQueue;
private ConcurrentMap<String, FileSystem> fsCache;
private URI namenodeUri;
private UserGroupInformation loginUser;
private Configuration mapperConf;
// If any exception is encountered it will be stored here
private Exception exception;
private long startTimestampMs;
private FileSystem fs;
private DFSClient dfsClient;
private boolean createBlocks;

// Counters are not thread-safe so we store a local mapping in our thread
// and merge them all together at the end.
private Map<REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
private Map<String, Counter> individualCommandsMap = new HashMap<>();

AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue)
throws IOException {
AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue,
ConcurrentMap<String, FileSystem> fsCache) throws IOException {
commandQueue = queue;
Configuration mapperConf = mapperContext.getConfiguration();
String namenodeURI = mapperConf.get(WorkloadDriver.NN_URI);
this.fsCache = fsCache;
loginUser = UserGroupInformation.getLoginUser();
mapperConf = mapperContext.getConfiguration();
namenodeUri = URI.create(mapperConf.get(WorkloadDriver.NN_URI));
startTimestampMs = mapperConf.getLong(WorkloadDriver.START_TIMESTAMP_MS, -1);
createBlocks = mapperConf.getBoolean(AuditReplayMapper.CREATE_BLOCKS_KEY,
AuditReplayMapper.CREATE_BLOCKS_DEFAULT);
fs = FileSystem.get(URI.create(namenodeURI), mapperConf);
dfsClient = ((DistributedFileSystem) fs).getClient();
LOG.info("Start timestamp: " + startTimestampMs);
for (REPLAYCOUNTERS rc : REPLAYCOUNTERS.values()) {
replayCountersMap.put(rc, new GenericCounter());
@@ -134,7 +137,7 @@ public void run() {
replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDS).increment(1);
replayCountersMap.get(REPLAYCOUNTERS.LATECOMMANDSTOTALTIME).increment(-1 * delay);
}
if (!replayLog(cmd.getCommand(), cmd.getSrc(), cmd.getDest())) {
if (!replayLog(cmd)) {
replayCountersMap.get(REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).increment(1);
}
cmd = commandQueue.take();
@@ -149,15 +152,33 @@

/**
* Attempt to replay the provided command. Updates counters accordingly.
* @param command The name of the command to replay.
* @param src The source path of the command.
* @param dst The destination path of the command (null except for rename and concat).
* @param command The command to replay
* @return True iff the command was successfully replayed (i.e., no exceptions were thrown).
*/
private boolean replayLog(String command, String src, String dst) {
private boolean replayLog(final AuditReplayCommand command) {
final String src = command.getSrc();
final String dst = command.getDest();
FileSystem proxyFs = fsCache.get(command.getSimpleUgi());
if (proxyFs == null) {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(command.getSimpleUgi(), loginUser);
proxyFs = ugi.doAs(new PrivilegedAction<FileSystem>() {
@Override
public FileSystem run() {
try {
FileSystem fs = new DistributedFileSystem();
fs.initialize(namenodeUri, mapperConf);
return fs;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
});
fsCache.put(command.getSimpleUgi(), proxyFs);
}
final FileSystem fs = proxyFs;
ReplayCommand replayCommand;
try {
replayCommand = ReplayCommand.valueOf(command.split(" ")[0].toUpperCase());
replayCommand = ReplayCommand.valueOf(command.getCommand().split(" ")[0].toUpperCase());
} catch (IllegalArgumentException iae) {
LOG.warn("Unsupported/invalid command: " + command);
replayCountersMap.get(REPLAYCOUNTERS.TOTALUNSUPPORTEDCOMMANDS).increment(1);
@@ -191,7 +212,7 @@ private boolean replayLog(String command, String src, String dst) {
break;

case LISTSTATUS:
dfsClient.listPaths(src, HdfsFileStatus.EMPTY_NAME);
((DistributedFileSystem) fs).getClient().listPaths(src, HdfsFileStatus.EMPTY_NAME);
break;

case APPEND:
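For context, a standalone sketch of the proxy-user pattern the new replayLog uses (not part of the commit; the URI, path, and user name are placeholders, and it assumes the NameNode permits the impersonation, e.g. via AllowAllImpersonationProvider):

```java
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyFsExample {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    // The login user (a superuser) impersonates "readerA".
    UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUser("readerA", UserGroupInformation.getLoginUser());
    // Everything inside doAs runs as "readerA", so the NameNode
    // performs its permission checks against that user.
    boolean exists = proxyUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode.example.com:8020"), conf);
        return fs.exists(new Path("/tmp"));
      }
    });
    System.out.println(exists);
  }
}
```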