HBASE-23085: Network and Data related Actions #675

Merged (4 commits) on Nov 6, 2019
@@ -62,6 +62,15 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
"timeout 30 /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
private String tunnelCmd;

/**
* The command format that is used to execute the remote command with sudo. Arguments:
 * 1 SSH options, 2 user name, 3 "@" if username is set, 4 host,
* 5 original command, 6 timeout.
*/
private static final String DEFAULT_TUNNEL_SUDO_CMD =
"timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
private String tunnelSudoCmd;
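  // For example (illustrative values, not from the patch): with sshOptions="-p 22",
  // sshUserName="root", hostname="rs-1.example.com" and a 30 second timeout, the
  // format above expands to:
  //   timeout 30.0 /usr/bin/ssh -p 22 root@rs-1.example.com "sudo <original command>"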

private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
private static final int DEFAULT_RETRY_ATTEMPTS = 5;

@@ -86,6 +95,7 @@ public void setConf(Configuration conf) {
sshOptions = (sshOptions == null) ? "" : sshOptions;
sshUserName = (sshUserName == null) ? "" : sshUserName;
tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);
tunnelSudoCmd = conf.get("hbase.it.clustermanager.ssh.sudo.cmd", DEFAULT_TUNNEL_SUDO_CMD);
// Print out ssh special config if any.
if ((sshUserName != null && sshUserName.length() > 0) ||
(sshOptions != null && sshOptions.length() > 0)) {
@@ -152,10 +162,32 @@ public String[] getExecString() {
LOG.info("Executing full command [" + cmd + "]");
return new String[] { "/usr/bin/env", "bash", "-c", cmd };
}
}

/**
 * Executes commands over SSH, running them through sudo on the remote host
*/
protected class RemoteSudoShell extends Shell.ShellCommandExecutor {
private String hostname;

public RemoteSudoShell(String hostname, String[] execString, long timeout) {
this(hostname, execString, null, null, timeout);
}

public RemoteSudoShell(String hostname, String[] execString, File dir, Map<String, String> env,
long timeout) {
super(execString, dir, env, timeout);
this.hostname = hostname;
}

    @Override
-   public void execute() throws IOException {
-     super.execute();
+   public String[] getExecString() {
+     String at = sshUserName.isEmpty() ? "" : "@";
+     String remoteCmd = StringUtils.join(super.getExecString(), " ");
+     String cmd = String.format(tunnelSudoCmd, sshOptions, sshUserName, at, hostname, remoteCmd,
+       timeOutInterval/1000f);
+     LOG.info("Executing full command [" + cmd + "]");
+     return new String[] { "/usr/bin/env", "bash", "-c", cmd };
}
}

@@ -299,7 +331,8 @@ protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
*/
private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
throws IOException {
LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);
LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
hostname);

RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
try {
@@ -312,8 +345,8 @@ private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
+ ", stdout: " + output);
}

LOG.info("Executed remote command, exit code:" + shell.getExitCode()
+ " , output:" + shell.getOutput());
LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
shell.getOutput());

return new Pair<>(shell.getExitCode(), shell.getOutput());
}
@@ -331,7 +364,52 @@ private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ex) {
// ignore
LOG.warn("Sleep Interrupted:" + ex);
LOG.warn("Sleep Interrupted:", ex);
}
}
}

/**
 * Executes the given command on the host over SSH, running it under sudo with the given timeout in milliseconds
* @return pair of exit code and command output
* @throws IOException if something goes wrong.
*/
public Pair<Integer, String> execSudo(String hostname, long timeout, String... cmd)
throws IOException {
LOG.info("Executing remote command: {} , hostname:{}", StringUtils.join(cmd, " "),
hostname);

RemoteSudoShell shell = new RemoteSudoShell(hostname, cmd, timeout);
try {
shell.execute();
} catch (Shell.ExitCodeException ex) {
// capture the stdout of the process as well.
String output = shell.getOutput();
// add output for the ExitCodeException.
throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
+ ", stdout: " + output);
}

LOG.info("Executed remote command, exit code:{} , output:{}", shell.getExitCode(),
shell.getOutput());

return new Pair<>(shell.getExitCode(), shell.getOutput());
}

public Pair<Integer, String> execSudoWithRetries(String hostname, long timeout, String... cmd)
throws IOException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return execSudo(hostname, timeout, cmd);
} catch (IOException e) {
retryOrThrow(retryCounter, e, hostname, cmd);
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException ex) {
// ignore
LOG.warn("Sleep Interrupted:", ex);
}
}
}
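
For context, here is a minimal usage sketch of the new sudo API. The host name, timeout, and command are illustrative, and the sketch assumes the manager is reachable with the default SSH configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClusterManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Pair;

public class ExecSudoSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseClusterManager manager = new HBaseClusterManager();
    manager.setConf(conf);
    // Runs "tc qdisc show" under sudo on the remote host. The 30000 ms
    // timeout is converted to seconds and handed to /usr/bin/timeout,
    // which kills the SSH session if it hangs.
    Pair<Integer, String> result =
        manager.execSudoWithRetries("rs-1.example.com", 30000, "tc", "qdisc", "show");
    System.out.println("exit=" + result.getFirst() + ", output=" + result.getSecond());
  }
}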
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Action that adds high cpu load to a random regionserver for a given duration
*/
public class AddCPULoadAction extends SudoCommandAction {
protected static final Logger LOG = LoggerFactory.getLogger(AddCPULoadAction.class);
private static final String CPU_LOAD_COMMAND =
"seq 1 %s | xargs -I{} -n 1 -P %s timeout %s dd if=/dev/urandom of=/dev/null bs=1M " +
"iflag=fullblock";
Comment on lines +34 to +35

Contributor: %s is used but numbers are added to the String.format arguments.

Contributor (author): That is intentional. %s uses Integer.toString, which is predictable, while %d uses locale-specific formatting that might change.

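To make the distinction concrete, a standalone sketch (not part of the patch):

import java.util.Locale;

public class LocaleFormatDemo {
  public static void main(String[] args) {
    long n = 1000000L;
    // %s goes through Long.toString and ignores the locale entirely.
    System.out.println(String.format(Locale.GERMANY, "%s", n));  // 1000000
    // %d is locale-aware; with the grouping flag the difference shows.
    System.out.println(String.format(Locale.GERMANY, "%,d", n)); // 1.000.000
    System.out.println(String.format(Locale.US, "%,d", n));      // 1,000,000
  }
}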

private final long duration;
private long processes;

/**
* Add high load to cpu
*
* @param duration Duration that this thread should generate the load for in milliseconds
* @param processes The number of parallel processes, should be equal to cpu threads for max load
*/
public AddCPULoadAction(long duration, long processes, long timeout) {
super(timeout);
this.duration = duration;
this.processes = processes;
}

  @Override
  protected void localPerform() throws IOException {
LOG.info("Starting to execute AddCPULoadAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());
String hostname = server.getHostname();

try {
clusterManager.execSudo(hostname, timeout, getCommand());
} catch (IOException ex){
      // This will always happen. We use timeout to kill a continuously running
      // process after the duration expires.
Comment on lines +60 to +61

Contributor: I think we should add /bin/true at the end of the command line. We don't know at this point whether we got a network error, for example, or whether the script returned with an error.

Contributor (author): @meszibalu As discussed, for /bin/true to make a difference we would have to increase the outer timeout, and that would not be much better, so I will leave it as it is.

}
LOG.info("Finished to execute AddCPULoadAction");
}

private String getCommand(){
return String.format(CPU_LOAD_COMMAND, processes, processes, duration/1000f);
}
}
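
For reference, with processes=8 and duration=60000 ms (illustrative values), getCommand() produces:

seq 1 8 | xargs -I{} -n 1 -P 8 timeout 60.0 dd if=/dev/urandom of=/dev/null bs=1M iflag=fullblock

seq emits one token per process slot, xargs -P starts that many parallel dd readers of /dev/urandom, and timeout kills each dd once the duration expires.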
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.chaos.actions;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Action corrupts HFiles with a certain chance.
*/
public class CorruptDataFilesAction extends Action {
Member: Neat idea, but how could we tell via automation when a file was expectedly corrupted vs. unexpectedly?

Contributor (author): We can't, as far as I'm aware. It's not clear to me what the intended use of these tests is, but they were requested by stack, so I added them. They are so destructive that I couldn't even restart HBase after running them and had to delete all HBase-related data from HDFS and ZooKeeper to be able to run HBase on the cluster again. My best guess is to use them for active testing: run them in the background while monitoring HBase status and behavior.

Member: I was worried about corrupting something critical like hbase:meta, a table descriptor, or something like that. I think corrupting a single HFile of a user table is a more "reasonable" failure condition which wouldn't have a long-lasting impact on HBase's ability to keep working.

@saintstack does that jive with what you were thinking, or did you have something else in mind?

Member: Resolving this -- defaulting to $hbase.root/data/default is a nice compromise! Thanks for changing it, Szabolcs!

private static final Logger LOG = LoggerFactory.getLogger(CorruptDataFilesAction.class);
private float chance;

/**
* Corrupts HFiles with a certain chance
   * @param chance chance to corrupt any given data file (0.5 => 50%)
*/
public CorruptDataFilesAction(float chance) {
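    // Stored as a percentage (0.5 becomes 50) so it can be compared directly
    // against RandomUtils.nextFloat(0, 100) in perform() below.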
this.chance = chance * 100;
}

@Override
public void perform() throws Exception {
LOG.info("Start corrupting data files");

FileSystem fs = CommonFSUtils.getRootDirFileSystem(getConf());
Path rootDir = CommonFSUtils.getRootDir(getConf());
Path defaultDir = rootDir.suffix("/data/default");
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(defaultDir, true);
while (iterator.hasNext()){
LocatedFileStatus status = iterator.next();
if(!HFile.isHFileFormat(fs, status.getPath())){
continue;
}
if(RandomUtils.nextFloat(0, 100) > chance){
continue;
}

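      // fs.create(path, true) overwrites the existing HFile, so its original
      // contents are lost and replaced by the single zero byte written below.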
FSDataOutputStream out = fs.create(status.getPath(), true);
try {
out.write(0);
} finally {
out.close();
}
LOG.info("Corrupting {}", status.getPath());
}
LOG.info("Done corrupting data files");
}

}
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Corrupt network packages on a random regionserver.
 */
public class CorruptPackagesCommandAction extends TCCommandAction {
private static final Logger LOG = LoggerFactory.getLogger(CorruptPackagesCommandAction.class);
private float ratio;
private long duration;

/**
* Corrupt network packages on a random regionserver.
*
* @param ratio the ratio of packages corrupted
* @param duration the time this issue persists in milliseconds
* @param timeout the timeout for executing required commands on the region server in milliseconds
* @param network network interface the regionserver uses for communication
*/
public CorruptPackagesCommandAction(float ratio, long duration, long timeout, String network) {
super(timeout, network);
this.ratio = ratio;
this.duration = duration;
}

  @Override
  protected void localPerform() throws IOException {
LOG.info("Starting to execute CorruptPackagesCommandAction");
ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers());
String hostname = server.getHostname();

try {
clusterManager.execSudoWithRetries(hostname, timeout, getCommand(ADD));
Thread.sleep(duration);
} catch (InterruptedException e) {
LOG.debug("Failed to run the command for the full duration", e);
} finally {
clusterManager.execSudoWithRetries(hostname, timeout, getCommand(DELETE));
}

LOG.info("Finished to execute CorruptPackagesCommandAction");
}

private String getCommand(String operation){
return String.format("tc qdisc %s dev %s root netem corrupt %s%%", operation, network,
ratio * 100);
}
}
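
For illustration (values hypothetical, and assuming TCCommandAction's ADD and DELETE constants map to tc's "add" and "del" operations), ratio=0.05 on interface eth0 would yield:

tc qdisc add dev eth0 root netem corrupt 5.0%
tc qdisc del dev eth0 root netem corrupt 5.0%

The first command installs a netem queueing discipline that corrupts 5% of outgoing packets; the second removes it once the duration elapses.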