Skip to content

Commit

Permalink
#166: added error stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Nov 8, 2012
1 parent b722a82 commit ee6593a
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,18 +317,19 @@ class AgentRestClient implements Agent
}

private def streamCommandResultsInputArgs = [
'id',
'exitValueStream',
'exitValueStreamTimeout',
'stdinStream',
'stdinOffset',
'stdinLen',
'stdoutStream',
'stdoutOffset',
'stdoutLen',
'stderrStream',
'stderrOffset',
'stderrLen',
'id',
'exitErrorStream',
'exitValueStream',
'exitValueStreamTimeout',
'stdinStream',
'stdinOffset',
'stdinLen',
'stdoutStream',
'stdoutOffset',
'stdoutLen',
'stderrStream',
'stderrOffset',
'stderrLen',
]

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution
import java.util.concurrent.ExecutorService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.linkedin.glu.utils.io.EmptyInputStream

/**
* @author yan@pongasoft.com */
Expand Down Expand Up @@ -64,6 +65,7 @@ public abstract class AbstractCommandStreamStorage<T extends AbstractCommandExec
{
args = GluGroovyCollectionUtils.subMap(args,
[
'exitErrorStream',
'exitValueStream',
'exitValueStreamTimeout',
'stdinStream',
Expand All @@ -84,33 +86,65 @@ public abstract class AbstractCommandStreamStorage<T extends AbstractCommandExec
Map<String, InputStream> streams = [:]

def exitValueStream = Config.getOptionalBoolean(args, 'exitValueStream', false)
def exitErrorStream = Config.getOptionalBoolean(args, 'exitErrorStream', false)

if(exitErrorStream)
{
numberOfStreams++
streams[StreamType.EXIT_ERROR.multiplexName] = EmptyInputStream.INSTANCE

// command completed => check for error
if(commandExecution.isCompleted())
{
def completionValue = commandExecution.completionValue
if(completionValue instanceof Throwable)
{
streams[StreamType.EXIT_ERROR.multiplexName] =
new InputGeneratorStream(GluGroovyLangUtils.getStackTrace(completionValue))
}
}
}

// Factory to compute the exit value
def exitValueFactory = { null }

if(exitValueStream)
numberOfStreams++

def timeout = args.exitValueStreamTimeout

if(exitValueStream && (timeout != null || commandExecution.isCompleted()))
if(exitValueStream)
{
exitValueFactory = {
try
numberOfStreams++
streams[StreamType.EXIT_VALUE.multiplexName] = EmptyInputStream.INSTANCE

// command completed => exit value only when no error
if(commandExecution.isCompleted())
{
def completionValue = commandExecution.completionValue
if(completionValue && !(completionValue instanceof Throwable))
{
return commandExecution.getExitValue(timeout).toString()
streams[StreamType.EXIT_VALUE.multiplexName] = new InputGeneratorStream(completionValue)
}
catch(TimeoutException e)
}
else
{
if(timeout != null)
{
if(log.isDebugEnabled())
log.debug("timeout reached", e)
// ok: ignored...
return null
exitValueFactory = {
try
{
return commandExecution.getExitValue(timeout).toString()
}
catch(TimeoutException e)
{
if(log.isDebugEnabled())
log.debug("timeout reached", e)
// ok: ignored...
return null
}
}
InputStream exitValueInputStream = new InputGeneratorStream(exitValueFactory)
streams[StreamType.EXIT_VALUE.multiplexName] = exitValueInputStream
}
}

InputStream exitValueInputStream = new InputGeneratorStream(exitValueFactory)
streams[StreamType.EXIT_VALUE.multiplexName] = exitValueInputStream
}

def m = [:]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum StreamType
STDOUT("O"),
STDERR("E"),
EXIT_VALUE("V"),
EXIT_ERROR("X"), // when exit value cannot exit
MULTIPLEXED(null);

String multiplexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.linkedin.glu.console.domain.AuditLog
import org.linkedin.util.codec.Codec
import org.linkedin.util.codec.Base64Codec
import org.linkedin.groovy.util.json.JsonUtils
import org.linkedin.util.lang.LangUtils

/**
* Base class for controllers
Expand Down Expand Up @@ -69,11 +70,7 @@ class ControllerBase
protected def flashException(message, throwable)
{
flash.error = message
def sw = new StringWriter()
sw.withPrintWriter {
throwable.printStackTrace(it)
}
flash.stackTrace = sw.toString()
flash.stackTrace = LangUtils.getStackTrace(throwable)
}

protected def flashException(throwable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.linkedin.util.annotations.Initializable
import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils
import org.linkedin.util.lang.MemorySize
import org.linkedin.glu.utils.io.LimitedOutputStream
import org.linkedin.util.lang.LangUtils

/**
* @author yan@pongasoft.com */
Expand Down Expand Up @@ -130,17 +131,13 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
Long stderrTotalBytesCount,
Throwable exception)
{
def baos = new ByteArrayOutputStream()

def os = new PrintStream(new LimitedOutputStream(baos, stackTraceMaxSize))
os.withStream { exception.printStackTrace(it) }
endExecution(commandId,
endTime,
stdoutFirstBytes,
stdoutTotalBytesCount,
stderrFirstBytes,
stderrTotalBytesCount,
new String(baos.toByteArray()),
LangUtils.getStackTrace(exception),
true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.linkedin.util.annotations.Initializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils
import org.linkedin.util.lang.LangUtils

/**
* Mostly used for testing purposes...
Expand Down Expand Up @@ -137,17 +138,13 @@ public class MemoryCommandExecutionStorage implements CommandExecutionStorage
Long stderrTotalBytesCount,
Throwable exception)
{
def baos = new ByteArrayOutputStream()

new PrintStream(baos).withStream { exception.printStackTrace(it) }

endExecution(commandId,
endTime,
stdoutFirstBytes,
stdoutTotalBytesCount,
stderrFirstBytes,
stderrTotalBytesCount,
new String(baos.toByteArray()),
LangUtils.getStackTrace(exception),
true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.linkedin.glu.commands.impl.CommandExecution
import org.linkedin.util.lang.MemorySize
import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution
import java.util.concurrent.CancellationException
import org.linkedin.glu.utils.io.NullOutputStream

/**
* @author yan@pongasoft.com */
Expand Down Expand Up @@ -182,6 +183,7 @@ public class TestCommandsServiceImpl extends GroovyTestCase
assertNull(dbCommandExecution.stderrFirstBytes)
assertNull(dbCommandExecution.stderrTotalBytesCount)
assertTrue(dbCommandExecution.isExecuting)
assertFalse(dbCommandExecution.isException)

// test bulk methods
def ces = service.findCommandExecutions(f1, null, null)
Expand Down Expand Up @@ -233,6 +235,7 @@ public class TestCommandsServiceImpl extends GroovyTestCase
assertEquals(10, dbCommandExecution.stderrTotalBytesCount)
assertEquals("14", dbCommandExecution.exitValue)
assertFalse(dbCommandExecution.isExecuting)
assertFalse(dbCommandExecution.isException)
}
}

Expand Down Expand Up @@ -456,6 +459,7 @@ public class TestCommandsServiceImpl extends GroovyTestCase
assertNull(dbCommandExecution.stderrFirstBytes)
assertNull(dbCommandExecution.stderrTotalBytesCount)
assertTrue(dbCommandExecution.isExecuting)
assertFalse(dbCommandExecution.isException)

// test bulk methods
def ces = service.findCommandExecutions(f1, null, null)
Expand Down Expand Up @@ -509,8 +513,9 @@ public class TestCommandsServiceImpl extends GroovyTestCase
assertNull(dbCommandExecution.stdoutTotalBytesCount)
assertNull(dbCommandExecution.stderrFirstBytes)
assertNull(dbCommandExecution.stderrTotalBytesCount)
assertNull(dbCommandExecution.exitValue)
assertEquals(GluGroovyLangUtils.getStackTrace(exception), dbCommandExecution.exitValue)
assertFalse(dbCommandExecution.isExecuting)
assertTrue(dbCommandExecution.isException)
}
}

Expand Down Expand Up @@ -569,8 +574,6 @@ public class TestCommandsServiceImpl extends GroovyTestCase

long completionTime = clock.currentTimeMillis()

assertTrue(service.interruptCommand(f1, "a1", cid))

// get the results
service.withCommandExecutionAndWithOrWithoutStreams(f1,
cid,
Expand All @@ -579,21 +582,19 @@ public class TestCommandsServiceImpl extends GroovyTestCase
exitValueStreamTimeout: 0
]) { args ->

// at this stage the exit value stream is blocking: reading the stream will block
// and throw an exception => interrupting the process
assertTrue(service.interruptCommand(f1, "a1", cid))

shouldFailWithCause(CancellationException) {
MultiplexedInputStream.demultiplexToString(args.stream,
[
StreamType.EXIT_VALUE.multiplexName,
] as Set,
null)
NullOutputStream.INSTANCE << args.stream
}
}

agentsServiceMock.verify(agentsService)

CommandExecution ce = ioStorage.findCommandExecution(cid)
assertTrue(ce.isCompleted())
def baos = new ByteArrayOutputStream()
new PrintStream(baos).withStream { ce.completionValue.printStackTrace(it) }

assertNull("command is completed", service._currentCommandExecutions[cid])
DbCommandExecution dbCommandExecution = service.findCommandExecution(f1, cid)
Expand All @@ -611,27 +612,31 @@ public class TestCommandsServiceImpl extends GroovyTestCase
assertNull(dbCommandExecution.stdoutTotalBytesCount)
assertNull(dbCommandExecution.stderrFirstBytes)
assertNull(dbCommandExecution.stderrTotalBytesCount)
assertEquals(new String(baos.toByteArray()), dbCommandExecution.exitValue)
assertEquals(GluGroovyLangUtils.getStackTrace(ce.completionValue), dbCommandExecution.exitValue)
assertTrue(dbCommandExecution.isException)
assertFalse(dbCommandExecution.isExecuting)

// now that the command is complete...
service.withCommandExecutionAndWithOrWithoutStreams(f1,
cid,
[
exitValueStream: true,
exitValueStreamTimeout: 0
exitValueStream: true
]) { args ->
dbCommandExecution = args.commandExecution
assertTrue(dbCommandExecution.isException)

shouldFailWithCause(CancellationException) {
MultiplexedInputStream.demultiplexToString(args.stream,
[
StreamType.EXIT_VALUE.multiplexName,
] as Set,
null)
}
// now that the execution is complete... there is no exit value so it should not fail
assertEquals("", args.stream.text)
}

// now that the command is complete...
service.withCommandExecutionAndWithOrWithoutStreams(f1,
cid,
[
exitErrorStream: true
]) { args ->
// now that the execution is complete... we should get the error stream
assertEquals(GluGroovyLangUtils.getStackTrace(ce.completionValue), args.stream.text)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import org.linkedin.glu.utils.core.Sizeable
* @author yan@pongasoft.com */
public class InputGeneratorStream extends InputStream implements Sizeable
{
private final Closure _inputStreamFactory
private final def _inputStreamFactory

private InputStream _inputStream = null
private volatile boolean _isClosed = false
private volatile long _size = -1

InputGeneratorStream(Closure inputStreamFactory)
InputGeneratorStream(def inputStreamFactory)
{
_inputStreamFactory = inputStreamFactory
}
Expand Down Expand Up @@ -99,7 +99,7 @@ public class InputGeneratorStream extends InputStream implements Sizeable
{
try
{
def input = _inputStreamFactory()
def input = _inputStreamFactory instanceof Closure ? _inputStreamFactory() : _inputStreamFactory

if(input == null)
{
Expand Down

0 comments on commit ee6593a

Please sign in to comment.