From 5cd6c989c18dd900098a96889525bbcbf6bb762c Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 29 Apr 2022 12:30:13 -0400 Subject: [PATCH 1/2] fix #3856: removing the PodUploadWebSocketListener --- .../client/jdkhttp/JdkWebSocketImpl.java | 3 + .../kubernetes/client/dsl/ExecWatch.java | 28 +- .../kubernetes/client/http/WebSocket.java | 5 +- .../dsl/internal/ExecWebSocketListener.java | 112 +++++--- .../dsl/internal/PodOperationContext.java | 261 +++++++----------- .../internal/core/v1/PodOperationsImpl.java | 107 +------ .../dsl/internal/uploadable/PodUpload.java | 144 ++++------ .../PodUploadWebSocketListener.java | 131 --------- .../utils/internal/PodOperationUtil.java | 9 +- .../internal/ExecWebSocketListenerTest.java | 68 ++++- .../internal/uploadable/PodUploadTest.java | 59 ++-- .../PodUploadWebSocketListenerTest.java | 73 ----- 12 files changed, 375 insertions(+), 625 deletions(-) delete mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListener.java delete mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListenerTest.java diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java index 299329c19b1..326ec21dd0d 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java @@ -27,6 +27,7 @@ import java.nio.channels.WritableByteChannel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; class JdkWebSocketImpl implements WebSocket { @@ -177,6 +178,8 @@ private boolean asBoolean(CompletableFuture cf) { @Override public boolean sendClose(int code, String reason) { CompletableFuture cf = webSocket.sendClose(code, reason == null ? "Closing" : reason); + // matches the behavior of the okhttp implementation and will ensure input closure after 1 minute + cf.thenRunAsync(() -> webSocket.abort(), CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES)); return asBoolean(cf); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index 23c89ca6b2c..07786daf42d 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -22,16 +22,40 @@ public interface ExecWatch extends Closeable { + /** + * Gets the {@link OutputStream} for stdIn if one is associated + * + * @return the stdIn stream + */ OutputStream getInput(); + /** + * Gets the {@link InputStream} for stdOut if one is associated + * + * @return the stdErr stream + */ InputStream getOutput(); + /** + * Gets the {@link InputStream} for stdErr if one is associated + * + * @return the stdErr stream + */ InputStream getError(); + /** + * Gets the {@link InputStream} associated with channel 3, which + * returns the final Status containing the exit code, which + * could indicate abnormal termination. + *

+ * See also {@link #exitCode()} + * + * @return the channel 3 stream + */ InputStream getErrorChannel(); /** - * Close the Watch. + * Gracefully close the Watch. */ @Override void close(); @@ -41,7 +65,7 @@ public interface ExecWatch extends Closeable { /** * Get a future that will be completed with the exit code. *

- * Will be -1 if the exit code can't be determined. + * Will be -1 if the exit code can't be determined from the status, or null if close is received before the exit code. *

* Can be used as an alternative to * {@link ExecListener#onFailure(Throwable, io.fabric8.kubernetes.client.dsl.ExecListener.Response)} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index cac428d24f5..c67b7626282 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -73,7 +73,10 @@ interface Builder extends BasicBuilder { boolean send(ByteBuffer buffer); /** - * Send a close message + * Send a close message. If successful, the output side + * will then be closed. After a timeout the input side will + * automatically be shutdown if it isn't already shutdown by + * the remote side. * * @return true if the message was successfully enqueued. */ diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 90ab1f95ae7..36112462abd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,6 +72,8 @@ public class ExecWebSocketListener implements ExecWatch, AutoCloseable, WebSocke static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode"; static final String STATUS_SUCCESS = "Success"; + private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L; + private final class SimpleResponse implements Response { private final HttpResponse response; @@ -98,11 +101,13 @@ public String body() throws IOException { private final OutputStream err; private final OutputStream errChannel; - private final PipedOutputStream input; + private final OutputStream input; private final PipedInputStream output; private final PipedInputStream error; private final PipedInputStream errorChannel; + private final boolean forUpload; + private final AtomicReference webSocketRef = new AtomicReference<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -114,35 +119,36 @@ public String body() throws IOException { private final CompletableFuture exitCode = new CompletableFuture<>(); - private ObjectMapper objectMapper; - - public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, - PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, - ExecListener listener, Integer bufferSize) { - this.listener = listener; - this.in = inputStreamOrPipe(in, inputPipe, toClose, bufferSize); - this.out = outputStreamOrPipe(out, outputPipe, toClose); - this.err = outputStreamOrPipe(err, errorPipe, toClose); - this.errChannel = outputStreamOrPipe(errChannel, errorChannelPipe, toClose); - - this.input = inputPipe; + private ObjectMapper objectMapper = new ObjectMapper(); + + public ExecWebSocketListener(PodOperationContext context) { + this.listener = context.getExecListener(); + this.forUpload = context.isForUpload(); + PipedOutputStream inputPipe = context.getInPipe(); + PipedInputStream outputPipe = context.getOutPipe(); + PipedInputStream errorPipe = context.getErrPipe(); + PipedInputStream errorChannelPipe = context.getErrChannelPipe(); + this.in = inputStreamOrPipe(context.getIn(), inputPipe, toClose, context.getBufferSize()); + this.out = outputStreamOrPipe(context.getOut(), outputPipe, toClose); + this.err = outputStreamOrPipe(context.getErr(), errorPipe, toClose); + this.errChannel = outputStreamOrPipe(context.getErrChannel(), errorChannelPipe, toClose); + + if (inputPipe == null && in == null && forUpload) { + // if there's no explicit in, then we create an OutputStream that writes directly to send + this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking); + } else { + this.input = inputPipe; + } this.output = outputPipe; this.error = errorPipe; this.errorChannel = errorChannelPipe; - this.objectMapper = new ObjectMapper(); } @Override public void close() { - close(1000, "Closing..."); - } - - private void close(int code, String reason) { - if (!exitCode.isDone()) { - exitCode.completeExceptionally(new KubernetesClientException("Closed before exit code recieved")); - } - closeWebSocketOnce(code, reason); - onClosed(code, reason); + // simply sends a close, which will shut down the output + // it's expected that the server will respond with a close, but if not the input will be shutdown implicitly + closeWebSocketOnce(1000, "Closing..."); } /** @@ -176,8 +182,8 @@ private void closeWebSocketOnce(int code, String reason) { @Override public void onOpen(WebSocket webSocket) { try { - if (in instanceof PipedInputStream && input != null) { - input.connect((PipedInputStream) in); + if (in instanceof PipedInputStream && input instanceof PipedOutputStream) { + ((PipedOutputStream) input).connect((PipedInputStream) in); } if (out instanceof PipedOutputStream && output != null) { output.connect((PipedOutputStream) out); @@ -208,7 +214,7 @@ public void onOpen(WebSocket webSocket) { public void onError(WebSocket webSocket, Throwable t) { //If we already called onClosed() or onFailed() before, we need to abort. - if (closed.compareAndSet(false, true)) { + if (!closed.compareAndSet(false, true)) { //We are not going to notify the listener, sicne we've already called onClose(), so let's log a debug/warning. if (LOGGER.isWarnEnabled()) { LOGGER.warn("Received [{}], with message:[{}] after ExecWebSocketListener is closed, Ignoring.", @@ -224,7 +230,6 @@ public void onError(WebSocket webSocket, Throwable t) { } Status status = OperationSupport.createStatus(response); status.setMessage(t.getMessage()); - LOGGER.error("Exec Failure", t); cleanUpOnce(); } finally { try { @@ -234,6 +239,8 @@ public void onError(WebSocket webSocket, Throwable t) { execResponse = new SimpleResponse(response); } listener.onFailure(t, execResponse); + } else { + LOGGER.error("Exec Failure", t); } } finally { exitCode.completeExceptionally(t); @@ -256,6 +263,11 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) { break; case 2: writeAndFlush(err, byteString); + if (forUpload) { + String stringValue = StandardCharsets.UTF_8.decode(bytes).toString(); + exitCode.completeExceptionally(new KubernetesClientException(stringValue)); + this.close(); + } break; case 3: handleExitStatus(bytes); @@ -304,10 +316,10 @@ private void writeAndFlush(OutputStream stream, ByteBuffer byteString) throws IO @Override public void onClose(WebSocket webSocket, int code, String reason) { - ExecWebSocketListener.this.close(code, reason); - } - - private void onClosed(int code, String reason) { + if (!exitCode.isDone()) { + exitCode.complete(null); + } + closeWebSocketOnce(code, reason); //If we already called onClosed() or onFailed() before, we need to abort. if (!closed.compareAndSet(false, true)) { return; @@ -360,12 +372,13 @@ public void resize(int cols, int rows) { private void send(byte[] bytes, int offset, int length, byte flag) { if (length > 0) { + waitForQueue(length); WebSocket ws = webSocketRef.get(); - if (ws != null) { - byte[] toSend = new byte[length + 1]; - toSend[0] = flag; - System.arraycopy(bytes, offset, toSend, 1, length); - ws.send(ByteBuffer.wrap(toSend)); + byte[] toSend = new byte[length + 1]; + toSend[0] = flag; + System.arraycopy(bytes, offset, toSend, 1, length); + if (!ws.send(ByteBuffer.wrap(toSend))) { + this.exitCode.completeExceptionally(new IOException("could not send")); } } } @@ -374,6 +387,12 @@ private void send(byte[] bytes, int offset, int length) { send(bytes, offset, length, (byte) 0); } + void sendWithErrorChecking(byte[] bytes, int offset, int length) { + checkError(); + send(bytes, offset, length); + checkError(); + } + private static InputStream inputStreamOrPipe(InputStream stream, PipedOutputStream out, Set toClose, Integer bufferSize) { if (stream != null) { @@ -426,4 +445,25 @@ public CompletableFuture exitCode() { return exitCode; } + final void waitForQueue(int length) { + try { + while (webSocketRef.get().queueSize() + length > MAX_QUEUE_SIZE && !Thread.interrupted()) { + checkError(); + Thread.sleep(50L); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + final void checkError() { + if (exitCode.isDone()) { + try { + exitCode.getNow(null); + } catch (CompletionException e) { + throw KubernetesClientException.launderThrowable(e.getCause()); + } + } + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java index 5059b54f4cc..f3f63ccb34a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java @@ -16,240 +16,179 @@ package io.fabric8.kubernetes.client.dsl.internal; import io.fabric8.kubernetes.client.dsl.ExecListener; +import io.fabric8.kubernetes.client.utils.URLUtils.URLBuilder; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; import java.io.InputStream; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +@Getter public class PodOperationContext { - private String containerId; - private InputStream in; - private OutputStream out; - private OutputStream err; - private OutputStream errChannel; - - private PipedOutputStream inPipe; - private PipedInputStream outPipe; - private PipedInputStream errPipe; - private PipedInputStream errChannelPipe; - private boolean tty; - private boolean terminatedStatus; - private boolean timestamps; - private String sinceTimestamp; - private Integer sinceSeconds; - private Integer tailingLines; - private Integer logWaitTimeout; - private boolean prettyOutput; - private ExecListener execListener; - private Integer limitBytes; - private Integer bufferSize; - private String file; - private String dir; - - public PodOperationContext() { - } - - public PodOperationContext(String containerId, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inPipe, PipedInputStream outPipe, PipedInputStream errPipe, PipedInputStream errChannelPipe, Boolean tty, Boolean terminatedStatus, Boolean timestampes, String sinceTimestamp, Integer sinceSeconds, Integer tailingLines, Boolean prettyOutput, Integer limitBytes, Integer bufferSize, ExecListener execListener, String file, String dir, Integer logWaitTimeout) { - this.containerId = containerId; - this.in = in; - this.out = out; - this.err = err; - this.errChannel = errChannel; - this.inPipe = inPipe; - this.outPipe = outPipe; - this.errPipe = errPipe; - this.errChannelPipe = errChannelPipe; - this.tty = tty; - this.terminatedStatus = terminatedStatus; - this.timestamps = timestampes; - this.sinceTimestamp = sinceTimestamp; - this.sinceSeconds = sinceSeconds; - this.tailingLines = tailingLines; - this.prettyOutput = prettyOutput; - this.execListener = execListener; - this.limitBytes = limitBytes; - this.bufferSize = bufferSize; - this.file = file; - this.dir = dir; - this.logWaitTimeout = logWaitTimeout; - } - - public String getContainerId() { - return containerId; - } - - public InputStream getIn() { - return in; - } - - public OutputStream getOut() { - return out; - } - - public OutputStream getErr() { - return err; - } - - public OutputStream getErrChannel() { - return errChannel; - } - - public PipedOutputStream getInPipe() { - return inPipe; - } - - public PipedInputStream getOutPipe() { - return outPipe; - } - - public PipedInputStream getErrPipe() { - return errPipe; - } - - public PipedInputStream getErrChannelPipe() { - return errChannelPipe; - } - - public boolean isTty() { - return tty; - } - - public boolean isTerminatedStatus() { - return terminatedStatus; - } - - public boolean isTimestamps() { - return timestamps; - } - - public String getSinceTimestamp() { - return sinceTimestamp; - } - - public Integer getSinceSeconds() { - return sinceSeconds; - } - - public Integer getTailingLines() { - return tailingLines; - } - - public boolean isPrettyOutput() { - return prettyOutput; - } - - public ExecListener getExecListener() { - return execListener; - } - - public Integer getLimitBytes() { - return limitBytes; - } - - public Integer getBufferSize() { - return bufferSize; - } - - public String getFile() { - return file; - } - - public String getDir() { - return dir; - } - - public Integer getLogWaitTimeout() { - return logWaitTimeout; - } + private String containerId; + private InputStream in; + private OutputStream out; + private OutputStream err; + private OutputStream errChannel; + + private PipedOutputStream inPipe; + private PipedInputStream outPipe; + private PipedInputStream errPipe; + private PipedInputStream errChannelPipe; + private boolean tty; + private boolean terminatedStatus; + private boolean timestamps; + private String sinceTimestamp; + private Integer sinceSeconds; + private Integer tailingLines; + private Integer logWaitTimeout; + private boolean prettyOutput; + private ExecListener execListener; + private Integer limitBytes; + private Integer bufferSize; + private String file; + private String dir; + private boolean forUpload; public PodOperationContext withContainerId(String containerId) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().containerId(containerId).build(); } public PodOperationContext withIn(InputStream in) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().in(in).build(); } public PodOperationContext withOut(OutputStream out) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().out(out).build(); } public PodOperationContext withErr(OutputStream err) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().err(err).build(); } public PodOperationContext withErrChannel(OutputStream errChannel) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().errChannel(errChannel).build(); } public PodOperationContext withInPipe(PipedOutputStream inPipe) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().inPipe(inPipe).build(); } public PodOperationContext withOutPipe(PipedInputStream outPipe) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().outPipe(outPipe).build(); } public PodOperationContext withErrPipe(PipedInputStream errPipe) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().errPipe(errPipe).build(); } public PodOperationContext withErrChannelPipe(PipedInputStream errChannelPipe) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().errChannelPipe(errChannelPipe).build(); } public PodOperationContext withTty(boolean tty) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().tty(tty).build(); } public PodOperationContext withTerminatedStatus(boolean terminatedStatus) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().terminatedStatus(terminatedStatus).build(); } public PodOperationContext withTimestamps(boolean timestamps) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().timestamps(timestamps).build(); } public PodOperationContext withSinceTimestamp(String sinceTimestamp) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().sinceTimestamp(sinceTimestamp).build(); } public PodOperationContext withSinceSeconds(Integer sinceSeconds) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().sinceSeconds(sinceSeconds).build(); } public PodOperationContext withTailingLines(Integer tailingLines) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().tailingLines(tailingLines).build(); } public PodOperationContext withPrettyOutput(boolean prettyOutput) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().prettyOutput(prettyOutput).build(); } public PodOperationContext withExecListener(ExecListener execListener) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().execListener(execListener).build(); } public PodOperationContext withLimitBytes(Integer limitBytes) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().limitBytes(limitBytes).build(); } public PodOperationContext withBufferSize(Integer bufferSize) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().bufferSize(bufferSize).build(); } public PodOperationContext withFile(String file) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().file(file).build(); } public PodOperationContext withDir(String dir) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().dir(dir).build(); } public PodOperationContext withLogWaitTimeout(Integer logWaitTimeout) { - return new PodOperationContext(containerId, in, out, err, errChannel, inPipe, outPipe, errPipe,errChannelPipe, tty, terminatedStatus, timestamps, sinceTimestamp, sinceSeconds, tailingLines, prettyOutput, limitBytes, bufferSize, execListener, file, dir, logWaitTimeout); + return this.toBuilder().logWaitTimeout(logWaitTimeout).build(); + } + + public String getLogParameters() { + StringBuilder sb = new StringBuilder(); + sb.append("log?pretty=").append(prettyOutput); + + if (containerId != null && !containerId.isEmpty()) { + sb.append("&container=").append(containerId); + } + if (terminatedStatus) { + sb.append("&previous=true"); + } + if (sinceSeconds != null) { + sb.append("&sinceSeconds=").append(sinceSeconds); + } else if (sinceTimestamp != null) { + sb.append("&sinceTime=").append(sinceTimestamp); + } + if (tailingLines != null) { + sb.append("&tailLines=").append(tailingLines); + } + if (limitBytes != null) { + sb.append("&limitBytes=").append(limitBytes); + } + if (timestamps) { + sb.append("×tamps=true"); + } + return sb.toString(); + } + + public void addQueryParameters(URLBuilder httpUrlBuilder) { + if (containerId != null && !containerId.isEmpty()) { + httpUrlBuilder.addQueryParameter("container", containerId); + } + if (tty) { + httpUrlBuilder.addQueryParameter("tty", "true"); + } + if (in != null || inPipe != null || forUpload) { + httpUrlBuilder.addQueryParameter("stdin", "true"); + } + if (out != null || outPipe != null) { + httpUrlBuilder.addQueryParameter("stdout", "true"); + } + if (err != null || errPipe != null || forUpload) { + httpUrlBuilder.addQueryParameter("stderr", "true"); + } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index 1f13ad20acf..a14eba60d98 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -26,7 +26,6 @@ import io.fabric8.kubernetes.client.LocalPortForward; import io.fabric8.kubernetes.client.PortForward; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; -import io.fabric8.kubernetes.client.dsl.ContainerResource; import io.fabric8.kubernetes.client.dsl.CopyOrReadable; import io.fabric8.kubernetes.client.dsl.ExecListenable; import io.fabric8.kubernetes.client.dsl.ExecListener; @@ -91,26 +90,6 @@ public class PodOperationsImpl extends HasMetadataOperation T doGetLog(Class type) { try { - URL url = new URL(URLUtils.join(getResourceUrl().toString(), getLogParameters())); + URL url = new URL(URLUtils.join(getResourceUrl().toString(), podOperationContext.getLogParameters())); return handleRawGet(url, type); } catch (IOException ioException) { throw KubernetesClientException.launderThrowable(forOperationType("doGetLog"), ioException); @@ -217,7 +150,7 @@ public LogWatch watchLog(OutputStream out) { PodOperationUtil.waitUntilReadyBeforeFetchingLogs(this, getContext().getLogWaitTimeout() != null ? getContext().getLogWaitTimeout() : DEFAULT_POD_LOG_WAIT_TIMEOUT); // Issue Pod Logs HTTP request - URL url = new URL(URLUtils.join(getResourceUrl().toString(), getLogParameters() + "&follow=true")); + URL url = new URL(URLUtils.join(getResourceUrl().toString(), getContext().getLogParameters() + "&follow=true")); final LogWatchCallback callback = new LogWatchCallback(out); return callback.callAndWait(httpClient, url); } catch (IOException ioException) { @@ -311,7 +244,7 @@ private boolean handleEvict(HasMetadata eviction) { } @Override - public ContainerResource inContainer( + public PodOperationsImpl inContainer( String containerId) { return new PodOperationsImpl(getContext().withContainerId(containerId), context); } @@ -322,8 +255,7 @@ public ExecWatch exec(String... command) { try { URL url = getURLWithCommandParams(actualCommands); HttpClient clone = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); - final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(in, out, err, errChannel, inPipe, outPipe, - errPipe, errChannelPipe, execListener, bufferSize); + final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(getContext()); CompletableFuture startedFuture = clone.newWebSocketBuilder() .subprotocol("v4.channel.k8s.io") .uri(url.toURI()) @@ -349,31 +281,18 @@ URL getURLWithCommandParams(String[] commands) throws MalformedURLException { httpUrlBuilder.addQueryParameter("command", cmd); } - if (containerId != null && !containerId.isEmpty()) { - httpUrlBuilder.addQueryParameter("container", containerId); - } - if (withTTY) { - httpUrlBuilder.addQueryParameter("tty", "true"); - } - if (in != null || inPipe != null) { - httpUrlBuilder.addQueryParameter("stdin", "true"); - } - if (out != null || outPipe != null) { - httpUrlBuilder.addQueryParameter("stdout", "true"); - } - if (err != null || errPipe != null) { - httpUrlBuilder.addQueryParameter("stderr", "true"); - } + getContext().addQueryParameters(httpUrlBuilder); + return httpUrlBuilder.build(); } @Override - public CopyOrReadable file(String file) { + public PodOperationsImpl file(String file) { return new PodOperationsImpl(getContext().withFile(file), context); } @Override - public CopyOrReadable dir(String dir) { + public PodOperationsImpl dir(String dir) { return new PodOperationsImpl(getContext().withDir(dir), context); } @@ -397,9 +316,8 @@ public boolean copy(Path destination) { public boolean upload(InputStream inputStream) { return wrapRunWithOptionalDependency(() -> { try { - return PodUpload.uploadFileData(httpClient, getContext(), this, inputStream); + return PodUpload.uploadFileData(this, inputStream); } catch (Exception ex) { - Thread.currentThread().interrupt(); throw KubernetesClientException.launderThrowable(ex); } }, "TarArchiveOutputStream is provided by commons-compress"); @@ -409,9 +327,8 @@ public boolean upload(InputStream inputStream) { public boolean upload(Path path) { return wrapRunWithOptionalDependency(() -> { try { - return PodUpload.upload(httpClient, getContext(), this, path); + return PodUpload.upload(this, path); } catch (Exception ex) { - Thread.currentThread().interrupt(); throw KubernetesClientException.launderThrowable(ex); } }, "TarArchiveOutputStream is provided by commons-compress"); @@ -680,4 +597,8 @@ public BytesLimitTerminateTimeTailPrettyLoggable usingTimestamps() { public static String shellQuote(String value) { return "'" + value.replace("'", "'\\\\''") + "'"; } + + public PodOperationsImpl forUpload() { + return new PodOperationsImpl(getContext().toBuilder().forUpload(true).build(), context); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index b3b75548488..6746ea1ee77 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -15,13 +15,9 @@ */ package io.fabric8.kubernetes.client.dsl.internal.uploadable; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.internal.OperationSupport; -import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; -import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.kubernetes.client.dsl.ExecWatch; +import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; import io.fabric8.kubernetes.client.utils.InputStreamPumper; -import io.fabric8.kubernetes.client.utils.URLUtils; import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.Base64; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -31,15 +27,11 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.zip.GZIPOutputStream; import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote; @@ -51,68 +43,78 @@ public class PodUpload { private PodUpload() { } - public static boolean upload(HttpClient client, PodOperationContext context, - OperationSupport operationSupport, Path pathToUpload) - throws IOException, InterruptedException { + public static boolean upload(PodOperationsImpl operation, Path pathToUpload) + throws IOException { - if (Utils.isNotNullOrEmpty(context.getFile()) && pathToUpload.toFile().isFile()) { - return uploadFile(client, context, operationSupport, pathToUpload); - } else if (Utils.isNotNullOrEmpty(context.getDir()) && pathToUpload.toFile().isDirectory()) { - return uploadDirectory(client, context, operationSupport, pathToUpload); + if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) { + return uploadFile(operation, pathToUpload); + } else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) { + return uploadDirectory(operation, pathToUpload); } throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)"); } - public static boolean uploadFileData(HttpClient client, PodOperationContext context, - OperationSupport operationSupport, InputStream inputStream) - throws IOException, InterruptedException { - final PodUploadWebSocketListener podUploadWebSocketListener = initWebSocket( - buildCommandUrl(createExecCommandForUpload(context), context, operationSupport), client); - try ( - final Base64.InputStream b64In = new Base64.InputStream(inputStream, Base64.ENCODE)) { - podUploadWebSocketListener.waitUntilReady(operationSupport.getConfig().getRequestConfig().getUploadConnectionTimeout()); - InputStreamPumper.transferTo(b64In, podUploadWebSocketListener::send); - podUploadWebSocketListener.waitUntilComplete(operationSupport.getConfig().getRequestConfig().getUploadRequestTimeout()); - return true; + private static interface UploadProcessor { + + void process(OutputStream out) throws IOException; + + } + + private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { + operation = operation.forUpload(); + String containerId = operation.getContext().getContainerId(); + if (Utils.isNotNullOrEmpty(containerId)) { + operation = operation.inContainer(containerId); + } + try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { + OutputStream out = execWatch.getInput(); + processor.process(out); + out.close(); // also flushes + execWatch.close(); + if (!Utils.waitUntilReady(execWatch.exitCode(), operation.getConfig().getRequestConfig().getUploadRequestTimeout(), + TimeUnit.MILLISECONDS)) { + return false; + } + Integer exitCode = execWatch.exitCode().getNow(null); + return exitCode == null || exitCode.intValue() == 0; } } - private static boolean uploadFile(HttpClient client, PodOperationContext context, - OperationSupport operationSupport, Path pathToUpload) - throws IOException, InterruptedException { + public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) + throws IOException { + String command = createExecCommandForUpload(operation.getContext().getFile()); + + return upload(operation, command, os -> { + try (final Base64.InputStream b64In = new Base64.InputStream(inputStream, Base64.ENCODE)) { + InputStreamPumper.transferTo(b64In, os::write); + } + }); + } + + private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload) + throws IOException { try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) { - return uploadFileData(client, context, operationSupport, fis); + return uploadFileData(operation, fis); } } - private static boolean uploadDirectory(HttpClient client, PodOperationContext context, - OperationSupport operationSupport, Path pathToUpload) - throws IOException, InterruptedException { + private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload) + throws IOException { final String command = String.format( - "mkdir -p %1$s && base64 -d - | tar -C %1$s -xzf -", shellQuote(context.getDir())); - final PodUploadWebSocketListener podUploadWebSocketListener = initWebSocket( - buildCommandUrl(command, context, operationSupport), client); - try ( - final java.io.OutputStream os = InputStreamPumper.writableOutputStream(podUploadWebSocketListener::send); - final Base64.OutputStream b64Out = new Base64.OutputStream(os, Base64.ENCODE); - final GZIPOutputStream gzip = new GZIPOutputStream(b64Out)) { - podUploadWebSocketListener.waitUntilReady(operationSupport.getConfig().getRequestConfig().getUploadConnectionTimeout()); - try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) { + "mkdir -p %1$s && base64 -d - | tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir())); + + return upload(operation, command, os -> { + try (final Base64.OutputStream b64Out = new Base64.OutputStream(os, Base64.ENCODE); + final GZIPOutputStream gzip = new GZIPOutputStream(b64Out); + final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) { tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); for (File file : pathToUpload.toFile().listFiles()) { addFileToTar(null, file, tar); } tar.flush(); } - podUploadWebSocketListener.waitUntilComplete(operationSupport.getConfig().getRequestConfig().getUploadRequestTimeout()); - } catch (KubernetesClientException ex) { - if (ex.getCause() instanceof TimeoutException) { - return false; - } - throw ex; - } - return true; + }); } private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar) @@ -131,43 +133,11 @@ private static void addFileToTar(String rootTarPath, File file, TarArchiveOutput } } - private static PodUploadWebSocketListener initWebSocket(URL url, HttpClient client) { - final PodUploadWebSocketListener podUploadWebSocketListener = new PodUploadWebSocketListener(); - final HttpClient clone = client.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); - CompletableFuture startedFuture = clone.newWebSocketBuilder() - .subprotocol("v4.channel.k8s.io") - .uri(URI.create(url.toString())) - .buildAsync(podUploadWebSocketListener); - startedFuture.whenComplete((w, t) -> { - if (t != null) { - podUploadWebSocketListener.onError(w, t); - } - }); - return podUploadWebSocketListener; - } - - private static URL buildCommandUrl(String command, PodOperationContext context, OperationSupport operationSupport) - throws MalformedURLException { - - final StringBuilder commandBuilder = new StringBuilder(); - commandBuilder.append("exec?"); - commandBuilder.append("command=sh&command=-c"); - commandBuilder.append("&command="); - commandBuilder.append(URLUtils.encodeToUTF(command)); - if (context.getContainerId() != null && !context.getContainerId().isEmpty()) { - commandBuilder.append("&container=").append(context.getContainerId()); - } - commandBuilder.append("&stdin=true"); - commandBuilder.append("&stderr=true"); - return new URL( - URLUtils.join(operationSupport.getResourceUrl().toString(), commandBuilder.toString())); - } - - static String createExecCommandForUpload(PodOperationContext context) { - final String file = context.getFile(); + static String createExecCommandForUpload(String file) { String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/')); final String directory = directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath; return String.format( "mkdir -p %s && base64 -d - > %s", shellQuote(directory), shellQuote(file)); } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListener.java deleted file mode 100644 index 65c3f108f5f..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListener.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal.uploadable; - -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener; -import io.fabric8.kubernetes.client.http.WebSocket; -import io.fabric8.kubernetes.client.utils.Serialization; -import io.fabric8.kubernetes.client.utils.Utils; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; - -public class PodUploadWebSocketListener implements WebSocket.Listener { - - private static final byte FLAG_STDIN = (byte) 0; - private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L; - - private final CompletableFuture webSocketRef = new CompletableFuture<>(); - private final CompletableFuture completeFuture = new CompletableFuture<>(); - - @Override - public void onOpen(WebSocket webSocket) { - webSocketRef.complete(webSocket); - } - - @Override - public void onMessage(WebSocket webSocket, ByteBuffer bytes) { - byte streamID = bytes.get(0); - if (bytes.remaining() > 1) { - bytes.position(1); - KubernetesClientException exception = null; - String stringValue = StandardCharsets.UTF_8.decode(bytes).toString(); - if (streamID == 3) { - try { - Status status = Serialization.unmarshal(stringValue, Status.class); - if (status != null) { - if (ExecWebSocketListener.parseExitCode(status) == 0) { - completeFuture.complete(null); - return; - } - exception = new KubernetesClientException(status); - } - } catch (Exception e) { - // can't determine an exit code, just use the whole message as the error - } - } - if (exception == null) { - exception = new KubernetesClientException(stringValue); - } - completeFuture.completeExceptionally(exception); - } - } - - @Override - public void onClose(WebSocket webSocket, int code, String reason) { - completeFuture.complete(null); - } - - @Override - public void onError(WebSocket webSocket, Throwable t) { - webSocketRef.completeExceptionally(t); - completeFuture.completeExceptionally(t); - } - - final void checkError() { - if (completeFuture.isDone()) { - try { - completeFuture.getNow(null); - } catch (CompletionException e) { - throw KubernetesClientException.launderThrowable(e.getCause()); - } - } - } - - final void waitUntilReady(int timeoutMilliseconds) { - Utils.waitUntilReadyOrFail(webSocketRef, timeoutMilliseconds, TimeUnit.MILLISECONDS); - } - - final void waitUntilComplete(int timeoutMilliseconds) throws InterruptedException { - while (webSocketRef.getNow(null).queueSize() > 0 && !completeFuture.isDone()) { - checkError(); - Thread.sleep(50); - } - webSocketRef.getNow(null).sendClose(1000, "Operation completed"); - Utils.waitUntilReadyOrFail(completeFuture, timeoutMilliseconds, TimeUnit.MILLISECONDS); - checkError(); - } - - final void send(byte[] data, int offset, int length) { - checkError(); - waitForQueue(length); - byte[] toSend = new byte[length + 1]; - toSend[0] = FLAG_STDIN; - System.arraycopy(data, offset, toSend, 1, length); - if (!webSocketRef.getNow(null).send(ByteBuffer.wrap(toSend))) { - this.completeFuture.completeExceptionally(new IOException("could not send")); - checkError(); - } - } - - final void waitForQueue(int length) { - try { - while (webSocketRef.getNow(null).queueSize() + length > MAX_QUEUE_SIZE && !Thread.interrupted()) { - checkError(); - Thread.sleep(50L); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java index 760d1465124..25ab30455cd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/PodOperationUtil.java @@ -63,12 +63,9 @@ public static List getFilteredPodsForLogs(PodOperationsImpl podOper public static PodOperationsImpl getGenericPodOperations(OperationContext context, boolean isPretty, Integer podLogWaitTimeout, String containerId) { - return new PodOperationsImpl(new PodOperationContext(containerId, - null, null, null, null, null, - null, null, null, false, false, - false, null, null, null, isPretty, - null, null, null, - null, null, podLogWaitTimeout), context.withName(null).withApiGroupName(null).withApiGroupVersion("v1")); + return new PodOperationsImpl( + PodOperationContext.builder().containerId(containerId).prettyOutput(isPretty).logWaitTimeout(podLogWaitTimeout).build(), + context.withName(null).withApiGroupName(null).withApiGroupVersion("v1")); } public static List getPodOperationsForController(OperationContext context, String controllerUid, diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java index 66e0c527558..2e0330e1912 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java @@ -18,9 +18,20 @@ import io.fabric8.kubernetes.api.model.StatusBuilder; import io.fabric8.kubernetes.api.model.StatusCause; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.http.WebSocket; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; class ExecWebSocketListenerTest { @@ -49,8 +60,63 @@ void testParseExitCodeNonZero() { assertEquals(126, ExecWebSocketListener .parseExitCode(new StatusBuilder().withStatus("Failed") .withReason(ExecWebSocketListener.REASON_NON_ZERO_EXIT_CODE) - .withNewDetails().withCauses(new StatusCause("ExitCode", "126", ExecWebSocketListener.CAUSE_REASON_EXIT_CODE)).endDetails() + .withNewDetails().withCauses(new StatusCause("ExitCode", "126", ExecWebSocketListener.CAUSE_REASON_EXIT_CODE)) + .endDetails() .build())); } + @Test + void testSendShouldTruncateAndSendFlaggedWebSocketData() { + final WebSocket mockedWebSocket = Mockito.mock(WebSocket.class); + Mockito.when(mockedWebSocket.send(Mockito.any())).thenReturn(true); + + ExecWebSocketListener listner = new ExecWebSocketListener(new PodOperationContext()); + + listner.onOpen(mockedWebSocket); + final byte[] toSend = new byte[] { 1, 3, 3, 7, 0 }; + + listner.sendWithErrorChecking(toSend, 0, 4); + + verify(mockedWebSocket, times(1)) + .send(ByteBuffer.wrap(new byte[] { (byte) 0, (byte) 1, (byte) 3, (byte) 3, (byte) 7 })); + } + + @Test + void testCheckErrorHasErrorFromMessageShouldThrowException() { + ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext().toBuilder().forUpload(true).build()); + + listener.onMessage(null, ByteBuffer.wrap(new byte[] { (byte) 2, (byte) 1, (byte) 1 })); + + assertThrows(KubernetesClientException.class, () -> listener.checkError()); + } + + @Test + void testCheckErrorHasErrorFromFailureShouldThrowException() { + ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext()); + + listener.onError(null, new IOException("here")); + + assertThrows(KubernetesClientException.class, () -> listener.checkError()); + } + + @Test + void testGracefulClose() { + ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext()); + WebSocket mock = Mockito.mock(WebSocket.class); + listener.onOpen(mock); + listener.close(); + + assertFalse(listener.exitCode().isDone()); + verify(mock).sendClose(Mockito.anyInt(), Mockito.anyString()); + } + + @Test + void testOnClose() { + ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext()); + WebSocket mock = Mockito.mock(WebSocket.class); + listener.onClose(mock, 1000, "testing"); + + assertNull(listener.exitCode().join()); + } + } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java index f7851d5c690..d74f6db5bd8 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java @@ -15,13 +15,16 @@ */ package io.fabric8.kubernetes.client.dsl.internal.uploadable; -import io.fabric8.kubernetes.client.dsl.internal.OperationSupport; +import io.fabric8.kubernetes.client.Client; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener; +import io.fabric8.kubernetes.client.dsl.internal.OperationContext; import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; +import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.utils.InputStreamPumper; import io.fabric8.kubernetes.client.utils.InputStreamPumper.Writable; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -30,7 +33,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.file.Path; @@ -54,10 +56,9 @@ class PodUploadTest { private HttpClient mockClient; - private PodOperationContext mockContext; - private OperationSupport operationSupport; private Path mockPathToUpload; private WebSocket mockWebSocket; + private PodOperationsImpl operation; @FunctionalInterface public interface PodUploadTester { @@ -67,29 +68,25 @@ public interface PodUploadTester { @BeforeEach void setUp() throws IOException { mockClient = Mockito.mock(HttpClient.class, Mockito.RETURNS_DEEP_STUBS); - mockContext = Mockito.mock(PodOperationContext.class, Mockito.RETURNS_DEEP_STUBS); - operationSupport = Mockito.mock(OperationSupport.class, Mockito.RETURNS_DEEP_STUBS); mockPathToUpload = Mockito.mock(Path.class, Mockito.RETURNS_DEEP_STUBS); mockWebSocket = Mockito.mock(WebSocket.class, Mockito.RETURNS_DEEP_STUBS); Mockito.when(mockWebSocket.send(Mockito.any())).thenReturn(true); when(mockClient.newBuilder().readTimeout(anyLong(), any(TimeUnit.class)).build()).thenReturn(mockClient); - when(operationSupport.getResourceUrl()) - .thenReturn(new URL("https://openshift.com:8443/api/v1/namespaces/default/pods/mock-pod/")); - } - @AfterEach - void tearDown() { - mockClient = null; - mockContext = null; - operationSupport = null; - mockPathToUpload = null; + Client client = Mockito.mock(Client.class); + Config config = Mockito.mock(Config.class, Mockito.RETURNS_DEEP_STUBS); + when(config.getMasterUrl()).thenReturn("https://openshift.com:8443"); + when(config.getNamespace()).thenReturn("default"); + when(client.getConfiguration()).thenReturn(config); + when(client.getHttpClient()).thenReturn(mockClient); + this.operation = new PodOperationsImpl(new PodOperationContext(), new OperationContext().withClient(client)); } @Test void testUploadInvalidParametersShouldThrowException() { final IllegalArgumentException result = assertThrows(IllegalArgumentException.class, - () -> PodUpload.upload(mockClient, mockContext, operationSupport, mockPathToUpload)); + () -> PodUpload.upload(operation, mockPathToUpload)); assertThat(result.getMessage(), equalTo("Provided arguments are not valid (file, directory, path)")); @@ -99,14 +96,14 @@ void testUploadInvalidParametersShouldThrowException() { void upload_whenFilePathProvided_shouldUploadFile() throws IOException, InterruptedException { when(mockPathToUpload.toFile()) .thenReturn(new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile())); - uploadFileAndVerify(() -> PodUpload.upload(mockClient, mockContext, operationSupport, mockPathToUpload)); + uploadFileAndVerify(() -> PodUpload.upload(operation, mockPathToUpload)); verify(mockPathToUpload, atLeast(1)).toFile(); } @Test void uploadFileData_whenByteArrayInputStreamProvided_shouldUploadFile() throws IOException, InterruptedException { InputStream inputStream = new ByteArrayInputStream("test data".getBytes()); - uploadFileAndVerify(() -> PodUpload.uploadFileData(mockClient, mockContext, operationSupport, inputStream)); + uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream)); } @Test @@ -120,12 +117,12 @@ void testUploadDirectoryLongFileNameShouldUploadDirectory() throws Exception { } private void uploadDirectoryAndVerify(String resourcePath) throws IOException, InterruptedException { - when(mockContext.getDir()).thenReturn("/mock/dir"); + this.operation = operation.dir("/mock/dir"); when(mockPathToUpload.toFile()) .thenReturn(new File(PodUpload.class.getResource(resourcePath).getFile())); WebSocket.Builder builder = Mockito.mock(WebSocket.Builder.class, Mockito.RETURNS_SELF); when(builder.buildAsync(any())).thenAnswer(newWebSocket -> { - final PodUploadWebSocketListener wsl = newWebSocket.getArgument(0, PodUploadWebSocketListener.class); + final ExecWebSocketListener wsl = newWebSocket.getArgument(0, ExecWebSocketListener.class); // Set ready status wsl.onOpen(mockWebSocket); wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[] { (byte) 0 })); @@ -138,13 +135,13 @@ private void uploadDirectoryAndVerify(String resourcePath) throws IOException, I }); when(mockClient.newWebSocketBuilder()).thenReturn(builder); - final boolean result = PodUpload.upload(mockClient, mockContext, operationSupport, mockPathToUpload); + final boolean result = PodUpload.upload(operation, mockPathToUpload); assertThat(result, equalTo(true)); verify(mockPathToUpload, atLeast(1)).toFile(); verify(builder, times(1)).uri(argThat(request -> { assertThat(request.toString(), equalTo( - "https://openshift.com:8443/api/v1/namespaces/default/pods/mock-pod/exec?command=sh&command=-c&command=mkdir+-p+%27%2Fmock%2Fdir%27+%26%26+base64+-d+-+%7C+tar+-C+%27%2Fmock%2Fdir%27+-xzf+-&stdin=true&stderr=true")); + "https://openshift.com:8443/api/v1/namespaces/default/pods/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%7C%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&stdin=true&stderr=true")); return true; })); verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); @@ -164,11 +161,8 @@ void testCopy() throws Exception { @Test void createExecCommandForUpload_withFileInRootPath_shouldCreateValidExecCommandForUpload() { - // Given - when(mockContext.getFile()).thenReturn("/cp.log"); - // When - String result = PodUpload.createExecCommandForUpload(mockContext); + String result = PodUpload.createExecCommandForUpload("/cp.log"); // Then assertThat(result, equalTo("mkdir -p '/' && base64 -d - > '/cp.log'")); @@ -176,21 +170,18 @@ void createExecCommandForUpload_withFileInRootPath_shouldCreateValidExecCommandF @Test void createExecCommandForUpload_withNormalFile_shouldCreateValidExecCommandForUpload() { - // Given - when(mockContext.getFile()).thenReturn("/tmp/foo/cp.log"); - // When - String result = PodUpload.createExecCommandForUpload(mockContext); + String result = PodUpload.createExecCommandForUpload("/tmp/foo/cp.log"); // Then assertThat(result, equalTo("mkdir -p '/tmp/foo' && base64 -d - > '/tmp/foo/cp.log'")); } void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest) throws IOException, InterruptedException { - when(mockContext.getFile()).thenReturn("/mock/dir/file"); + this.operation = operation.file("/mock/dir/file"); WebSocket.Builder builder = Mockito.mock(WebSocket.Builder.class, Mockito.RETURNS_SELF); when(builder.buildAsync(any())).thenAnswer(newWebSocket -> { - final PodUploadWebSocketListener wsl = newWebSocket.getArgument(0, PodUploadWebSocketListener.class); + final ExecWebSocketListener wsl = newWebSocket.getArgument(0, ExecWebSocketListener.class); // Set ready status wsl.onOpen(mockWebSocket); wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[] { (byte) 0 })); @@ -208,7 +199,7 @@ void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest) throws assertThat(result, equalTo(true)); verify(builder, times(1)).uri(argThat(request -> { assertThat(request.toString(), equalTo( - "https://openshift.com:8443/api/v1/namespaces/default/pods/mock-pod/exec?command=sh&command=-c&command=mkdir+-p+%27%2Fmock%2Fdir%27+%26%26+base64+-d+-+%3E+%27%2Fmock%2Fdir%2Ffile%27&stdin=true&stderr=true")); + "https://openshift.com:8443/api/v1/namespaces/default/pods/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20base64%20-d%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&stdin=true&stderr=true")); return true; })); verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListenerTest.java deleted file mode 100644 index a9324241e4c..00000000000 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadWebSocketListenerTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal.uploadable; - -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.http.WebSocket; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.internal.verification.VerificationModeFactory.times; - -class PodUploadWebSocketListenerTest { - - private PodUploadWebSocketListener podUploadWebSocketListener; - - @BeforeEach - void setUp() { - podUploadWebSocketListener = new PodUploadWebSocketListener(); - } - - @AfterEach - void tearDown() { - podUploadWebSocketListener = null; - } - - @Test - void testSendShouldTruncateAndSendFlaggedWebSocketData() { - final WebSocket mockedWebSocket = Mockito.mock(WebSocket.class); - Mockito.when(mockedWebSocket.send(Mockito.any())).thenReturn(true); - podUploadWebSocketListener.onOpen(mockedWebSocket); - final byte[] toSend = new byte[] { 1, 3, 3, 7, 0 }; - - podUploadWebSocketListener.send(toSend, 0, 4); - - verify(mockedWebSocket, times(1)) - .send(eq(ByteBuffer.wrap(new byte[] { (byte) 0, (byte) 1, (byte) 3, (byte) 3, (byte) 7 }))); - } - - @Test() - void testCheckErrorHasErrorFromMessageShouldThrowException() { - podUploadWebSocketListener.onMessage(null, ByteBuffer.wrap("Error".getBytes())); - - assertThrows(KubernetesClientException.class, () -> podUploadWebSocketListener.checkError()); - } - - @Test() - void testCheckErrorHasErrorFromFailureShouldThrowException() { - podUploadWebSocketListener.onError(null, new IOException("here")); - - assertThrows(KubernetesClientException.class, () -> podUploadWebSocketListener.checkError()); - } -} From de1ae180fbe6799cd647a94b83475fbd908f2d09 Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Tue, 3 May 2022 10:21:17 -0400 Subject: [PATCH 2/2] Update kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java Co-authored-by: Chris Laprun --- .../main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index 07786daf42d..ce074611954 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -32,7 +32,7 @@ public interface ExecWatch extends Closeable { /** * Gets the {@link InputStream} for stdOut if one is associated * - * @return the stdErr stream + * @return the stdOut stream */ InputStream getOutput();