Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-50597] Network behavior tuning III #41

Merged
merged 8 commits into from
Jun 1, 2018
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.jenkins-ci.plugins</groupId>
<artifactId>plugin</artifactId>
<version>3.11</version>
<version>3.12</version>
<relativePath />
</parent>
<groupId>io.jenkins.plugins</groupId>
Expand Down Expand Up @@ -191,7 +191,7 @@
<dependency>
<groupId>org.jenkins-ci.plugins.workflow</groupId>
<artifactId>workflow-step-api</artifactId>
<version>2.15</version>
<version>2.16-rc310.04f07c15faaf</version> <!-- TODO https://github.com/jenkinsci/workflow-step-api-plugin/pull/37 -->
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.jenkins.plugins.artifact_manager_jclouds;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.AttemptTimeLimiters;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,26 +56,30 @@
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import hudson.AbortException;
import hudson.EnvVars;
import hudson.FilePath;
import hudson.Launcher;
import hudson.Util;
import hudson.model.BuildListener;
import hudson.model.Computer;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.remoting.VirtualChannel;
import hudson.slaves.WorkspaceList;
import hudson.util.DirScanner;
import hudson.util.io.ArchiverFactory;
import io.jenkins.plugins.artifact_manager_jclouds.BlobStoreProvider.HttpMethod;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import jenkins.MasterToSlaveFileCallable;
import jenkins.model.ArtifactManager;
import jenkins.util.JenkinsJVM;
import jenkins.util.VirtualFile;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpResponseException;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

Expand Down Expand Up @@ -138,6 +143,33 @@ public void archive(FilePath workspace, Launcher launcher, BuildListener listene
listener.getLogger().printf("Uploaded %s artifact(s) to %s%n", artifactUrls.size(), provider.toURI(provider.getContainer(), getBlobPath("artifacts/")));
}

private static class UploadToBlobStorage extends MasterToSlaveFileCallable<Void> {
private static final long serialVersionUID = 1L;

private final Map<String, URL> artifactUrls; // e.g. "target/x.war", "http://..."
private final TaskListener listener;
// Bind when constructed on the master side; on the agent side, deserialize those values.
private final int stopAfterAttemptNumber = UPLOAD_STOP_AFTER_ATTEMPT_NUMBER;
private final long waitMultiplier = UPLOAD_WAIT_MULTIPLIER;
private final long waitMaximum = UPLOAD_WAIT_MAXIMUM;
private final long timeout = UPLOAD_TIMEOUT;

UploadToBlobStorage(Map<String, URL> artifactUrls, TaskListener listener) {
this.artifactUrls = artifactUrls;
this.listener = listener;
}

@Override
public Void invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
for (Map.Entry<String, URL> entry : artifactUrls.entrySet()) {
Path local = f.toPath().resolve(entry.getKey());
URL url = entry.getValue();
uploadFile(local, url, listener, stopAfterAttemptNumber, waitMultiplier, waitMaximum, timeout);
}
return null;
}
}

@Override
public boolean delete() throws IOException, InterruptedException {
String blobPath = getBlobPath("");
Expand Down Expand Up @@ -197,6 +229,10 @@ private static final class Stash extends MasterToSlaveFileCallable<Integer> {
private final boolean useDefaultExcludes;
private final String tempDir;
private final TaskListener listener;
private final int stopAfterAttemptNumber = UPLOAD_STOP_AFTER_ATTEMPT_NUMBER;
private final long waitMultiplier = UPLOAD_WAIT_MULTIPLIER;
private final long waitMaximum = UPLOAD_WAIT_MAXIMUM;
private final long timeout = UPLOAD_TIMEOUT;

Stash(URL url, String includes, String excludes, boolean useDefaultExcludes, String tempDir, TaskListener listener) throws IOException {
this.url = url;
Expand All @@ -222,7 +258,7 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr
throw new IOException(e);
}
if (count > 0) {
uploadFile(tmp, url, listener);
uploadFile(tmp, url, listener, stopAfterAttemptNumber, waitMultiplier, waitMaximum, timeout);
}
return count;
} finally {
Expand Down Expand Up @@ -323,55 +359,62 @@ private BlobStoreContext getContext() throws IOException {
return provider.getContext();
}

private static class UploadToBlobStorage extends MasterToSlaveFileCallable<Void> {
private static final long serialVersionUID = 1L;

private final Map<String, URL> artifactUrls; // e.g. "target/x.war", "http://..."
private final TaskListener listener;

UploadToBlobStorage(Map<String, URL> artifactUrls, TaskListener listener) {
this.artifactUrls = artifactUrls;
this.listener = listener;
}

@Override
public Void invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
for (Map.Entry<String, URL> entry : artifactUrls.entrySet()) {
Path local = f.toPath().resolve(entry.getKey());
URL url = entry.getValue();
uploadFile(local, url, listener);
}
return null;
private static final class HTTPAbortException extends AbortException {
final int code;
HTTPAbortException(int code, String message) {
super(message);
this.code = code;
}
}

/**
* Number of upload attempts of nonfatal errors before giving up.
*/
static int UPLOAD_STOP_AFTER_ATTEMPT_NUMBER = Integer.getInteger(JCloudsArtifactManager.class.getName() + ".UPLOAD_STOP_AFTER_ATTEMPT_NUMBER", 10);
/**
* Initial number of milliseconds between first and second upload attempts.
* Subsequent ones increase exponentially.
* Note that this is not a <em>randomized</em> exponential backoff;
* and the base of the exponent is hard-coded to 2.
*/
static long UPLOAD_WAIT_MULTIPLIER = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_WAIT_MULTIPLIER", 100);
/**
* Maximum number of seconds between upload attempts.
*/
static long UPLOAD_WAIT_MAXIMUM = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_WAIT_MAXIMUM", 300);
/**
* Number of seconds to permit a single upload attempt to take.
*/
static long UPLOAD_TIMEOUT = Long.getLong(JCloudsArtifactManager.class.getName() + ".UPLOAD_TIMEOUT", /* 15m */15 * 60);

private static final ExecutorService executors = JenkinsJVM.isJenkinsJVM() ? Computer.threadPoolForRemoting : Executors.newCachedThreadPool();

/**
* Upload a file to a URL
*/
@SuppressWarnings("Convert2Lambda") // bogus use of generics (type variable should have been on class); cannot be made into a lambda
private static void uploadFile(Path f, URL url, final TaskListener listener) throws IOException {
private static void uploadFile(Path f, URL url, final TaskListener listener, int stopAfterAttemptNumber, long waitMultiplier, long waitMaximum, long timeout) throws IOException, InterruptedException {
String urlSafe = url.toString().replaceFirst("[?].+$", "?…");
try {
Predicate<Throwable> nonfatal = x -> x instanceof IOException && (!(x instanceof HttpResponseException) || ((HttpResponseException) x).getStatusCode() >= 500);
AtomicReference<Throwable> lastError = new AtomicReference<>();
RetryerBuilder.<Void>newBuilder().
retryIfException(nonfatal).
retryIfException(x -> x instanceof IOException && (!(x instanceof HTTPAbortException) || ((HTTPAbortException) x).code >= 500) || x instanceof UncheckedTimeoutException).
withRetryListener(new RetryListener() {
@Override
public <Void> void onRetry(Attempt<Void> attempt) {
if (attempt.hasException()) {
Throwable t = attempt.getExceptionCause();
if (nonfatal.apply(t)) {
listener.getLogger().println("Retrying upload after: " + t);
}
lastError.set(attempt.getExceptionCause());
}
}
}).
// TODO all scalars configurable via system property
withStopStrategy(StopStrategies.stopAfterAttempt(10)).
// Note that this is not a _randomized_ exponential backoff; and the base of the exponent is hard-coded to 2.
withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES)).
// TODO withAttemptTimeLimiter(…).
withStopStrategy(StopStrategies.stopAfterAttempt(stopAfterAttemptNumber)).
withWaitStrategy(WaitStrategies.exponentialWait(waitMultiplier, waitMaximum, TimeUnit.SECONDS)).
withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(timeout, TimeUnit.SECONDS, executors)).
build().call(() -> {
Throwable t = lastError.get();
if (t != null) {
listener.getLogger().println("Retrying upload after: " + (t instanceof AbortException ? t.getMessage() : t.toString()));
}
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(true);
connection.setRequestMethod("PUT");
Expand All @@ -385,7 +428,7 @@ public <Void> void onRetry(Attempt<Void> attempt) {
try (InputStream err = connection.getErrorStream()) {
diag = err != null ? IOUtils.toString(err, connection.getContentEncoding()) : null;
}
throw new HttpResponseException(responseCode, String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
throw new HTTPAbortException(responseCode, String.format("Failed to upload %s to %s, response: %d %s, body: %s", f.toAbsolutePath(), urlSafe, responseCode, connection.getResponseMessage(), diag));
}
return null;
});
Expand All @@ -395,6 +438,8 @@ public <Void> void onRetry(Attempt<Void> attempt) {
throw (IOException) x2;
} else if (x2 instanceof RuntimeException) {
throw (RuntimeException) x2;
} else if (x2 instanceof InterruptedException) {
throw (InterruptedException) x2;
} else { // Error?
throw new RuntimeException(x);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.bootstrap.HttpServer;
import org.apache.http.impl.bootstrap.ServerBootstrap;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpRequestHandler;
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
Expand Down Expand Up @@ -72,17 +71,17 @@ public String getPrefix() {
public String getContainer() {
return "container";
}
private static final Map<String, Integer> fails = new ConcurrentHashMap<>();

private static final Map<String, HttpRequestHandler> specialHandlers = new ConcurrentHashMap<>();

/**
* Requests that the <em>next</em> HTTP access to a particular presigned URL should fail with a 4xx/5xx error.
* Requests that the <em>next</em> HTTP access to a particular presigned URL should behave specially.
* @param method upload or download
* @param key the blob’s {@link StorageMetadata#getName}
* @param code the status code, or 0 to just make the request fail without sending a proper response
* @param handler what to do instead
*/
static void failIn(HttpMethod method, String key, int code) {
fails.put(method + ":" + key, code);
static void speciallyHandle(HttpMethod method, String key, HttpRequestHandler handler) {
specialHandlers.put(method + ":" + key, handler);
}

@Override
Expand All @@ -98,13 +97,9 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
String container = m.group(1);
String key = m.group(2);
Integer failure = fails.remove(method + ":" + key);
if (failure != null) {
if (failure == 0) {
throw new IllegalStateException("Refusing to even send a status code for " + container + ":" + key);
}
response.setStatusLine(new BasicStatusLine(HttpVersion.HTTP_1_0, failure, "simulated " + failure + " failure"));
response.setEntity(new StringEntity("Detailed explanation of " + failure + "."));
HttpRequestHandler specialHandler = specialHandlers.remove(method + ":" + key);
if (specialHandler != null) {
specialHandler.handle(request, response, _context);
return;
}
BlobStore blobStore = context.getBlobStore();
Expand Down Expand Up @@ -136,7 +131,13 @@ public synchronized BlobStoreContext getContext() throws IOException {
}
}
}).
setExceptionLogger(x -> LOGGER.log(Level.INFO, "error thrown in HTTP service", x)).
setExceptionLogger(x -> {
if (x instanceof ConnectionClosedException) {
LOGGER.info(x.toString());
} else {
LOGGER.log(Level.INFO, "error thrown in HTTP service", x);
}
}).
create();
server.start();
baseURL = new URL("http://" + server.getInetAddress().getHostName() + ":" + server.getLocalPort() + "/");
Expand Down
Loading