From 83b624e26593aa9185f29600514b06616bde49b1 Mon Sep 17 00:00:00 2001 From: Yan Pujante Date: Wed, 14 Nov 2012 15:50:56 -1000 Subject: [PATCH] #166: implemented commands configuration for console --- ...FileSystemCommandExecutionIOStorage.groovy | 4 +- ...FileSystemCommandExecutionIOStorage.groovy | 8 +- .../resources/conf/glu-console-webapp.groovy | 19 + .../build.gradle | 1 + .../grails-app/conf/BootStrap.groovy | 2 +- .../grails-app/conf/Config.groovy | 6 + .../grails-app/conf/spring/resources.groovy | 25 +- .../controllers/AgentsController.groovy | 1 - .../controllers/CommandsController.groovy | 1 - .../console/controllers/PlanController.groovy | 1 - .../glu/grails/utils/ConsoleConfig.groovy | 12 +- .../grails-app/views/commands/_command.gsp | 2 +- .../commands/CommandExecutionStream.groovy | 6 +- .../commands/CommandsServiceImpl.groovy | 4 + .../commands/TestCommandsServiceImpl.groovy | 1001 +++++++++-------- .../utils/test/GluGroovyTestUtils.groovy | 131 +++ 16 files changed, 718 insertions(+), 506 deletions(-) create mode 100644 utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/test/GluGroovyTestUtils.groovy diff --git a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy index 1638a0c8..89fe5c2c 100644 --- a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy @@ -183,10 +183,10 @@ public class FileSystemCommandExecutionIOStorage extends AbstractCommandExecutio if(exception != null) { if(exception instanceof Throwable) - args.exception = GluGroovyJsonUtils.exceptionToJSON(exception) + args.exception = GluGroovyJsonUtils.extractFullStackTrace(exception) else { - args.exception = exception.toString() + args.exception = GluGroovyJsonUtils.fromJSON(exception.toString()) exception = GluGroovyJsonUtils.rebuildException(args.exception) } } diff --git a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy index b6c7b4ba..552d7d68 100644 --- a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy @@ -23,7 +23,6 @@ import org.linkedin.groovy.util.io.fs.FileSystemImpl import org.linkedin.glu.commands.impl.GluCommandFactory import java.text.SimpleDateFormat import org.linkedin.groovy.util.json.JsonUtils -import org.linkedin.groovy.util.collections.GroovyCollectionsUtils import org.linkedin.glu.commands.impl.CommandStreamStorage import org.linkedin.glu.commands.impl.StreamType import org.linkedin.util.clock.Timespan @@ -33,6 +32,8 @@ import org.linkedin.glu.utils.io.MultiplexedInputStream import org.linkedin.glu.groovy.utils.json.GluGroovyJsonUtils import org.linkedin.glu.groovy.utils.io.GluGroovyIOUtils import org.linkedin.glu.groovy.utils.plugins.PluginServiceImpl +import org.linkedin.groovy.util.collections.IgnoreTypeComparator +import org.linkedin.glu.groovy.utils.test.GluGroovyTestUtils /** * @author yan@pongasoft.com */ @@ -702,7 +703,7 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase xtra0: 'x0', startTime: startTime, completionTime: completionTime, - exception: GluGroovyJsonUtils.exceptionToJSON(exception), + exception: GluGroovyJsonUtils.fromJSON(GluGroovyJsonUtils.exceptionToJSON(exception)), ], commandResource) @@ -963,8 +964,7 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase */ void assertEqualsIgnoreType(o1, o2) { - assertEquals(JsonUtils.prettyPrint(o1), JsonUtils.prettyPrint(o2)) - assertTrue("expected <${o1}> but was <${o2}>", GroovyCollectionsUtils.compareIgnoreType(o1, o2)) + GluGroovyTestUtils.assertEqualsIgnoreType(this, "expected <${o1}> but was <${o2}>", o1, o2) } /** diff --git a/console/org.linkedin.glu.console-server/src/cmdline/resources/conf/glu-console-webapp.groovy b/console/org.linkedin.glu.console-server/src/cmdline/resources/conf/glu-console-webapp.groovy index a4d8e480..2141539b 100644 --- a/console/org.linkedin.glu.console-server/src/cmdline/resources/conf/glu-console-webapp.groovy +++ b/console/org.linkedin.glu.console-server/src/cmdline/resources/conf/glu-console-webapp.groovy @@ -50,6 +50,19 @@ orchestration.engine.plugins = [ 'org.linkedin.glu.orchestration.engine.plugins.builtin.StreamFileContentPlugin' ] +// commands +def commandsDir = + System.properties['org.linkedin.glu.console.commands.dir'] ?: "${System.properties['user.dir']}/commands" + +// storage type supported right now are 'filesystem' and 'memory' +console.commandsService.storageType = 'filesystem' + +// when storage is filesystem => where the commands are stored +console.commandsService.commandExecutionIOStorage.filesystem.rootDir = commandsDir + +// when storage is memory => how many elements maximum to store (then start evicting...) +console.commandsService.commandExecutionIOStorage.memory.maxNumberOfElements = 25 + // The following property limits how many (leaf) steps get executed in parallel during a deployment // By default (undefined), it is unlimited // console.deploymentService.deployer.planExecutor.leafExecutorService.fixedThreadPoolSize = 100 @@ -201,4 +214,10 @@ console.defaults = [planType: "transition", displayName: "Stop", state: "stopped"], ], */ + // features that can be turned on and off + features: + [ + commands: true + ], + ] diff --git a/console/org.linkedin.glu.console-webapp/build.gradle b/console/org.linkedin.glu.console-webapp/build.gradle index 71808c65..77caf64f 100644 --- a/console/org.linkedin.glu.console-webapp/build.gradle +++ b/console/org.linkedin.glu.console-webapp/build.gradle @@ -106,6 +106,7 @@ task lib(dependsOn: jar) << { console.keystorePath="${new File(keysDir, 'console.keystore').canonicalPath}" console.secretkeystorePath="${new File(secretKeyStoreDir, 'console.secretkeystore').canonicalPath}" console.truststorePath="${new File(keysDir, 'agent.truststore').canonicalPath}" +console.commandsService.commandExecutionIOStorage.filesystem.rootDir="${buildDir}/commands" """ // generate a hello world system for dev diff --git a/console/org.linkedin.glu.console-webapp/grails-app/conf/BootStrap.groovy b/console/org.linkedin.glu.console-webapp/grails-app/conf/BootStrap.groovy index ff34823c..b37befa3 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/conf/BootStrap.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/conf/BootStrap.groovy @@ -137,7 +137,7 @@ class BootStrap { log.info "Successfully created (original) admin user. MAKE SURE YOU LOG IN AND CHANGE THE PASSWORD!" } - // initialzing default custom delta definition + // initializing default custom delta definition CustomDeltaDefinition defaultCustomDeltaDefinition = CustomDeltaDefinition.fromDashboard(consoleConfig.defaults.dashboard) if(defaultCustomDeltaDefinition) diff --git a/console/org.linkedin.glu.console-webapp/grails-app/conf/Config.groovy b/console/org.linkedin.glu.console-webapp/grails-app/conf/Config.groovy index bc6a5c8b..73c4c7df 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/conf/Config.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/conf/Config.groovy @@ -420,6 +420,9 @@ environments { 'org.linkedin.glu.orchestration.engine.plugins.builtin.StreamFileContentPlugin' ] + // storage type supported right now are 'filesystem' and 'memory' + console.commandsService.storageType = 'filesystem' + // log4j configuration log4j = { @@ -475,6 +478,9 @@ environments { 'org.linkedin.glu.orchestration.engine.plugins.builtin.StreamFileContentPlugin' ] + // storage type supported right now are 'filesystem' and 'memory' + console.commandsService.storageType = 'filesystem' + // log4j configuration log4j = { diff --git a/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy b/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy index 5d6a6a6b..b575bb16 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy @@ -67,26 +67,27 @@ beans = { commandExecutionStorage(CommandExecutionStorageImpl) // IO Storage (command) - switch(Environment.current) + switch(consoleConfig.console.commandsService.storageType) { - case Environment.DEVELOPMENT: - case Environment.TEST: - commandExecutionIOStorage(MemoryCommandExecutionIOStorage) { + case 'filesystem': + def rootDir = new File(consoleConfig.console.commandsService.commandExecutionIOStorage.filesystem.rootDir) + commandExecutionFileSystem(FileSystemImpl, rootDir) + + commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) { + commandExecutionFileSystem = ref("commandExecutionFileSystem") pluginService = ref('pluginService') } break - default: - commandExecutionFileSystem(FileSystemImpl) { bean -> - bean.factoryMethod = "createTempFileSystem" - bean.destroyMethod = "destroy" - } - - commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) { - commandExecutionFileSystem = ref("commandExecutionFileSystem") + case 'memory': + commandExecutionIOStorage(MemoryCommandExecutionIOStorage) { pluginService = ref('pluginService') + maxNumberOfElements = consoleConfig.console.commandsService.commandExecutionIOStorage.memory.maxNumberOfElements ?: 25 } break + + default: + throw new IllegalArgumentException("unsupported storageType [${consoleConfig.console.commandsService.storageType}]") } commandsService(CommandsServiceImpl) { diff --git a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/AgentsController.groovy b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/AgentsController.groovy index bdb15311..a892940e 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/AgentsController.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/AgentsController.groovy @@ -536,7 +536,6 @@ class AgentsController extends ControllerBase { response.sendError(HttpServletResponse.SC_NOT_FOUND, 'no such agent') - render '' } } diff --git a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy index 0caace0d..66c0740d 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy @@ -101,7 +101,6 @@ public class CommandsController extends ControllerBase catch (NoSuchCommandExecutionException e) { response.sendError(HttpServletResponse.SC_NOT_FOUND) - render '' } } } \ No newline at end of file diff --git a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/PlanController.groovy b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/PlanController.groovy index e4462a6b..34094155 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/PlanController.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/PlanController.groovy @@ -713,7 +713,6 @@ public class PlanController extends ControllerBase else { response.sendError HttpServletResponse.SC_NOT_FOUND - render '' } } diff --git a/console/org.linkedin.glu.console-webapp/grails-app/utils/org/linkedin/glu/grails/utils/ConsoleConfig.groovy b/console/org.linkedin.glu.console-webapp/grails-app/utils/org/linkedin/glu/grails/utils/ConsoleConfig.groovy index 994c26c3..131e335c 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/utils/org/linkedin/glu/grails/utils/ConsoleConfig.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/utils/org/linkedin/glu/grails/utils/ConsoleConfig.groovy @@ -58,7 +58,17 @@ class ConsoleConfig { return org.linkedin.groovy.util.config.Config.getOptionalBoolean(getDefault('features'), feature, - true) + false) } + void disableFeature(feature) + { + def features = defaults.features + if(!features) + { + features = [:] + defaults.features = features + } + features[feature] = false + } } diff --git a/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp b/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp index a06f16b1..d2fd1065 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp +++ b/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp @@ -15,5 +15,5 @@ --}% -
×
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# ${command.command.encodeAsHTML()} 2>&1 [] [${'$'}?=${command.exitValue?.encodeAsHTML()}]
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# [] ()
Spinner ... interrupt
setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')
+
×
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# ${command.command.encodeAsHTML()} 2>&1 [] [${'$'}?=${command.exitValue?.encodeAsHTML()}]
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# [] ()
Spinner ... interrupt
setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')
diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy index f849fe4b..3c662445 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy @@ -55,10 +55,12 @@ public class CommandExecutionStream _stream = _limitedOutputStream streams[streamType.multiplexName] = _stream + + c(this) } } - - c(this) + else + c(this) } byte[] getBytes() diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy index 99c0e633..970bfb63 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy @@ -183,6 +183,10 @@ public class CommandsServiceImpl implements CommandsService { // it is ok... we did not get any result during this amount of time } + catch(Throwable th) + { + log.warn("command execution generated an unknown exception [ignored]", th) + } } return command.id diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy index 5ff75dcd..315413fe 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy @@ -19,7 +19,7 @@ package test.orchestration.engine.commands import groovy.mock.interceptor.MockFor import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage import org.linkedin.glu.commands.impl.StreamType -import org.linkedin.glu.groovy.utils.GluGroovyLangUtils + import org.linkedin.glu.groovy.utils.io.InputGeneratorStream import org.linkedin.glu.orchestration.engine.agents.AgentsService import org.linkedin.glu.orchestration.engine.authorization.AuthorizationService @@ -41,22 +41,19 @@ import java.util.concurrent.CancellationException import org.linkedin.glu.utils.io.NullOutputStream import org.linkedin.glu.groovy.utils.json.GluGroovyJsonUtils +import org.linkedin.glu.commands.impl.AbstractCommandExecutionIOStorage +import org.linkedin.groovy.util.io.fs.FileSystemImpl +import org.linkedin.glu.commands.impl.FileSystemCommandExecutionIOStorage + /** * @author yan@pongasoft.com */ public class TestCommandsServiceImpl extends GroovyTestCase { SettableClock clock = new SettableClock() - MemoryCommandExecutionStorage storage = new MemoryCommandExecutionStorage() - MemoryCommandExecutionIOStorage ioStorage = new MemoryCommandExecutionIOStorage(clock: clock) - - - CommandsServiceImpl service = - new CommandsServiceImpl(clock: clock, - commandExecutionStorage: storage, - commandExecutionIOStorage: ioStorage, - commandExecutionFirstBytesSize: MemorySize.parse("5"), - defaultSynchronousWaitTimeout: null) + MemoryCommandExecutionStorage storage + AbstractCommandExecutionIOStorage ioStorage + CommandsServiceImpl service @Override protected void setUp() @@ -65,161 +62,199 @@ public class TestCommandsServiceImpl extends GroovyTestCase clock.setCurrentTimeMillis(100000000) } + public void withStorage(Closure test) + { + clock.setCurrentTimeMillis(100000000) + + storage = new MemoryCommandExecutionStorage() + + // io storage => memory + ioStorage = new MemoryCommandExecutionIOStorage(clock: clock) + + service = + new CommandsServiceImpl(clock: clock, + commandExecutionStorage: storage, + commandExecutionIOStorage: ioStorage, + commandExecutionFirstBytesSize: MemorySize.parse("5"), + defaultSynchronousWaitTimeout: null) + + test() + + storage = new MemoryCommandExecutionStorage() + + // io storage => filesystem + FileSystemImpl.createTempFileSystem { fs -> + ioStorage = new FileSystemCommandExecutionIOStorage(clock: clock, + commandExecutionFileSystem: fs) + + service = + new CommandsServiceImpl(clock: clock, + commandExecutionStorage: storage, + commandExecutionIOStorage: ioStorage, + commandExecutionFirstBytesSize: MemorySize.parse("5"), + defaultSynchronousWaitTimeout: null) + + test() + } + } + /** * Test for the basic case */ public void testHappyPath() { - def tc = new ThreadControl(Timespan.parse('30s')) + withStorage { + def tc = new ThreadControl(Timespan.parse('30s')) - def f1 = new Fabric(name: "f1") + def f1 = new Fabric(name: "f1") - withAuthorizationService { + withAuthorizationService { - def agentsServiceMock = new MockFor(AgentsService) + def agentsServiceMock = new MockFor(AgentsService) - def commandId = null + def commandId = null - // executeShellCommand - agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> - assertEquals(f1, fabric) - assertEquals("a1", agentName) + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) - def copyOfArgs = [*:args] - commandId = copyOfArgs.remove("id") + def copyOfArgs = [*:args] + commandId = copyOfArgs.remove("id") - assertEqualsIgnoreType([ - command: 'uptime', - type: 'shell', - fabric: 'f1', - agent: 'a1', - username: 'u1', - redirectStderr:false - ], - copyOfArgs) + assertEqualsIgnoreType([ + command: 'uptime', + type: 'shell', + fabric: 'f1', + agent: 'a1', + username: 'u1', + redirectStderr:false + ], + copyOfArgs) - clock.addDuration(Timespan.parse("10s")) + clock.addDuration(Timespan.parse("10s")) - tc.block("executeShellCommand.end") - } + tc.block("executeShellCommand.end") + } - // stream result (this will be called asynchronously!) - agentsServiceMock.demand.streamCommandResults { Fabric fabric, - String agentName, - args, - Closure commandResultProcessor -> - assertEquals(f1, fabric) - assertEquals("a1", agentName) - - assertEqualsIgnoreType([ - id: commandId, - exitValueStream: true, - exitValueStreamTimeout: 0, - stdoutStream:true, - username: 'u1', - stderrStream:true - ], - args) - - def streams = [:] - - InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) - streams[StreamType.exitValue.multiplexName] = exitValueInputStream - - streams[StreamType.stdout.multiplexName] = new ByteArrayInputStream("O123456789".bytes) - streams[StreamType.stderr.multiplexName] = new ByteArrayInputStream("E123456789".bytes) - - commandResultProcessor([stream: new MultiplexedInputStream(streams)]) - } + // stream result (this will be called asynchronously!) + agentsServiceMock.demand.streamCommandResults { Fabric fabric, + String agentName, + args, + Closure commandResultProcessor -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) + + assertEqualsIgnoreType([ + id: commandId, + exitValueStream: true, + exitValueStreamTimeout: 0, + stdoutStream:true, + username: 'u1', + stderrStream:true + ], + args) + + def streams = [:] + + InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) + streams[StreamType.exitValue.multiplexName] = exitValueInputStream + + streams[StreamType.stdout.multiplexName] = new ByteArrayInputStream("O123456789".bytes) + streams[StreamType.stderr.multiplexName] = new ByteArrayInputStream("E123456789".bytes) + + commandResultProcessor([stream: new MultiplexedInputStream(streams)]) + } - AgentsService agentsService = agentsServiceMock.proxyInstance() - service.agentsService = agentsService - - long startTime = clock.currentTimeMillis() - - // execute the shell command - String cid = service.executeShellCommand(f1, "a1", [command: "uptime"]) - - // we wait until the shell command is "executing" - tc.waitForBlock("executeShellCommand.end") - - long completionTime = clock.currentTimeMillis() - - // we make sure that we got the same commandId - assertEquals(cid, commandId) - - CommandExecution ce = service._currentCommandExecutions[cid] - assertTrue(ce.command.isExecuting) - - def dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertFalse(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertNull(dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertNull(dbCommandExecution.stdinFirstBytes) - assertNull(dbCommandExecution.stdinTotalBytesCount) - assertNull(dbCommandExecution.stdoutFirstBytes) - assertNull(dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertTrue(dbCommandExecution.isExecuting) - assertNull(dbCommandExecution.exitError) - - // test bulk methods - def ces = service.findCommandExecutions(f1, null, null) - assertEquals(1, ces.count) - assertEquals(1, ces.commandExecutions.size()) - assertTrue(ces.commandExecutions[0].isExecuting) - - // we can now unblock the call - tc.unblock("executeShellCommand.end") - - // get the results - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - stdoutStream: true, - exitValueStream: true, - exitValueStreamTimeout: 0 - ]) { args -> - - def streams = - MultiplexedInputStream.demultiplexToString(args.stream, - [ - StreamType.exitValue.multiplexName, - StreamType.stdout.multiplexName - ] as Set, - null) - - assertEquals("14", streams[StreamType.exitValue.multiplexName]) - assertEquals("O123456789", streams[StreamType.stdout.multiplexName]) - } + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, "a1", [command: "uptime"]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + // we make sure that we got the same commandId + assertEquals(cid, commandId) + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertFalse(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + assertNull(dbCommandExecution.exitError) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + // we can now unblock the call + tc.unblock("executeShellCommand.end") + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + stdoutStream: true, + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + def streams = + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.exitValue.multiplexName, + StreamType.stdout.multiplexName + ] as Set, + null) + + assertEquals("14", streams[StreamType.exitValue.multiplexName]) + assertEquals("O123456789", streams[StreamType.stdout.multiplexName]) + } - agentsServiceMock.verify(agentsService) - - assertNull("command is completed", service._currentCommandExecutions[cid]) - dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertFalse(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertEquals(completionTime, dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertNull(dbCommandExecution.stdinFirstBytes) - assertNull(dbCommandExecution.stdinTotalBytesCount) - assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) - assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) - assertEquals("E1234", new String(dbCommandExecution.stderrFirstBytes)) - assertEquals(10, dbCommandExecution.stderrTotalBytesCount) - assertEquals("14", dbCommandExecution.exitValue) - assertFalse(dbCommandExecution.isExecuting) - assertNull(dbCommandExecution.exitError) + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertFalse(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) + assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) + assertEquals("E1234", new String(dbCommandExecution.stderrFirstBytes)) + assertEquals(10, dbCommandExecution.stderrTotalBytesCount) + assertEquals("14", dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + assertNull(dbCommandExecution.exitError) + } } } @@ -228,160 +263,162 @@ public class TestCommandsServiceImpl extends GroovyTestCase */ public void testStdinAndRedirectStderr() { - def tc = new ThreadControl(Timespan.parse('30s')) + withStorage { + def tc = new ThreadControl(Timespan.parse('30s')) - def f1 = new Fabric(name: "f1") + def f1 = new Fabric(name: "f1") - withAuthorizationService { + withAuthorizationService { - def agentsServiceMock = new MockFor(AgentsService) + def agentsServiceMock = new MockFor(AgentsService) - def commandId = null + def commandId = null - // executeShellCommand - agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> - assertEquals(f1, fabric) - assertEquals("a1", agentName) + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) - def copyOfArgs = [*:args] - commandId = copyOfArgs.remove("id") - copyOfArgs.stdin = args.stdin.text + def copyOfArgs = [*:args] + commandId = copyOfArgs.remove("id") + copyOfArgs.stdin = args.stdin.text - assertEqualsIgnoreType([ - command: 'uptime', - type: 'shell', - fabric: 'f1', - agent: 'a1', - username: 'u1', - redirectStderr:true, - stdin: "I123456789" - ], - copyOfArgs) + assertEqualsIgnoreType([ + command: 'uptime', + type: 'shell', + fabric: 'f1', + agent: 'a1', + username: 'u1', + redirectStderr:true, + stdin: "I123456789" + ], + copyOfArgs) - clock.addDuration(Timespan.parse("10s")) + clock.addDuration(Timespan.parse("10s")) - tc.block("executeShellCommand.end") - } + tc.block("executeShellCommand.end") + } - // stream result (this will be called asynchronously!) - agentsServiceMock.demand.streamCommandResults { Fabric fabric, - String agentName, - args, - Closure commandResultProcessor -> - assertEquals(f1, fabric) - assertEquals("a1", agentName) + // stream result (this will be called asynchronously!) + agentsServiceMock.demand.streamCommandResults { Fabric fabric, + String agentName, + args, + Closure commandResultProcessor -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) - assertEqualsIgnoreType([ - id: commandId, - exitValueStream: true, - exitValueStreamTimeout: 0, - stdoutStream:true, - username: 'u1', - ], - args) + assertEqualsIgnoreType([ + id: commandId, + exitValueStream: true, + exitValueStreamTimeout: 0, + stdoutStream:true, + username: 'u1', + ], + args) - def streams = [:] + def streams = [:] - InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) - streams[StreamType.exitValue.multiplexName] = exitValueInputStream + InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) + streams[StreamType.exitValue.multiplexName] = exitValueInputStream - streams[StreamType.stdout.multiplexName] = new ByteArrayInputStream("O123456789".bytes) + streams[StreamType.stdout.multiplexName] = new ByteArrayInputStream("O123456789".bytes) - commandResultProcessor([stream: new MultiplexedInputStream(streams)]) - } + commandResultProcessor([stream: new MultiplexedInputStream(streams)]) + } - AgentsService agentsService = agentsServiceMock.proxyInstance() - service.agentsService = agentsService - - long startTime = clock.currentTimeMillis() - - // execute the shell command - String cid = service.executeShellCommand(f1, - "a1", - [ - command: "uptime", - stdin: new ByteArrayInputStream("I123456789".bytes), - redirectStderr: true - ]) - - // we wait until the shell command is "executing" - tc.waitForBlock("executeShellCommand.end") - - long completionTime = clock.currentTimeMillis() - - // we make sure that we got the same commandId - assertEquals(cid, commandId) - - CommandExecution ce = service._currentCommandExecutions[cid] - assertTrue(ce.command.isExecuting) - - def dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertTrue(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertNull(dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) - assertEquals(10, dbCommandExecution.stdinTotalBytesCount) - assertNull(dbCommandExecution.stdoutFirstBytes) - assertNull(dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertTrue(dbCommandExecution.isExecuting) - - // test bulk methods - def ces = service.findCommandExecutions(f1, null, null) - assertEquals(1, ces.count) - assertEquals(1, ces.commandExecutions.size()) - assertTrue(ces.commandExecutions[0].isExecuting) - - // we can now unblock the call - tc.unblock("executeShellCommand.end") - - // get the results - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - stdoutStream: true, - exitValueStream: true, - exitValueStreamTimeout: 0 - ]) { args -> - - def streams = - MultiplexedInputStream.demultiplexToString(args.stream, - [ - StreamType.exitValue.multiplexName, - StreamType.stdout.multiplexName - ] as Set, - null) - - assertEquals("14", streams[StreamType.exitValue.multiplexName]) - assertEquals("O123456789", streams[StreamType.stdout.multiplexName]) - } + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + stdin: new ByteArrayInputStream("I123456789".bytes), + redirectStderr: true + ]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + // we make sure that we got the same commandId + assertEquals(cid, commandId) + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + // we can now unblock the call + tc.unblock("executeShellCommand.end") + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + stdoutStream: true, + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + def streams = + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.exitValue.multiplexName, + StreamType.stdout.multiplexName + ] as Set, + null) + + assertEquals("14", streams[StreamType.exitValue.multiplexName]) + assertEquals("O123456789", streams[StreamType.stdout.multiplexName]) + } - agentsServiceMock.verify(agentsService) - - assertNull("command is completed", service._currentCommandExecutions[cid]) - dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertTrue(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertEquals(completionTime, dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) - assertEquals(10, dbCommandExecution.stdinTotalBytesCount) - assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) - assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertEquals("14", dbCommandExecution.exitValue) - assertFalse(dbCommandExecution.isExecuting) + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) + assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertEquals("14", dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + } } } @@ -391,115 +428,117 @@ public class TestCommandsServiceImpl extends GroovyTestCase */ public void testWithFailureInCommandExecution() { - def tc = new ThreadControl(Timespan.parse('30s')) - - def f1 = new Fabric(name: "f1") + withStorage { + def tc = new ThreadControl(Timespan.parse('30s')) - withAuthorizationService { + def f1 = new Fabric(name: "f1") - def agentsServiceMock = new MockFor(AgentsService) + withAuthorizationService { - // executeShellCommand - agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> - clock.addDuration(Timespan.parse("10s")) - tc.blockWithException("executeShellCommand.end") - } + def agentsServiceMock = new MockFor(AgentsService) - AgentsService agentsService = agentsServiceMock.proxyInstance() - service.agentsService = agentsService - - long startTime = clock.currentTimeMillis() - - // execute the shell command - String cid = service.executeShellCommand(f1, - "a1", - [ - command: "uptime", - stdin: new ByteArrayInputStream("I123456789".bytes), - redirectStderr: true - ]) - - // we wait until the shell command is "executing" - tc.waitForBlock("executeShellCommand.end") - - long completionTime = clock.currentTimeMillis() - - CommandExecution ce = service._currentCommandExecutions[cid] - assertTrue(ce.command.isExecuting) - - def dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertTrue(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertNull(dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) - assertEquals(10, dbCommandExecution.stdinTotalBytesCount) - assertNull(dbCommandExecution.stdoutFirstBytes) - assertNull(dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertTrue(dbCommandExecution.isExecuting) - assertNull(dbCommandExecution.exitError) - - // test bulk methods - def ces = service.findCommandExecutions(f1, null, null) - assertEquals(1, ces.count) - assertEquals(1, ces.commandExecutions.size()) - assertTrue(ces.commandExecutions[0].isExecuting) - - def exception = new Exception("failure => agentsService.executeShellCommand") - - // we can now unblock the call - tc.unblock("executeShellCommand.end", exception) - - // get the results - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - exitValueStream: true, - exitValueStreamTimeout: 0 - ]) { args -> - - try - { - MultiplexedInputStream.demultiplexToString(args.stream, - [ - StreamType.exitValue.multiplexName, - ] as Set, - null) - fail("should have failed!") + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + clock.addDuration(Timespan.parse("10s")) + tc.blockWithException("executeShellCommand.end") } - catch(IOException e) - { - assertEquals(exception, e.cause) + + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + stdin: new ByteArrayInputStream("I123456789".bytes), + redirectStderr: true + ]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + assertNull(dbCommandExecution.exitError) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + def exception = new Exception("failure => agentsService.executeShellCommand") + + // we can now unblock the call + tc.unblock("executeShellCommand.end", exception) + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + try + { + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.exitValue.multiplexName, + ] as Set, + null) + fail("should have failed!") + } + catch(IOException e) + { + assertEquals(exception, e.cause) + } } - } - agentsServiceMock.verify(agentsService) - - assertNull("command is completed", service._currentCommandExecutions[cid]) - dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertTrue(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertEquals(completionTime, dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) - assertEquals(10, dbCommandExecution.stdinTotalBytesCount) - assertNull(dbCommandExecution.stdoutFirstBytes) - assertNull(dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertNull(dbCommandExecution.exitValue) - assertFalse(dbCommandExecution.isExecuting) - assertEquals(GluGroovyJsonUtils.exceptionToJSON(exception), dbCommandExecution.exitError) + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertNull(dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + assertEquals(GluGroovyJsonUtils.exceptionToJSON(exception), dbCommandExecution.exitError) + } } } @@ -508,119 +547,121 @@ public class TestCommandsServiceImpl extends GroovyTestCase */ public void testInterrupt() { - def tc = new ThreadControl(Timespan.parse('30s')) + withStorage { + def tc = new ThreadControl(Timespan.parse('30s')) - def f1 = new Fabric(name: "f1") + def f1 = new Fabric(name: "f1") - withAuthorizationService { + withAuthorizationService { - final def agentsServiceMock = new MockFor(AgentsService) + final def agentsServiceMock = new MockFor(AgentsService) - final def lock = new Object() + final def lock = new Object() - def simulateBlockingCommand = { - synchronized(lock) - { - tc.blockWithException("simulateBlockingCommand.start") - lock.wait() + def simulateBlockingCommand = { + synchronized(lock) + { + tc.blockWithException("simulateBlockingCommand.start") + lock.wait() + } } - } - def future = new FutureTaskExecution(simulateBlockingCommand) - future.clock = clock + def future = new FutureTaskExecution(simulateBlockingCommand) + future.clock = clock - // executeShellCommand - agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> - clock.addDuration(Timespan.parse("10s")) - future.runSync() - } + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + clock.addDuration(Timespan.parse("10s")) + future.runSync() + } - // interrupt command - agentsServiceMock.demand.interruptCommand { fabric, agentName, args -> - future.cancel(true) - } + // interrupt command + agentsServiceMock.demand.interruptCommand { fabric, agentName, args -> + future.cancel(true) + } - AgentsService agentsService = agentsServiceMock.proxyInstance() - service.agentsService = agentsService + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService - long startTime = clock.currentTimeMillis() + long startTime = clock.currentTimeMillis() - // execute the shell command - String cid = service.executeShellCommand(f1, - "a1", - [ - command: "uptime", - redirectStderr: true - ]) + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + redirectStderr: true + ]) - // we wait until the shell command is "executing" - tc.waitForBlock("simulateBlockingCommand.start") + // we wait until the shell command is "executing" + tc.waitForBlock("simulateBlockingCommand.start") - long completionTime = clock.currentTimeMillis() + long completionTime = clock.currentTimeMillis() - // get the results - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - exitValueStream: true, - exitValueStreamTimeout: 0 - ]) { args -> + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> - // at this stage the exit value stream is blocking: reading the stream will block - // and throw an exception => interrupting the process - assertTrue(service.interruptCommand(f1, "a1", cid)) + // at this stage the exit value stream is blocking: reading the stream will block + // and throw an exception => interrupting the process + assertTrue(service.interruptCommand(f1, "a1", cid)) - shouldFailWithCause(CancellationException) { - NullOutputStream.INSTANCE << args.stream + shouldFailWithCause(CancellationException) { + NullOutputStream.INSTANCE << args.stream + } } - } - agentsServiceMock.verify(agentsService) - - CommandExecution ce = ioStorage.findCommandExecution(cid) - assertTrue(ce.isCompleted()) - - assertNull("command is completed", service._currentCommandExecutions[cid]) - DbCommandExecution dbCommandExecution = service.findCommandExecution(f1, cid) - assertEquals(cid, dbCommandExecution.commandId) - assertEquals("uptime", dbCommandExecution.command) - assertTrue(dbCommandExecution.redirectStderr) - assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) - assertEquals(startTime, dbCommandExecution.startTime) - assertEquals(completionTime, dbCommandExecution.completionTime) - assertEquals(f1.name, dbCommandExecution.fabric) - assertEquals("a1", dbCommandExecution.agent) - assertNull(dbCommandExecution.stdinFirstBytes) - assertNull(dbCommandExecution.stdinTotalBytesCount) - assertNull(dbCommandExecution.stdoutFirstBytes) - assertNull(dbCommandExecution.stdoutTotalBytesCount) - assertNull(dbCommandExecution.stderrFirstBytes) - assertNull(dbCommandExecution.stderrTotalBytesCount) - assertNull(dbCommandExecution.exitValue) - assertEquals(GluGroovyJsonUtils.exceptionToJSON(ce.completionValue), dbCommandExecution.exitError) - assertFalse(dbCommandExecution.isExecuting) - - // now that the command is complete... - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - exitValueStream: true - ]) { args -> - dbCommandExecution = args.commandExecution + agentsServiceMock.verify(agentsService) + + CommandExecution ce = ioStorage.findCommandExecution(cid) + assertTrue(ce.isCompleted()) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + DbCommandExecution dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertNull(dbCommandExecution.exitValue) assertEquals(GluGroovyJsonUtils.exceptionToJSON(ce.completionValue), dbCommandExecution.exitError) + assertFalse(dbCommandExecution.isExecuting) + + // now that the command is complete... + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true + ]) { args -> + dbCommandExecution = args.commandExecution + assertEquals(GluGroovyJsonUtils.exceptionToJSON(ce.completionValue), dbCommandExecution.exitError) + + // now that the execution is complete... there is no exit value so it should not fail + assertEquals("", args.stream.text) + } - // now that the execution is complete... there is no exit value so it should not fail - assertEquals("", args.stream.text) - } - - // now that the command is complete... - service.withCommandExecutionAndWithOrWithoutStreams(f1, - cid, - [ - exitErrorStream: true - ]) { args -> - // now that the execution is complete... we should get the error stream - assertEquals(GluGroovyJsonUtils.exceptionToJSON(ce.completionValue), args.stream.text) + // now that the command is complete... + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitErrorStream: true + ]) { args -> + // now that the execution is complete... we should get the error stream + assertEquals(GluGroovyJsonUtils.exceptionToJSON(ce.completionValue), args.stream.text) + } } } } diff --git a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/test/GluGroovyTestUtils.groovy b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/test/GluGroovyTestUtils.groovy new file mode 100644 index 00000000..4ebdea76 --- /dev/null +++ b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/test/GluGroovyTestUtils.groovy @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2012 Yan Pujante + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.linkedin.glu.groovy.utils.test + +import org.linkedin.groovy.util.json.JsonUtils +import org.linkedin.groovy.util.collections.IgnoreTypeComparator + +/** + * @author yan@pongasoft.com */ +public class GluGroovyTestUtils +{ + /** + * The main issue when comparing 2 maps is that if the type of the map is different then the 2 + * maps are different even if their content is the same... this method simply compares the + * content of the maps. + */ + static void assertEqualsIgnoreType(def testCase, String message = null, Map map1, Map map2) + { + checkAndFail(testCase, message, map1, map2) { + testCase.assertEquals(JsonUtils.prettyPrint(map1), JsonUtils.prettyPrint(map2)) + if(map1 == null) + testCase.assertNull("map1 is null, map2 is not", map2) + + testCase.assertEquals("map size differ ${map1.size()} != ${map2?.size()}", map1.size(), map2?.size()) + + map1.each { k, v -> + testCase.assertTrue("map2 does not contain key ${k}", map2.containsKey(k)) + assertEqualsIgnoreType(testCase, "map value for key ${k} mismatch", v, map2[k]) + } + } + } + + /** + * The main issue when comparing 2 lists is that if the type of the list is different + * then the 2 lists are different even if their content is the same... this method simply + * compares the content of the lists + */ + static void assertEqualsIgnoreType(def testCase, String message = null, List list1, List list2) + { + checkAndFail(testCase, message, list1, list2) { + testCase.assertEquals(JsonUtils.prettyPrint(list1), JsonUtils.prettyPrint(list2)) + if(list1 == null) + testCase.assertNull("list1 is null, list2 is not", list2) + + testCase.assertEquals("list size differ ${list1.size()} != ${list2?.size()}", list1.size(), list2?.size()) + + def iterator = list2.iterator() + + list1.eachWithIndex { e, idx -> + assertEqualsIgnoreType(testCase, "list index ${idx} mismatch", e, iterator.next()) + } + } + } + + /** + * The main issue when comparing 2 sets is that if the type of the list is different + * then the 2 sets are different even if their content is the same... this method simply + * compares the content of the sets + */ + static void assertEqualsIgnoreType(def testCase, String message = null, Set set1, Set set2) + { + checkAndFail(testCase, message, set1, set2) { + testCase.assertEquals(JsonUtils.prettyPrint(set1), JsonUtils.prettyPrint(set2)) + if(set1 == null) + testCase.assertNull("set1 is null, set2 is not", set2) + + testCase.assertEquals("set size differ ${set1.size()} != ${set2?.size()}", set1.size(), set2?.size()) + + assertEqualsIgnoreType(testCase, + message, + set1.sort(IgnoreTypeComparator.INSTANCE), + set2.sort(IgnoreTypeComparator.INSTANCE)) + } + } + + /** + * The main issue when comparing 2 collections is that if the type of the collection is different + * then the 2 collections are different even if their content is the same... this method simply + * compares the content of the collections + */ + static void assertEqualsIgnoreType(def testCase, String message = null, Collection c1, Collection c2) + { + assertEqualsIgnoreType(testCase, message, c1?.asList(), c2?.asList()) + } + + /** + * This method is being used for recursing purposes + */ + static void assertEqualsIgnoreType(def testCase, String message = null, Object o1, Object o2) + { + testCase.assertEquals(message, o1, o2) + } + + static void checkAndFail(def testCase, String message, Object o1, Object o2, Closure closure) + { + try + { + closure() + } + catch(Throwable th) + { + if(message) + { + try + { + testCase.fail(message) + } + catch(Throwable failureError) + { + failureError.initCause(th) + throw failureError + } + } + throw th + } + } +} \ No newline at end of file