Skip to content

Commit

Permalink
#166: added test for CommandsServiceImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Nov 8, 2012
1 parent 5f2370f commit ecb8396
Show file tree
Hide file tree
Showing 21 changed files with 993 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ public class CommandManagerImpl implements CommandManager
def actionArgs = [*:command.args]

// handle stdin...
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
if(stdin)
actionArgs.stdin = stdin

// handle stdout...
storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->
if(stdout)
actionArgs.stdout = stdout

// handle stderr
storage.withStorageOutput(StreamType.STDERR) { stderr ->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr ->
if(stderr)
actionArgs.stderr = stderr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class AbstractCommandStreamStorage<T extends AbstractCommandExec
T ioStorage

@Override
def withStorageOutput(StreamType streamType, Closure c)
def withOrWithoutStorageOutput(StreamType streamType, Closure c)
{
def output = findStorageOutput(streamType)
if(output)
Expand All @@ -50,7 +50,7 @@ public abstract class AbstractCommandStreamStorage<T extends AbstractCommandExec
}

@Override
def withStorageInput(StreamType streamType, Closure c)
def withOrWithoutStorageInput(StreamType streamType, Closure c)
{
def input = findStorageInput(streamType)
if(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ public interface CommandStreamStorage
{
// output
OutputStream findStorageOutput(StreamType streamType)
def withStorageOutput(StreamType streamType, Closure c)
def withOrWithoutStorageOutput(StreamType streamType, Closure c)

// input
InputStream findStorageInput(StreamType streamType)
def withStorageInput(StreamType streamType, Closure c)
def withOrWithoutStorageInput(StreamType streamType, Closure c)

/**
* @return map with <code>stream</code> and <code>size</code> or <code>null</code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class FileSystemCommandExecutionIOStorage extends AbstractCommandExecutio

// save stdin if there is any
if(stdin)
storage.withStorageOutput(StreamType.STDIN) { it << stdin }
storage.withOrWithoutStorageOutput(StreamType.STDIN) { it << stdin }

return storage
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.linkedin.glu.commands.impl

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.linkedin.util.annotations.Initializable
import org.linkedin.glu.utils.collections.EvictingWithLRUPolicyMap
import org.linkedin.util.annotations.Initializer

Expand All @@ -31,14 +30,13 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
public static final String MODULE = MemoryCommandExecutionIOStorage.class.getName();
public static final Logger log = LoggerFactory.getLogger(MODULE);

@Initializable
int maxNumberOfElements = 25
private int _maxNumberOfElements = 25

/**
* Completed commands: keeps a maximum number of elements.
*/
Map<String, CommandExecution> completedCommands =
new EvictingWithLRUPolicyMap<String, CommandExecution>(maxNumberOfElements)
new EvictingWithLRUPolicyMap<String, CommandExecution>(_maxNumberOfElements)

/**
* The commands that are currently executing */
Expand All @@ -48,10 +46,15 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS
* For the compile to stop bugging me with commands being non final... */
private final Object _lock = new Object()

int getMaxNumberOfElements()
{
return _maxNumberOfElements
}

@Initializer
void setMaxNumberOfElements(int maxNumberOfElements)
{
this.maxNumberOfElements = maxNumberOfElements
_maxNumberOfElements = maxNumberOfElements
completedCommands = new EvictingWithLRUPolicyMap<String, CommandExecution>(maxNumberOfElements)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
long completionTime = 0L

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertNull("no stdin", stdin)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
stdout << "out0"
stderr << "err0"

Expand Down Expand Up @@ -268,12 +268,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists())

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertEquals("in0", stdin.text)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
stdout << "out0"
stderr << "err0"

Expand Down Expand Up @@ -420,12 +420,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists())

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertNull("no stdin", stdin)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
assertNull("stderr redirected", stderr)

stdout << "out0"
Expand Down Expand Up @@ -550,11 +550,11 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase
assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists())

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->

stdout.write("o0".bytes)
stderr.write("e0".bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase
assertEquals(1, ioStorage.executingCommands.size())
assertTrue(ioStorage.executingCommands[ce.id].is(ce))

storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertNull("no stdin", stdin)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
stdout << "out0"
stderr << "err0"

Expand Down Expand Up @@ -200,12 +200,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase
assertEquals(1, ioStorage.executingCommands.size())
assertTrue(ioStorage.executingCommands[ce.id].is(ce))

storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertEquals("in0", stdin.text)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
stdout << "out0"
stderr << "err0"

Expand Down Expand Up @@ -308,12 +308,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase
assertEquals(1, ioStorage.executingCommands.size())
assertTrue(ioStorage.executingCommands[ce.id].is(ce))

storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->
assertNull("no stdin", stdin)

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->
assertNull("stderr redirected", stderr)

stdout << "out0"
Expand Down Expand Up @@ -402,11 +402,11 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase
long startTime = clock.currentTimeMillis()

def processing = { CommandStreamStorage storage ->
storage.withStorageInput(StreamType.STDIN) { stdin ->
storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin ->

storage.withStorageOutput(StreamType.STDOUT) { stdout ->
storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout ->

storage.withStorageOutput(StreamType.STDERR) { stderr->
storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr->

stdout.write("o0".bytes)
stderr.write("e0".bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.linkedin.glu.utils.core.DisabledFeatureProxy
import org.linkedin.glu.orchestration.engine.commands.CommandsService
import org.linkedin.glu.orchestration.engine.commands.CommandExecutionStorageImpl
import org.linkedin.glu.commands.impl.FileSystemCommandExecutionIOStorage
import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage

// Place your Spring DSL code here
beans = {
Expand Down Expand Up @@ -70,18 +71,19 @@ beans = {
{
case Environment.DEVELOPMENT:
case Environment.TEST:
commandExecutionIOStorage(MemoryCommandExecutionIOStorage)
break

default:
commandExecutionFileSystem(FileSystemImpl) { bean ->
bean.factoryMethod = "createTempFileSystem"
bean.destroyMethod = "destroy"
}
break

default:
throw new RuntimeException("TODO")
}

commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) {
commandExecutionFileSystem = ref("commandExecutionFileSystem")
commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) {
commandExecutionFileSystem = ref("commandExecutionFileSystem")
}
break
}

commandsService(CommandsServiceImpl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class CommandsController extends ControllerBase
/**
* Writes the stream requested
*
* curl -v -u "glua:password" "http://localhost:8080/console/rest/v1/glu-dev-1/command/2d044e0b-a1f5-4cbd-9210-cf42c77f6e94/stdout"
* curl -v -u "glua:password" "http://localhost:8080/console/rest/v1/glu-dev-1/command/2d044e0b-a1f5-4cbd-9210-cf42c77f6e94/streams?stdoutStream=true"
*/
def rest_show_command_execution_streams = {
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
--}%

<g:if test="${command}">
<div id="shell-${command.commandId}"><div class="shell span16"><g:link class="close" controller="agents" action="commands" id="${command.agent}">&times;</g:link><div class="cli"><span class="prompt">${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}#</span>&nbsp;<span class="command">${command.command.encodeAsHTML()}</span> <g:if test="${command.redirectStderr}">2&gt;&amp;1 </g:if><span class="date">[<cl:formatDate time="${command.startTime}"/>]</span> <span class="exitValue">[${'$'}?=${command.exitValue?.encodeAsHTML()}]</span></div><g:if test="${!command.isExecuting}"><g:each in="['stdin', 'stderr', 'stdout']" var="streamType"><g:if test="${command.getTotalBytesCount(streamType) > 0}"><div id="${command.commandId}-${streamType}" class="${streamType}"><cl:renderCommandBytes command="${command}" streamType="${streamType}" onclick="renderCommandStream('${command.commandId}', '${streamType}', '${command.commandId}-${streamType}')"/></div></g:if></g:each><div class="cli"><span class="prompt">${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}#</span>&nbsp;<span class="date">[<cl:formatDate time="${command.endTime}"/>]</span></div></g:if><g:else><img src="${resource(dir:'images',file:'spinner.gif')}" alt="Spinner" /></g:else></div><g:if test="${command.isExecuting}"><g:javascript>setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')</g:javascript></g:if></div>
<div id="shell-${command.commandId}"><div class="shell span16"><g:link class="close" controller="agents" action="commands" id="${command.agent}">&times;</g:link><div class="cli"><span class="prompt">${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}#</span>&nbsp;<span class="command">${command.command.encodeAsHTML()}</span> <g:if test="${command.redirectStderr}">2&gt;&amp;1 </g:if><span class="date">[<cl:formatDate time="${command.startTime}"/>]</span> <span class="exitValue">[${'$'}?=${command.exitValue?.encodeAsHTML()}]</span></div><g:if test="${!command.isExecuting}"><g:each in="['stdin', 'stderr', 'stdout']" var="streamType"><g:if test="${command.getTotalBytesCount(streamType) > 0}"><div id="${command.commandId}-${streamType}" class="${streamType}"><cl:renderCommandBytes command="${command}" streamType="${streamType}" onclick="renderCommandStream('${command.commandId}', '${streamType}', '${command.commandId}-${streamType}')"/></div></g:if></g:each><div class="cli"><span class="prompt">${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}#</span>&nbsp;<span class="date">[<cl:formatDate time="${command.completionTime}"/>]</span></div></g:if><g:else><img src="${resource(dir:'images',file:'spinner.gif')}" alt="Spinner" /></g:else></div><g:if test="${command.isExecuting}"><g:javascript>setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')</g:javascript></g:if></div>
</g:if>
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
}
else
{
execution.endTime = endTime
execution.completionTime = endTime
execution.stdoutFirstBytes = stdoutFirstBytes
execution.stdoutTotalBytesCount = stdoutTotalBytesCount
execution.stderrFirstBytes = stderrFirstBytes
Expand All @@ -105,10 +105,9 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
@Override
Map findCommandExecutions(String fabric, String agent, def params)
{
params = GluGroovyCollectionUtils.subMap(params, ['offset', 'max', 'sort', 'order'])
params = GluGroovyCollectionUtils.subMap(params ?: [:], ['offset', 'max', 'sort', 'order'])

if(params.offset == null)
params.offset = 0
params.offset = params.offset?.toInteger() ?: 0
params.max = Math.min(params.max ? params.max.toInteger() : maxResults, maxResults)
params.sort = params.sort ?: 'startTime'
params.order = params.order ?: 'desc'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2012 Yan Pujante
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.linkedin.glu.orchestration.engine.commands

import org.linkedin.glu.commands.impl.StreamType
import org.linkedin.glu.commands.impl.CommandStreamStorage
import org.linkedin.glu.utils.io.LimitedOutputStream
import org.apache.tools.ant.util.TeeOutputStream
import org.linkedin.util.lang.MemorySize

/**
* Helper class to manage stream capture
*
* @author yan@pongasoft.com */
public class CommandExecutionStream
{
MemorySize commandExecutionFirstBytesSize
StreamType streamType
boolean captureStream
CommandStreamStorage storage
def streams

private ByteArrayOutputStream _firstBytesOutputStream
private LimitedOutputStream _limitedOutputStream
private OutputStream _stream

def capture(Closure c)
{
if(captureStream)
{
_firstBytesOutputStream =
new ByteArrayOutputStream((int) commandExecutionFirstBytesSize.sizeInBytes)
_limitedOutputStream =
new LimitedOutputStream(_firstBytesOutputStream, commandExecutionFirstBytesSize)

storage.withOrWithoutStorageOutput(streamType) { OutputStream stream ->

if(stream)
_stream = new TeeOutputStream(stream, _limitedOutputStream)
else
_stream = _limitedOutputStream

streams[streamType.multiplexName] = _stream
}
}

c(this)
}

byte[] getBytes()
{
_firstBytesOutputStream?.toByteArray()
}

Long getTotalNumberOfBytes()
{
_limitedOutputStream?.totalNumberOfBytes
}
}
Loading

0 comments on commit ecb8396

Please sign in to comment.