diff --git a/agent/org.linkedin.glu.agent-api/src/main/groovy/org/linkedin/glu/agent/api/Agent.groovy b/agent/org.linkedin.glu.agent-api/src/main/groovy/org/linkedin/glu/agent/api/Agent.groovy index 70ecc18b..bd774dab 100644 --- a/agent/org.linkedin.glu.agent-api/src/main/groovy/org/linkedin/glu/agent/api/Agent.groovy +++ b/agent/org.linkedin.glu.agent-api/src/main/groovy/org/linkedin/glu/agent/api/Agent.groovy @@ -315,6 +315,13 @@ public interface Agent */ def streamCommandResults(def args) throws NoSuchCommandException, AgentException + /** + * Interrupts the command. + * + * @param args.id the id of the command to interrupt + * @return true if the command was interrupted properly or false if + * there was no such command or already completed + */ boolean interruptCommand(args) throws AgentException /******************************************************************** diff --git a/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy b/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy index cfeb67d8..448ae9e2 100644 --- a/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy +++ b/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy @@ -139,7 +139,7 @@ public class CommandManagerImpl implements CommandManager try { - def res = commandExecution.waitForCompletion(args.timeout) + def res = commandExecution.getExitValue(args.timeout) commandExecution.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}") return res } @@ -173,7 +173,7 @@ public class CommandManagerImpl implements CommandManager if(commandExecution) { res = commandExecution.interruptExecution() - commandExecution.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])})") + commandExecution.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}") } else { diff --git a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/CommandExecution.groovy b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/CommandExecution.groovy index f90443e8..22910c5b 100644 --- a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/CommandExecution.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/CommandExecution.groovy @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution import org.slf4j.Logger import org.linkedin.groovy.util.config.Config +import java.util.concurrent.TimeoutException /** * @author yan@pongasoft.com */ @@ -78,15 +79,18 @@ public class CommandExecution completionTime > 0 } - def waitForCompletion(def timeout) + void waitForCompletion(def timeout) { - try + if(!isCompleted()) { - futureExecution.get(timeout) - } - catch(ExecutionException e) - { - throw e.cause + try + { + futureExecution.get(timeout) + } + catch(ExecutionException e) + { + // ok we just want to wait until the command completes but no more than the timeout + } } } @@ -98,9 +102,25 @@ public class CommandExecution def getExitValue() { if(isCompleted()) + { + try + { + futureExecution.get() + } + catch(ExecutionException e) + { + throw e.cause + } + } + else + return null + } + + def getExitValue(timeout) + { try { - futureExecution.get() + futureExecution.get(timeout) } catch(ExecutionException e) { @@ -108,7 +128,33 @@ public class CommandExecution } } - def getExitValue(timeout) + /** + * Completion value either return the result of the call if succeeded or the exception + * if an exception was thrown. Does not throw an exception! Does not wait! + */ + def getCompletionValue() + { + if(isCompleted()) + { + try + { + futureExecution.get() + } + catch(ExecutionException e) + { + e.cause + } + } + else + return null + } + + /** + * Completion value either return the result of the call if succeeded or the exception + * if an exception was thrown. Throws only the TimeoutException if cannot get + * a result in the timeout provided + */ + def getCompletionValue(timeout) throws TimeoutException { try { @@ -116,7 +162,7 @@ public class CommandExecution } catch(ExecutionException e) { - throw e.cause + e.cause } } diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsService.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsService.groovy index 5ee43597..574127db 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsService.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsService.groovy @@ -92,4 +92,14 @@ interface AgentsService args, Closure commandResultProcessor) + /** + * Interrupts the command. + * + * @param args.id the id of the command to interrupt + * @return true if the command was interrupted properly or false if + * there was no such command or already completed + */ + boolean interruptCommand(Fabric fabric, + String agentName, + args) } diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsServiceImpl.groovy index 62bba98f..2fe1065c 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AgentsServiceImpl.groovy @@ -248,6 +248,14 @@ class AgentsServiceImpl implements AgentsService, AgentURIProvider } } + @Override + boolean interruptCommand(Fabric fabric, String agentName, def args) + { + withRemoteAgent(fabric, agentName) { Agent agent -> + agent.interruptCommand(args) + } as boolean + } + /** * Create the system entry for the given agent and mountpoint. */ diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AuditedAgentsService.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AuditedAgentsService.groovy index eccca0e8..e706f356 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AuditedAgentsService.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/agents/AuditedAgentsService.groovy @@ -115,4 +115,11 @@ public class AuditedAgentsService implements AgentsService res?.id?.toString()) } } + + @Override + boolean interruptCommand(Fabric fabric, String agentName, def args) + { + auditLogService.audit('agent.interruptCommand', "${agentName} / ${fabric.name} / ${args.id}") + agentsService.interruptCommand(fabric, agentName, args) + } } \ No newline at end of file diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorage.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorage.groovy index 315e0288..13bb304e 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorage.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorage.groovy @@ -41,6 +41,14 @@ public interface CommandExecutionStorage Long stderrTotalBytesCount, String exitValue) + DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + Throwable exception) + DbCommandExecution findCommandExecution(String fabric, String commandId) Map findCommandExecutions(String fabric, String agent, def params) diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy index 28ed656e..0635de51 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy @@ -21,6 +21,8 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.linkedin.util.annotations.Initializable import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils +import org.linkedin.util.lang.MemorySize +import org.linkedin.glu.utils.io.LimitedOutputStream /** * @author yan@pongasoft.com */ @@ -32,6 +34,9 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage @Initializable int maxResults = 25 + @Initializable + MemorySize stackTraceMaxSize = MemorySize.parse('255') + @Override DbCommandExecution startExecution(String fabric, String agent, @@ -71,6 +76,25 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage byte[] stderrFirstBytes, Long stderrTotalBytesCount, String exitValue) + { + endExecution(commandId, + endTime, + stdoutFirstBytes, + stdoutTotalBytesCount, + stderrFirstBytes, + stderrTotalBytesCount, + exitValue, + false) + } + + DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + String exitValue, + boolean isException) { DbCommandExecution.withTransaction { DbCommandExecution execution = DbCommandExecution.findByCommandId(commandId) @@ -86,6 +110,7 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage execution.stderrFirstBytes = stderrFirstBytes execution.stderrTotalBytesCount = stderrTotalBytesCount execution.exitValue = exitValue + execution.isException = isException if(!execution.save()) { @@ -96,6 +121,29 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage } } + @Override + DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + Throwable exception) + { + def baos = new ByteArrayOutputStream() + + def os = new PrintStream(new LimitedOutputStream(baos, stackTraceMaxSize)) + os.withStream { exception.printStackTrace(it) } + endExecution(commandId, + endTime, + stdoutFirstBytes, + stdoutTotalBytesCount, + stderrFirstBytes, + stderrTotalBytesCount, + new String(baos.toByteArray()), + true) + } + @Override DbCommandExecution findCommandExecution(String fabric, String commandId) { diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsService.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsService.groovy index cb613def..dfca2bea 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsService.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsService.groovy @@ -59,4 +59,15 @@ public interface CommandsService DbCommandExecution findCommandExecution(Fabric fabric, String commandId) Map findCommandExecutions(Fabric fabric, String agentName, def params) + + /** + * Interrupts the command. + * + * @param args.id the id of the command to interrupt + * @return true if the command was interrupted properly or false if + * there was no such command or already completed + */ + boolean interruptCommand(Fabric fabric, + String agentName, + String commandId) } diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy index fe1b9606..705fdf1c 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy @@ -93,6 +93,13 @@ public class CommandsServiceImpl implements CommandsService @Initializable Timespan defaultSynchronousWaitTimeout = Timespan.parse('1s') + /** + * This timeout represents how long to we are willing to wait for the command to wait for the + * interrupt to propagate before we interrupt it on this side + */ + @Initializable + Timespan defaultInterruptTimeout = Timespan.parse('5s') + /** * The commands that are currently executing */ private final Map> _currentCommandExecutions = [:] @@ -169,6 +176,36 @@ public class CommandsServiceImpl implements CommandsService return command.id } + @Override + boolean interruptCommand(Fabric fabric, String agentName, String commandId) + { + // first we interrupt the command "remotely" + def res = agentsService.interruptCommand(fabric, agentName, [id: commandId]) + + CommandExecution commandExecution + + synchronized(_currentCommandExecutions) + { + commandExecution = _currentCommandExecutions[commandId] + } + + if(commandExecution) + { + try + { + // we wait for the interrupt to propagate back + commandExecution.waitForCompletion(defaultInterruptTimeout) + } + catch(TimeoutException e) + { + // not completed yet... forcing interrupting on this side + res &= commandExecution.interruptExecution() + } + } + + return res + } + /** * Executes the command asynchronously */ @@ -244,8 +281,27 @@ public class CommandsServiceImpl implements CommandsService // this will demultiplex the result DemultiplexedOutputStream dos = new DemultiplexedOutputStream(streams) - dos.withStream { OutputStream os -> - onResultStreamAvailable(id: command.id, stream: new TeeInputStream(res.stream, os)) + try + { + dos.withStream { OutputStream os -> + onResultStreamAvailable(id: command.id, stream: new TeeInputStream(res.stream, os)) + } + } + catch(Throwable th) + { + long completionTime = clock.currentTimeMillis() + + GroovyLangUtils.noException { + commandExecutionStorage.endExecution(command.id, + completionTime, + stdout.bytes, + stdout.totalNumberOfBytes, + stderr.bytes, + stderr.totalNumberOfBytes, + th) + } + + return [completionTime: completionTime, exception: th] } long completionTime = clock.currentTimeMillis() @@ -257,7 +313,7 @@ public class CommandsServiceImpl implements CommandsService stdout.totalNumberOfBytes, stderr.bytes, stderr.totalNumberOfBytes, - toString(exitValueStream)).exitValue + (String) toString(exitValueStream)).exitValue return [exitValue: exitValue, completionTime: completionTime] } @@ -275,7 +331,7 @@ public class CommandsServiceImpl implements CommandsService null, null, null, - null) + th) } return [completionTime: completionTime, exception: th] @@ -363,7 +419,7 @@ public class CommandsServiceImpl implements CommandsService def args, Closure closure) { - def commandExecution = commandExecutionStorage.findCommandExecution(fabric.name, commandId) + def commandExecution = findCommandExecution(fabric, commandId) if(commandExecution?.fabric != fabric.name) throw new NoSuchCommandExecutionException(commandId) @@ -371,7 +427,7 @@ public class CommandsServiceImpl implements CommandsService commandExecutionIOStorage.withOrWithoutCommandExecutionAndStreams(commandId, args) { m -> if(!m) throw new NoSuchCommandExecutionException(commandId) - closure(m) + closure([commandExecution: commandExecution, stream: m.stream]) } } diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy index 4f8a6efd..29035bf4 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy @@ -143,6 +143,11 @@ public class DbCommandExecution */ String exitValue + /** + * When an exception is generated, then the exit value contains the exception + */ + boolean isException + byte[] getFirstBytes(def streamType) { this."${streamType.toString().toLowerCase()}FirstBytes" diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy index 8022b76d..39386002 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy @@ -82,13 +82,32 @@ public class MemoryCommandExecutionStorage implements CommandExecutionStorage } @Override + DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + String exitValue) + { + endExecution(commandId, + endTime, + stdoutFirstBytes, + stdoutTotalBytesCount, + stderrFirstBytes, + stderrTotalBytesCount, + exitValue, + false) + } + synchronized DbCommandExecution endExecution(String commandId, long endTime, byte[] stdoutFirstBytes, Long stdoutTotalBytesCount, byte[] stderrFirstBytes, Long stderrTotalBytesCount, - String exitValue) + String exitValue, + boolean isException) { DbCommandExecution execution = memory[commandId] if(!execution) @@ -103,11 +122,35 @@ public class MemoryCommandExecutionStorage implements CommandExecutionStorage execution.stderrFirstBytes = stderrFirstBytes execution.stderrTotalBytesCount = stderrTotalBytesCount execution.exitValue = exitValue + execution.isException = isException } return execution } + @Override + DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + Throwable exception) + { + def baos = new ByteArrayOutputStream() + + new PrintStream(baos).withStream { exception.printStackTrace(it) } + + endExecution(commandId, + endTime, + stdoutFirstBytes, + stdoutTotalBytesCount, + stderrFirstBytes, + stderrTotalBytesCount, + new String(baos.toByteArray()), + true) + } + @Override synchronized DbCommandExecution findCommandExecution(String fabric, String commandId) { diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy index 9a0b0a95..683c372c 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy @@ -37,6 +37,8 @@ import org.linkedin.util.clock.Timespan import org.linkedin.glu.orchestration.engine.commands.DbCommandExecution import org.linkedin.glu.commands.impl.CommandExecution import org.linkedin.util.lang.MemorySize +import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution +import java.util.concurrent.CancellationException /** * @author yan@pongasoft.com */ @@ -512,6 +514,128 @@ public class TestCommandsServiceImpl extends GroovyTestCase } } + /** + * Make sure that interrupts work as advertised! + */ + public void testInterrupt() + { + def tc = new ThreadControl(Timespan.parse('30s')) + + def f1 = new Fabric(name: "f1") + + withAuthorizationService { + + final def agentsServiceMock = new MockFor(AgentsService) + + final def lock = new Object() + + def simulateBlockingCommand = { + synchronized(lock) + { + tc.blockWithException("simulateBlockingCommand.start") + lock.wait() + } + } + + def future = new FutureTaskExecution(simulateBlockingCommand) + future.clock = clock + + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + clock.addDuration(Timespan.parse("10s")) + future.runSync() + } + + // interrupt command + agentsServiceMock.demand.interruptCommand { fabric, agentName, args -> + future.cancel(true) + } + + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + redirectStderr: true + ]) + + // we wait until the shell command is "executing" + tc.waitForBlock("simulateBlockingCommand.start") + + long completionTime = clock.currentTimeMillis() + + assertTrue(service.interruptCommand(f1, "a1", cid)) + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + shouldFailWithCause(CancellationException) { + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.EXIT_VALUE.multiplexName, + ] as Set, + null) + } + } + + agentsServiceMock.verify(agentsService) + + CommandExecution ce = ioStorage.findCommandExecution(cid) + assertTrue(ce.isCompleted()) + def baos = new ByteArrayOutputStream() + new PrintStream(baos).withStream { ce.completionValue.printStackTrace(it) } + + assertNull("command is completed", service._currentCommandExecutions[cid]) + DbCommandExecution dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertEquals(new String(baos.toByteArray()), dbCommandExecution.exitValue) + assertTrue(dbCommandExecution.isException) + assertFalse(dbCommandExecution.isExecuting) + + // now that the command is complete... + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + dbCommandExecution = args.commandExecution + assertTrue(dbCommandExecution.isException) + + shouldFailWithCause(CancellationException) { + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.EXIT_VALUE.multiplexName, + ] as Set, + null) + } + } + } + } + private void withAuthorizationService(Closure closure) { def authorizationServiceMock = new MockFor(AuthorizationService) diff --git a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/concurrent/FutureTaskExecution.groovy b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/concurrent/FutureTaskExecution.groovy index 40655684..1f08e0cd 100644 --- a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/concurrent/FutureTaskExecution.groovy +++ b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/concurrent/FutureTaskExecution.groovy @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.FutureTask import org.linkedin.glu.groovy.utils.GluGroovyLangUtils +import org.linkedin.util.clock.SystemClock /** * @author yan@pongasoft.com */ @@ -51,7 +52,7 @@ public class FutureTaskExecution implements FutureExecution, Callable /** * The clock */ - Clock clock + Clock clock = SystemClock.instance() /** * The actual future task on which all the calls will be delegated to implement the diff --git a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/io/InputGeneratorStream.groovy b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/io/InputGeneratorStream.groovy index 9de014ad..d9aef8d8 100644 --- a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/io/InputGeneratorStream.groovy +++ b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/io/InputGeneratorStream.groovy @@ -128,10 +128,6 @@ public class InputGeneratorStream extends InputStream implements Sizeable { throw e } - catch(RuntimeException e) - { - throw e - } catch(Throwable th) { throw new IOException("issue while generating the input stream", th) diff --git a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/exceptions/MultipleExceptions.java b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/exceptions/MultipleExceptions.java index ba1a0c84..c135e17d 100644 --- a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/exceptions/MultipleExceptions.java +++ b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/exceptions/MultipleExceptions.java @@ -30,26 +30,16 @@ public class More extends RuntimeException { private static final long serialVersionUID = 1L; - private final Throwable _throwable; - private final int _exceptionIndex; + public More(String message) + { + super(message); + } public More(int exceptionIndex, Throwable throwable) { super(computeMessage(exceptionIndex, _causes.size(), throwable)); - _throwable = throwable; - _exceptionIndex = exceptionIndex; setStackTrace(throwable.getStackTrace()); } - - public Throwable getThrowable() - { - return _throwable; - } - - public int getExceptionIndex() - { - return _exceptionIndex; - } } private static String computeMessage(int exceptionIndex, int totalCount, Throwable more) @@ -104,7 +94,7 @@ public Collection getCauses() */ public static void throwIfExceptions(Collection causes) throws Throwable { - throwIfExceptions(null, causes); + throwIfExceptions((String) null, causes); } /** @@ -118,12 +108,26 @@ public static void throwIfExceptions(String message, throw throwable; } + /** + * Convenient call to throw a multiple exception or not depending on the collection + * + * @param rootCause the exception that will be thrown in the end (if causes) and + * attach the multi exception as the cause + */ + public static void throwIfExceptions(Throwable rootCause, + Collection causes) throws Throwable + { + Throwable throwable = createIfExceptions(rootCause, causes); + if(throwable != null) + throw throwable; + } + /** * Convenient call to create a multiple exception or not depending on the collection */ public static Throwable createIfExceptions(Collection causes) { - return createIfExceptions(null, causes); + return createIfExceptions((String) null, causes); } /** @@ -143,4 +147,23 @@ public static Throwable createIfExceptions(String message, return new MultipleExceptions(message, causes); } + + /** + * Convenient call to create a multiple exception or not depending on the collection + * + * @param rootCause the exception that will be returned in the end (if causes) and + * attach the multi exception as the cause + */ + public static T createIfExceptions(T rootCause, + Collection causes) + { + Throwable throwable = createIfExceptions(rootCause.getMessage(), causes); + if(throwable != null) + { + rootCause.initCause(throwable); + return rootCause; + } + else + return null; + } } diff --git a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java index 0f6b3fe5..4deb5c40 100644 --- a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java +++ b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java @@ -322,19 +322,10 @@ public int read(byte[] b, int off, int len) throws IOException if(_closed) throw new IOException("closed"); - // some exceptions were generated... - Throwable th = - MultipleExceptions.createIfExceptions("Exceptions while processing some input streams", - _exceptions); - if(th != null) - { - if(th instanceof IOException) - throw (IOException) th; - else - { - throw new IOException(th); - } - } + // if some exceptions were generated... + if(!_exceptions.isEmpty()) + throw MultipleExceptions.createIfExceptions(new IOException("Exceptions while reading input streams"), + _exceptions); // nothing else to read... reach end of all streams! if(_multiplexedBuffer.position() == 0) @@ -390,15 +381,15 @@ public void close() throws IOException { synchronized(_multiplexedBuffer) { + if(_closed) + return; + _closed = true; // notify everybody that this stream is closed _multiplexedBuffer.notifyAll(); } - // this is a very "verbose" piece of code that simply calls close on each channel - // while ensuring it gets called for all of them even if an exception is thrown and if - // one is thrown, then the first one gets propagated - Throwable throwable = null; + Collection exceptions = new ArrayList(); for(ChannelReaderCallable channelReader : _channelReaders) { @@ -408,35 +399,13 @@ public void close() throws IOException } catch(Throwable e) { - if(throwable != null) - throwable = e; - else - { - if(log.isDebugEnabled()) - log.debug("ignored exception", e); - } + exceptions.add(e); } } - if(throwable != null) - { - try - { - throw throwable; - } - catch(IOException e) - { - throw e; - } - catch(RuntimeException e) - { - throw e; - } - catch(Throwable e) - { - throw new IOException(e); - } - } + if(!exceptions.isEmpty()) + throw MultipleExceptions.createIfExceptions(new IOException("Issue while closing the channels"), + exceptions); } /** @@ -470,7 +439,14 @@ public int getMinSize() private void close() throws IOException { - _channel.close(); + try + { + _channel.close(); + } + catch(IOException e) + { + throw new IOException("Error while closing stream: [" + _name + "]", e); + } } /** @@ -534,7 +510,7 @@ public Long call() throws Exception synchronized(_multiplexedBuffer) { // no need to call notifyAll: the finally block will take care of it... - _exceptions.add(th); + _exceptions.add(new IOException("Exception detected while reading stream: [" + _name + "]", th)); } } finally