Skip to content

Commit

Permalink
#166: handle stdin properly with HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Oct 31, 2012
1 parent 7f7e020 commit 1e49c13
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,10 @@ private class ShellExec
private def executeBlockingCall = {

// if stdin then provide it to subprocess
if(stdin)
{
def stream = new BufferedOutputStream(process.outputStream)
IOUtils.copy(new BufferedInputStream(stdin), stream)
stream.close()
stdin?.withStream { InputStream sis ->
new BufferedOutputStream(process.outputStream).withStream { os ->
os << new BufferedInputStream(sis)
}
}

// make sure that the thread complete properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ public class FutureTaskExecution<T> implements FutureExecution, Callable<T>
T runSync()
{
_future.run()
_future.get()
try
{
_future.get()
}
catch(ExecutionException e)
{
throw e.cause
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,19 @@ class TestAgentRestClient extends GroovyTestCase
ram = new RAMDirectory()
RAMResourceProvider rp = new RAMResourceProvider(ram)
fileSystem = [
mkdirs: { dir ->
ram.mkdirhier(dir.toString())
return rp.createResource(dir.toString())
},
rmdirs: { dir ->
ram.rm(dir.toString())
},
mkdirs: { dir ->
ram.mkdirhier(dir.toString())
return rp.createResource(dir.toString())
},
rmdirs: { dir ->
ram.rm(dir.toString())
},

getRoot: { rp.createResource('/') },
getRoot: { rp.createResource('/') },

getTmpRoot: { rp.createResource('/tmp') },
getTmpRoot: { rp.createResource('/tmp') },

newFileSystem: { r,t -> fileSystem }
newFileSystem: { r,t -> fileSystem }
] as FileSystem

// the agent is logging for each script... we don't want the output in the test
Expand Down Expand Up @@ -638,6 +638,10 @@ gc: 1000
sslEnabled: false).withRemoteAgent(serverURI) { arc ->
FileSystemImpl.createTempFileSystem() { FileSystem fs ->
def shell = new ShellImpl(fileSystem: fs)

// setting a shell for commands
router.context.getAttributes().put('shellForCommands', shell)

def shellScript = shell.fetch("./src/test/resources/shellScriptTestAgentExecShellCommand.sh")
// let's make sure it is executable
fs.chmod(shellScript, '+x')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@ import org.restlet.Request
import org.restlet.Response
import org.restlet.representation.InputRepresentation
import org.restlet.representation.Representation
import org.linkedin.glu.agent.api.Shell
import org.linkedin.groovy.util.lang.GroovyLangUtils
import org.linkedin.glu.groovy.utils.io.GluGroovyIOUtils

/**
* @author yan@pongasoft.com */
public class CommandsResource extends BaseResource
{
private final Shell _shell

CommandsResource(Context context, Request request, Response response)
{
super(context, request, response);
_shell = context.attributes['shellForCommands'] as Shell
}

@Override
Expand All @@ -46,8 +52,10 @@ public class CommandsResource extends BaseResource
noException {
def args = toArgs(request.originalRef.queryAsForm)

if(representation instanceof InputRepresentation)
args.stdin = representation.stream
def stdin = getCopyOfStdin(representation)

if(stdin)
args.stdin = stdin

def res

Expand All @@ -60,4 +68,45 @@ public class CommandsResource extends BaseResource
throw new UnsupportedOperationException("unknown command type [${args.toString()}]")
}
}

/**
* Due to the nature of HTTP, we need to first copy stdin locally because we need to return right
* away and use the local copy instead
*
* @return
*/
private InputStream getCopyOfStdin(Representation representation)
{
if(representation instanceof InputRepresentation)
{
def copyOfStdin = _shell.tempFile()

String password = UUID.randomUUID().toString()

try
{
// we store an encrypted copy of stdin since it is on the file system
_shell.withOutputStream(copyOfStdin) { OutputStream os ->
GluGroovyIOUtils.withStreamToEncrypt(password, os) { OutputStream eos ->
eos << representation.stream
}
}

return GluGroovyIOUtils.decryptStream(password, copyOfStdin.inputStream)
}
finally
{
// implementation note: the temp file is deleted after getting an input stream out of it
// under Unix this is fine... the file will no longer be available on the filesystem
// but will still "exist" until the stream is read/closed
GroovyLangUtils.noException {
_shell.rm(copyOfStdin)
}
}
}
else
{
return null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ class AgentMain implements LifecycleListener, Configurable
def attributes = context.getAttributes()

attributes.put('agent', _proxiedAgent)
attributes.put('shellForCommands', _agent.shellForCommands)
attributes.put('configurable', this)
attributes.put('codec', remoteConfigCodec)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2012 Yan Pujante
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.linkedin.glu.groovy.utils.io

import org.linkedin.groovy.util.io.GroovyIOUtils

import javax.crypto.Cipher
import javax.crypto.CipherInputStream
import javax.crypto.CipherOutputStream
import javax.crypto.spec.SecretKeySpec

/**
* @author yan@pongasoft.com */
public class GluGroovyIOUtils extends GroovyIOUtils
{
static InputStream decryptStream(String password, InputStream inputStream)
{
new CipherInputStream(inputStream, computeCipher(password, Cipher.DECRYPT_MODE))
}

static def withStreamToDecrypt(String password, InputStream inputStream, Closure closure)
{
decryptStream(password, inputStream).withStream { closure(it) }
}

static OutputStream encryptStream(String password, OutputStream outputStream)
{
new CipherOutputStream(outputStream, computeCipher(password, Cipher.ENCRYPT_MODE))
}

static def withStreamToEncrypt(String password, OutputStream outputStream, Closure closure)
{
encryptStream(password, outputStream).withStream { closure(it) }
}

private static Cipher computeCipher(String password, int mode)
{
SecretKeySpec key = new SecretKeySpec(password.getBytes("UTF-8"), "Blowfish")
Cipher cipher = Cipher.getInstance("Blowfish")
cipher.init(mode, key)
return cipher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class MultiplexedInputStream extends InputStream

private boolean _closed = false;
private int _endOfStream = 0;
private Collection<Throwable> _exceptions = new ArrayList<Throwable>();

/**
* Constructor
Expand Down Expand Up @@ -258,7 +259,7 @@ public int read(byte[] b, int off, int len) throws IOException
{
try
{
while(_multiplexedBuffer.position() == 0 && _endOfStream != 0 && !_closed)
while(_multiplexedBuffer.position() == 0 && _endOfStream != 0 && !_closed && _exceptions.isEmpty())
_multiplexedBuffer.wait();
}
catch(InterruptedException e)
Expand All @@ -269,6 +270,19 @@ public int read(byte[] b, int off, int len) throws IOException
if(_closed)
throw new IOException("closed");

// some exceptions were generated...
if(!_exceptions.isEmpty())
{
Throwable exceptionToThrow = null;
for(Throwable throwable : _exceptions)
{
if(exceptionToThrow == null)
exceptionToThrow = throwable;
else
log.warn("Multiple exception detected. This one is ignored", throwable);
}
throw new IOException(exceptionToThrow);
}
// nothing else to read... reach end of all streams!
if(_multiplexedBuffer.position() == 0)
return -1;
Expand Down Expand Up @@ -462,6 +476,14 @@ public Long call() throws Exception
while(_buffer.position() > 0)
writeToMultiplexBuffer();
}
catch(Throwable th)
{
synchronized(_multiplexedBuffer)
{
// no need to call notifyAll: the finally block will take care of it...
_exceptions.add(th);
}
}
finally
{
synchronized(_multiplexedBuffer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2012 Yan Pujante
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package test.utils.io

import org.linkedin.glu.groovy.utils.io.GluGroovyIOUtils

/**
* @author yan@pongasoft.com */
public class TestEncryptedStreams extends GroovyTestCase
{
public void testEncryptAndDecryptWithSamePassword()
{
String text = "this is my test string"

ByteArrayOutputStream baos = new ByteArrayOutputStream()

String password = UUID.randomUUID().toString()

GluGroovyIOUtils.withStreamToEncrypt(password, baos) { OutputStream os ->
os << text
}

ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())

baos = new ByteArrayOutputStream()

GluGroovyIOUtils.withStreamToDecrypt(password, bais) { InputStream is ->
baos << is
}

assertEquals(text, new String(baos.toByteArray(), "UTF-8"))

baos = new ByteArrayOutputStream()

GluGroovyIOUtils.withStreamToDecrypt("different", bais) { InputStream is ->
baos << is
}

assertTrue(text != new String(baos.toByteArray(), "UTF-8"))

}
}

0 comments on commit 1e49c13

Please sign in to comment.