Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent unneeded exec operations #239

Merged
merged 9 commits into from
Jan 29, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PrintStream;
Expand All @@ -28,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -39,6 +41,7 @@

import hudson.EnvVars;
import hudson.FilePath;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import org.apache.commons.io.output.TeeOutputStream;
Expand Down Expand Up @@ -76,38 +79,51 @@ public class ContainerExecDecorator extends LauncherDecorator implements Seriali

private transient KubernetesClient client;


@SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization")
private transient List<Closeable> closables;
@SuppressFBWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", justification = "not needed on deserialization")
private transient Map<Integer, ContainerExecProc> processes = new HashMap<Integer, ContainerExecProc>();

private final String podName;
private final String namespace;
private final String containerName;
private final EnvironmentExpander environmentExpander;

public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) {
private final FilePath ws;

public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander, FilePath ws) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we keep binary backwards compatibility ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been fixed

this.client = client;
this.podName = podName;
this.namespace = namespace;
this.containerName = containerName;
this.environmentExpander = environmentExpander;
this.ws = ws;
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace, EnvironmentExpander environmentExpander) {
this(client, podName, containerName, namespace, environmentExpander, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String namespace) {
this(client, podName, containerName, namespace, null);
this(client, podName, containerName, namespace, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished, String namespace) {
this(client, podName, containerName, namespace, null);
this(client, podName, containerName, namespace, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) {
this(client, podName, containerName, null, null);
this(client, podName, containerName, (String) null, null, null);
}

@Deprecated
public ContainerExecDecorator(KubernetesClient client, String podName, String containerName, String path, AtomicBoolean alive, CountDownLatch started, CountDownLatch finished) {
this(client, podName, containerName, null, null);
this(client, podName, containerName, (String) null, null, null);
}

@Override
Expand All @@ -133,6 +149,80 @@ public Proc launch(ProcStarter starter) throws IOException {
}

private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForCaller, FilePath pwd, String... commands) throws IOException {
if (processes == null) {
processes = new HashMap<>();
}
//check ifits the actual script or the ProcessLiveness check.
int p = readPidFromPsCommand(commands);
//if it is a liveness check, try to find the actual process to avoid doing multiple execs.
if (p == 9999) {
return new Proc() {
@Override
public boolean isAlive() throws IOException, InterruptedException {
return false;
}

@Override
public void kill() throws IOException, InterruptedException {

}

@Override
public int join() throws IOException, InterruptedException {
return 1;
}

@Override
public InputStream getStdout() {
return null;
}

@Override
public InputStream getStderr() {
return null;
}

@Override
public OutputStream getStdin() {
return null;
}
};
} else if (p > 0 && processes.containsKey(p)) {
LOGGER.log(Level.INFO, "Retrieved process from cache with pid:[ " + p +"].");
Proc proc = processes.get(p);
return new Proc() {

@Override
public boolean isAlive() throws IOException, InterruptedException {
return false;
}

@Override
public void kill() throws IOException, InterruptedException {
}

@Override
public int join() throws IOException, InterruptedException {
return proc.isAlive() ? 0 : -1;
}

@Override
public InputStream getStdout() {
return null;
}

@Override
public InputStream getStderr() {
return null;
}

@Override
public OutputStream getStdin() {
return null;
}
};
}

waitUntilContainerIsReady();

final CountDownLatch started = new CountDownLatch(1);
Expand All @@ -151,7 +241,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC
// we need to keep the last bytes in the stream to parse the exit code as it is printed there
// so we use a buffer
ExitCodeOutputStream exitCodeOutputStream = new ExitCodeOutputStream();
// send container output both to the job output and our buffer
// send container output to all 3 streams (pid, out, job).
stream = new TeeOutputStream(exitCodeOutputStream, stream);
// Send to proc caller as well if they sent one
if (outputForCaller != null) {
Expand All @@ -169,6 +259,7 @@ private Proc doLaunch(boolean quiet, String [] cmdEnvs, OutputStream outputForC
public void onOpen(Response response) {
alive.set(true);
started.countDown();
LOGGER.log(Level.FINEST, "onOpen : {0}", finished);
}

@Override
Expand Down Expand Up @@ -238,13 +329,17 @@ public void onClose(int i, String s) {

this.setupEnvironmentVariable(envVars, watch);
doExec(watch, printStream, commands);
ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode);
if (closables == null) {
closables = new ArrayList<>();
}

int pid = readPidFromPidFile(commands);
LOGGER.log(Level.INFO, "Created process inside pod: ["+podName+"], container: ["+containerName+"] with pid:["+pid+"]");
ContainerExecProc proc = new ContainerExecProc(watch, alive, finished, exitCodeOutputStream::getExitCode);
processes.put(pid, proc);
closables.add(proc);
return proc;
} catch (InterruptedException ie) {
} catch (InterruptedException ie) {
throw new InterruptedIOException(ie.getMessage());
} catch (Exception e) {
closeWatch(watch);
Expand All @@ -266,8 +361,7 @@ public void kill(Map<String, String> modelEnvVars) throws IOException, Interrupt
getListener().getLogger().println("kill finished with exit code " + exitCode);
}

private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException
{
private void setupEnvironmentVariable(EnvVars vars, ExecWatch watch) throws IOException {
for (Map.Entry<String, String> entry : vars.entrySet()) {
//Check that key is bash compliant.
if (entry.getKey().matches("[a-zA-Z_][a-zA-Z0-9_]*")) {
Expand Down Expand Up @@ -356,6 +450,51 @@ private static void doExec(ExecWatch watch, PrintStream out, String... statement
}
}

static int readPidFromPsCommand(String... commands) {
if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].equals("pid=")) {
return Integer.parseInt(commands[3]);
}


if (commands.length == 4 && "ps".equals(commands[0]) && "-o".equals(commands[1]) && commands[2].startsWith("-pid")) {
return Integer.parseInt(commands[3]);
}
return -1;
}


private synchronized int readPidFromPidFile(String... commands) throws IOException, InterruptedException {
int pid = -1;
String pidFilePath = readPidFile(commands);
if (pidFilePath == null) {
return pid;
}
FilePath pidFile = ws.child(pidFilePath);
for (int w = 0; w < 10 && !pidFile.exists(); w++) {
try {
wait(1000);
} catch (InterruptedException e) {
break;
}
}
if (pidFile.exists()) {
try {
pid = Integer.parseInt(pidFile.readToString().trim());
} catch (NumberFormatException x) {
throw new IOException("corrupted content in " + pidFile + ": " + x, x);
}
}
return pid;
}

@CheckForNull
static String readPidFile(String... commands) {
if (commands.length >= 4 && "nohup".equals(commands[0]) && "sh".equals(commands[1]) && commands[2].equals("-c") && commands[3].startsWith("echo \\$\\$ >")) {
return commands[3].substring(13, commands[3].indexOf(";") - 1);
}
return null;
}

static String[] getCommands(Launcher.ProcStarter starter) {
List<String> allCommands = new ArrayList<String>();

Expand Down Expand Up @@ -416,7 +555,7 @@ public int getExitCode() {
int i = 1;
String s = new String(b.array(), StandardCharsets.UTF_8);
if (s.indexOf(EXIT_COMMAND_TXT) < 0) {
LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[] { EXIT_COMMAND_TXT, s });
LOGGER.log(Level.WARNING, "Unable to find \"{0}\" in {1}", new Object[]{EXIT_COMMAND_TXT, s});
return i;
}
// parse the exitcode int printed after EXITCODE
Expand All @@ -426,7 +565,7 @@ public int getExitCode() {
i = Integer.parseInt(s);
} catch (NumberFormatException e) {
LOGGER.log(Level.WARNING, "Unable to parse exit code as integer: \"{0}\" {1} / {2}",
new Object[] { s, queue.toString(), Arrays.toString(b.array()) });
new Object[]{s, queue.toString(), Arrays.toString(b.array())});
}
return i;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.io.Closeable;
import java.util.logging.Level;
import java.util.logging.Logger;

import hudson.FilePath;
import org.jenkinsci.plugins.workflow.steps.AbstractStepExecutionImpl;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.jenkinsci.plugins.workflow.steps.BodyExecutionCallback;
import org.jenkinsci.plugins.workflow.steps.BodyInvoker;
Expand Down Expand Up @@ -57,7 +60,7 @@ public boolean start() throws Exception {
client = nodeContext.connectToCloud();

EnvironmentExpander env = getContext().get(EnvironmentExpander.class);
decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env);
decorator = new ContainerExecDecorator(client, nodeContext.getPodName(), containerName, nodeContext.getNamespace(), env, getContext().get(FilePath.class));
getContext().newBodyInvoker()
.withContext(BodyInvoker
.mergeLauncherDecorators(getContext().get(LauncherDecorator.class), decorator))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public void after() throws Exception {
deletePods(client, getLabels(this), false);
}

/**
* Test that multiple command execution in parallel works
* @throws Exception
*/
@Test(timeout = 10000)
public void testCommandExecution() throws Exception {
Thread[] t = new Thread[10];
Expand Down Expand Up @@ -212,7 +216,11 @@ private ProcReturn execCommand(boolean quiet, String... cmd) throws Exception {
.decorate(new DummyLauncher(new StreamTaskListener(new TeeOutputStream(out, System.out))), null);
ContainerExecProc proc = (ContainerExecProc) launcher
.launch(launcher.new ProcStarter().pwd("/tmp").cmds(cmd).quiet(quiet));
assertTrue(proc.isAlive());
// wait for proc to finish (shouldn't take long)
while (proc.isAlive()) {
Thread.sleep(100);
}
assertFalse("proc is alive", proc.isAlive());
int exitCode = proc.joinWithTimeout(10, TimeUnit.SECONDS, StreamTaskListener.fromStderr());
return new ProcReturn(proc, exitCode, out.toString());
}
Expand Down