From ecb8396a9f343b9b7b2c2cf7694847ac83cfaf25 Mon Sep 17 00:00:00 2001 From: Yan Pujante Date: Wed, 7 Nov 2012 16:24:49 -1000 Subject: [PATCH] #166: added test for CommandsServiceImpl --- .../impl/command/CommandManagerImpl.groovy | 6 +- .../impl/AbstractCommandStreamStorage.groovy | 4 +- .../commands/impl/CommandStreamStorage.groovy | 4 +- ...FileSystemCommandExecutionIOStorage.groovy | 2 +- .../MemoryCommandExecutionIOStorage.groovy | 13 +- ...FileSystemCommandExecutionIOStorage.groovy | 24 +- ...TestMemoryCommandExecutionIOStorage.groovy | 24 +- .../grails-app/conf/spring/resources.groovy | 16 +- .../controllers/CommandsController.groovy | 2 +- .../grails-app/views/commands/_command.gsp | 2 +- .../CommandExecutionStorageImpl.groovy | 7 +- .../commands/CommandExecutionStream.groovy | 73 +++ .../commands/CommandsServiceImpl.groovy | 195 +++---- .../engine/commands/DbCommandExecution.groovy | 8 +- .../MemoryCommandExecutionStorage.groovy | 142 +++++ .../commands/TestCommandsServiceImpl.groovy | 537 ++++++++++++++++++ .../GluGroovyCollectionUtils.groovy | 19 + .../utils/io/InputGeneratorStream.groovy | 52 +- .../utils/exceptions/MultipleExceptions.java | 27 +- .../glu/utils/io/MultiplexedInputStream.java | 18 +- .../TestGluGroovyCollectionUtils.groovy | 13 + 21 files changed, 993 insertions(+), 195 deletions(-) create mode 100644 orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy create mode 100644 orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy create mode 100644 orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy diff --git a/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy b/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy index c8e056d6..cfeb67d8 100644 --- a/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy +++ b/agent/org.linkedin.glu.agent-impl/src/main/groovy/org/linkedin/glu/agent/impl/command/CommandManagerImpl.groovy @@ -75,17 +75,17 @@ public class CommandManagerImpl implements CommandManager def actionArgs = [*:command.args] // handle stdin... - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> if(stdin) actionArgs.stdin = stdin // handle stdout... - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> if(stdout) actionArgs.stdout = stdout // handle stderr - storage.withStorageOutput(StreamType.STDERR) { stderr -> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr -> if(stderr) actionArgs.stderr = stderr diff --git a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/AbstractCommandStreamStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/AbstractCommandStreamStorage.groovy index d445dcb9..86c5b1b7 100644 --- a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/AbstractCommandStreamStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/AbstractCommandStreamStorage.groovy @@ -40,7 +40,7 @@ public abstract class AbstractCommandStreamStoragestream and size or null diff --git a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy index b84656e0..a1632a1c 100644 --- a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/FileSystemCommandExecutionIOStorage.groovy @@ -144,7 +144,7 @@ public class FileSystemCommandExecutionIOStorage extends AbstractCommandExecutio // save stdin if there is any if(stdin) - storage.withStorageOutput(StreamType.STDIN) { it << stdin } + storage.withOrWithoutStorageOutput(StreamType.STDIN) { it << stdin } return storage } diff --git a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/MemoryCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/MemoryCommandExecutionIOStorage.groovy index 0153cdb1..2defcc8b 100644 --- a/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/MemoryCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/main/groovy/org/linkedin/glu/commands/impl/MemoryCommandExecutionIOStorage.groovy @@ -18,7 +18,6 @@ package org.linkedin.glu.commands.impl import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.linkedin.util.annotations.Initializable import org.linkedin.glu.utils.collections.EvictingWithLRUPolicyMap import org.linkedin.util.annotations.Initializer @@ -31,14 +30,13 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS public static final String MODULE = MemoryCommandExecutionIOStorage.class.getName(); public static final Logger log = LoggerFactory.getLogger(MODULE); - @Initializable - int maxNumberOfElements = 25 + private int _maxNumberOfElements = 25 /** * Completed commands: keeps a maximum number of elements. */ Map completedCommands = - new EvictingWithLRUPolicyMap(maxNumberOfElements) + new EvictingWithLRUPolicyMap(_maxNumberOfElements) /** * The commands that are currently executing */ @@ -48,10 +46,15 @@ public class MemoryCommandExecutionIOStorage extends AbstractCommandExecutionIOS * For the compile to stop bugging me with commands being non final... */ private final Object _lock = new Object() + int getMaxNumberOfElements() + { + return _maxNumberOfElements + } + @Initializer void setMaxNumberOfElements(int maxNumberOfElements) { - this.maxNumberOfElements = maxNumberOfElements + _maxNumberOfElements = maxNumberOfElements completedCommands = new EvictingWithLRUPolicyMap(maxNumberOfElements) } diff --git a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy index 8231d0c9..d7383d5f 100644 --- a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestFileSystemCommandExecutionIOStorage.groovy @@ -112,12 +112,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase long completionTime = 0L def processing = { CommandStreamStorage storage -> - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertNull("no stdin", stdin) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout << "out0" stderr << "err0" @@ -268,12 +268,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists()) def processing = { CommandStreamStorage storage -> - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertEquals("in0", stdin.text) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout << "out0" stderr << "err0" @@ -420,12 +420,12 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists()) def processing = { CommandStreamStorage storage -> - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertNull("no stdin", stdin) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> assertNull("stderr redirected", stderr) stdout << "out0" @@ -550,11 +550,11 @@ public class TestFileSystemCommandExecutionIOStorage extends GroovyTestCase assertFalse(fs.toResource("${path}/${commandId}/${ioStorage.stderrStreamFileName}").exists()) def processing = { CommandStreamStorage storage -> - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout.write("o0".bytes) stderr.write("e0".bytes) diff --git a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestMemoryCommandExecutionIOStorage.groovy b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestMemoryCommandExecutionIOStorage.groovy index df419a81..9863523d 100644 --- a/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestMemoryCommandExecutionIOStorage.groovy +++ b/commands/org.linkedin.glu.commands-impl/src/test/groovy/test/commands/impl/TestMemoryCommandExecutionIOStorage.groovy @@ -87,12 +87,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase assertEquals(1, ioStorage.executingCommands.size()) assertTrue(ioStorage.executingCommands[ce.id].is(ce)) - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertNull("no stdin", stdin) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout << "out0" stderr << "err0" @@ -200,12 +200,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase assertEquals(1, ioStorage.executingCommands.size()) assertTrue(ioStorage.executingCommands[ce.id].is(ce)) - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertEquals("in0", stdin.text) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout << "out0" stderr << "err0" @@ -308,12 +308,12 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase assertEquals(1, ioStorage.executingCommands.size()) assertTrue(ioStorage.executingCommands[ce.id].is(ce)) - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> assertNull("no stdin", stdin) - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> assertNull("stderr redirected", stderr) stdout << "out0" @@ -402,11 +402,11 @@ public class TestMemoryCommandExecutionIOStorage extends GroovyTestCase long startTime = clock.currentTimeMillis() def processing = { CommandStreamStorage storage -> - storage.withStorageInput(StreamType.STDIN) { stdin -> + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> - storage.withStorageOutput(StreamType.STDOUT) { stdout -> + storage.withOrWithoutStorageOutput(StreamType.STDOUT) { stdout -> - storage.withStorageOutput(StreamType.STDERR) { stderr-> + storage.withOrWithoutStorageOutput(StreamType.STDERR) { stderr-> stdout.write("o0".bytes) stderr.write("e0".bytes) diff --git a/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy b/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy index f87b5d06..b09f9756 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/conf/spring/resources.groovy @@ -28,6 +28,7 @@ import org.linkedin.glu.utils.core.DisabledFeatureProxy import org.linkedin.glu.orchestration.engine.commands.CommandsService import org.linkedin.glu.orchestration.engine.commands.CommandExecutionStorageImpl import org.linkedin.glu.commands.impl.FileSystemCommandExecutionIOStorage +import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage // Place your Spring DSL code here beans = { @@ -70,18 +71,19 @@ beans = { { case Environment.DEVELOPMENT: case Environment.TEST: + commandExecutionIOStorage(MemoryCommandExecutionIOStorage) + break + + default: commandExecutionFileSystem(FileSystemImpl) { bean -> bean.factoryMethod = "createTempFileSystem" bean.destroyMethod = "destroy" } - break - default: - throw new RuntimeException("TODO") - } - - commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) { - commandExecutionFileSystem = ref("commandExecutionFileSystem") + commandExecutionIOStorage(FileSystemCommandExecutionIOStorage) { + commandExecutionFileSystem = ref("commandExecutionFileSystem") + } + break } commandsService(CommandsServiceImpl) { diff --git a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy index 65340ef8..bca8caaa 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy +++ b/console/org.linkedin.glu.console-webapp/grails-app/controllers/org/linkedin/glu/console/controllers/CommandsController.groovy @@ -69,7 +69,7 @@ public class CommandsController extends ControllerBase /** * Writes the stream requested * - * curl -v -u "glua:password" "http://localhost:8080/console/rest/v1/glu-dev-1/command/2d044e0b-a1f5-4cbd-9210-cf42c77f6e94/stdout" + * curl -v -u "glua:password" "http://localhost:8080/console/rest/v1/glu-dev-1/command/2d044e0b-a1f5-4cbd-9210-cf42c77f6e94/streams?stdoutStream=true" */ def rest_show_command_execution_streams = { try diff --git a/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp b/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp index 524f7a01..8cbca61d 100644 --- a/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp +++ b/console/org.linkedin.glu.console-webapp/grails-app/views/commands/_command.gsp @@ -15,5 +15,5 @@ --}% -
×
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# ${command.command.encodeAsHTML()} 2>&1 [] [${'$'}?=${command.exitValue?.encodeAsHTML()}]
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# []
Spinner
setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')
+
×
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# ${command.command.encodeAsHTML()} 2>&1 [] [${'$'}?=${command.exitValue?.encodeAsHTML()}]
${command.username.encodeAsHTML()}@${command.agent.encodeAsHTML()}# []
Spinner
setTimeout("renderCommand('${command.commandId}', 'shell-${command.commandId}')", '${params.refreshRate ?: 2000}')
diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy index 02e4941f..28ed656e 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStorageImpl.groovy @@ -80,7 +80,7 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage } else { - execution.endTime = endTime + execution.completionTime = endTime execution.stdoutFirstBytes = stdoutFirstBytes execution.stdoutTotalBytesCount = stdoutTotalBytesCount execution.stderrFirstBytes = stderrFirstBytes @@ -105,10 +105,9 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage @Override Map findCommandExecutions(String fabric, String agent, def params) { - params = GluGroovyCollectionUtils.subMap(params, ['offset', 'max', 'sort', 'order']) + params = GluGroovyCollectionUtils.subMap(params ?: [:], ['offset', 'max', 'sort', 'order']) - if(params.offset == null) - params.offset = 0 + params.offset = params.offset?.toInteger() ?: 0 params.max = Math.min(params.max ? params.max.toInteger() : maxResults, maxResults) params.sort = params.sort ?: 'startTime' params.order = params.order ?: 'desc' diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy new file mode 100644 index 00000000..f849fe4b --- /dev/null +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandExecutionStream.groovy @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2012 Yan Pujante + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.linkedin.glu.orchestration.engine.commands + +import org.linkedin.glu.commands.impl.StreamType +import org.linkedin.glu.commands.impl.CommandStreamStorage +import org.linkedin.glu.utils.io.LimitedOutputStream +import org.apache.tools.ant.util.TeeOutputStream +import org.linkedin.util.lang.MemorySize + +/** + * Helper class to manage stream capture + * + * @author yan@pongasoft.com */ +public class CommandExecutionStream +{ + MemorySize commandExecutionFirstBytesSize + StreamType streamType + boolean captureStream + CommandStreamStorage storage + def streams + + private ByteArrayOutputStream _firstBytesOutputStream + private LimitedOutputStream _limitedOutputStream + private OutputStream _stream + + def capture(Closure c) + { + if(captureStream) + { + _firstBytesOutputStream = + new ByteArrayOutputStream((int) commandExecutionFirstBytesSize.sizeInBytes) + _limitedOutputStream = + new LimitedOutputStream(_firstBytesOutputStream, commandExecutionFirstBytesSize) + + storage.withOrWithoutStorageOutput(streamType) { OutputStream stream -> + + if(stream) + _stream = new TeeOutputStream(stream, _limitedOutputStream) + else + _stream = _limitedOutputStream + + streams[streamType.multiplexName] = _stream + } + } + + c(this) + } + + byte[] getBytes() + { + _firstBytesOutputStream?.toByteArray() + } + + Long getTotalNumberOfBytes() + { + _limitedOutputStream?.totalNumberOfBytes + } +} diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy index 1a67a206..fe1b9606 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/CommandsServiceImpl.groovy @@ -25,7 +25,6 @@ import org.linkedin.glu.orchestration.engine.authorization.AuthorizationService import org.linkedin.glu.orchestration.engine.commands.DbCommandExecution.CommandType import org.linkedin.glu.orchestration.engine.fabric.Fabric import org.linkedin.glu.utils.io.DemultiplexedOutputStream -import org.linkedin.glu.utils.io.LimitedOutputStream import org.linkedin.glu.utils.io.NullOutputStream import org.linkedin.util.annotations.Initializable import org.linkedin.util.clock.Clock @@ -43,7 +42,7 @@ import org.linkedin.glu.commands.impl.CommandExecution import org.linkedin.glu.commands.impl.CommandExecutionIOStorage import org.linkedin.glu.commands.impl.CommandStreamStorage import org.linkedin.glu.commands.impl.GluCommandFactory -import org.apache.tools.ant.util.TeeOutputStream +import org.linkedin.groovy.util.lang.GroovyLangUtils /** * @author yan@pongasoft.com */ @@ -94,54 +93,6 @@ public class CommandsServiceImpl implements CommandsService @Initializable Timespan defaultSynchronousWaitTimeout = Timespan.parse('1s') - /** - * Helper class to manage stream capture - */ - private class CommandExecutionStream - { - StreamType streamType - boolean captureStream - CommandStreamStorage storage - def streams - - private ByteArrayOutputStream _firstBytesOutputStream - private LimitedOutputStream _limitedOutputStream - private OutputStream _stream - - def capture(Closure c) - { - if(captureStream) - { - _firstBytesOutputStream = - new ByteArrayOutputStream((int) commandExecutionFirstBytesSize.sizeInBytes) - _limitedOutputStream = - new LimitedOutputStream(_firstBytesOutputStream, commandExecutionFirstBytesSize) - - storage.withStorageOutput(streamType) { OutputStream stream -> - - if(stream) - _stream = new TeeOutputStream(stream, _limitedOutputStream) - else - _stream = _limitedOutputStream - - streams[streamType.multiplexName] = _stream - } - } - - c(this) - } - - byte[] getBytes() - { - _firstBytesOutputStream?.toByteArray() - } - - Long getTotalNumberOfBytes() - { - _limitedOutputStream?.totalNumberOfBytes - } - } - /** * The commands that are currently executing */ private final Map> _currentCommandExecutions = [:] @@ -202,14 +153,17 @@ public class CommandsServiceImpl implements CommandsService NullOutputStream.INSTANCE << res.stream } - try + if(defaultSynchronousWaitTimeout) { - // we are willing to wait a little bit for the command to complete before returning - command.getExitValue(defaultSynchronousWaitTimeout) - } - catch (TimeoutException e) - { - // it is ok... we did not get any result during this amount of time + try + { + // we are willing to wait a little bit for the command to complete before returning + command.getExitValue(defaultSynchronousWaitTimeout) + } + catch (TimeoutException e) + { + // it is ok... we did not get any result during this amount of time + } } return command.id @@ -240,71 +194,92 @@ public class CommandsServiceImpl implements CommandsService // define what will do asynchronously def asyncProcessing = { CommandStreamStorage storage -> - def agentArgs = [*:command.args] + try + { + def agentArgs = [*:command.args] - // we execute the command on the proper agent (this is an asynchronous call which return - // right away) - storage.withStorageInput(StreamType.STDIN) { stdin -> - if(stdin) - agentArgs.stdin = stdin + // we execute the command on the proper agent (this is an asynchronous call which return + // right away) + storage.withOrWithoutStorageInput(StreamType.STDIN) { stdin -> + if(stdin) + agentArgs.stdin = stdin - agentsService.executeShellCommand(fabric, agentName, agentArgs) - } - - def streamResultArgs = [ - id: command.id, - exitValueStream: true, - exitValueStreamTimeout: 0, // we block until the command completes - stdoutStream: true, - username: args.username - ] + agentsService.executeShellCommand(fabric, agentName, agentArgs) + } - if(!command.redirectStderr) - streamResultArgs.stderrStream = true + def streamResultArgs = [ + id: command.id, + exitValueStream: true, + exitValueStreamTimeout: 0, // we block until the command completes + stdoutStream: true, + username: args.username + ] - // this is a blocking call - agentsService.streamCommandResults(fabric, agentName, streamResultArgs) { res -> + if(!command.redirectStderr) + streamResultArgs.stderrStream = true - def streams = [:] + // this is a blocking call + agentsService.streamCommandResults(fabric, agentName, streamResultArgs) { res -> - // stdout - new CommandExecutionStream(streamType: StreamType.STDOUT, - captureStream: true, - storage: storage, - streams: streams).capture { stdout -> + def streams = [:] - // stderr - new CommandExecutionStream(streamType: StreamType.STDERR, - captureStream: !command.redirectStderr, + // stdout + new CommandExecutionStream(streamType: StreamType.STDOUT, + commandExecutionFirstBytesSize: commandExecutionFirstBytesSize, + captureStream: true, storage: storage, - streams: streams).capture { stderr -> - - // exitValue - ByteArrayOutputStream exitValueStream = new ByteArrayOutputStream() - streams[StreamType.EXIT_VALUE.multiplexName] = exitValueStream - - // this will demultiplex the result - DemultiplexedOutputStream dos = new DemultiplexedOutputStream(streams) - - dos.withStream { OutputStream os -> - onResultStreamAvailable(id: command.id, stream: new TeeInputStream(res.stream, os)) + streams: streams).capture { stdout -> + + // stderr + new CommandExecutionStream(streamType: StreamType.STDERR, + commandExecutionFirstBytesSize: commandExecutionFirstBytesSize, + captureStream: !command.redirectStderr, + storage: storage, + streams: streams).capture { stderr -> + + // exitValue + ByteArrayOutputStream exitValueStream = new ByteArrayOutputStream() + streams[StreamType.EXIT_VALUE.multiplexName] = exitValueStream + + // this will demultiplex the result + DemultiplexedOutputStream dos = new DemultiplexedOutputStream(streams) + + dos.withStream { OutputStream os -> + onResultStreamAvailable(id: command.id, stream: new TeeInputStream(res.stream, os)) + } + + long completionTime = clock.currentTimeMillis() + + // we now update the storage with the various results + def exitValue = commandExecutionStorage.endExecution(command.id, + completionTime, + stdout.bytes, + stdout.totalNumberOfBytes, + stderr.bytes, + stderr.totalNumberOfBytes, + toString(exitValueStream)).exitValue + + return [exitValue: exitValue, completionTime: completionTime] } - - long completionTime = clock.currentTimeMillis() - - // we now update the storage with the various results - def exitValue = commandExecutionStorage.endExecution(command.id, - completionTime, - stdout.bytes, - stdout.totalNumberOfBytes, - stderr.bytes, - stderr.totalNumberOfBytes, - toString(exitValueStream)).exitValue - - return [exitValue: exitValue, completionTime: completionTime] } } } + catch(Throwable th) + { + long completionTime = clock.currentTimeMillis() + + GroovyLangUtils.noException { + commandExecutionStorage.endExecution(command.id, + completionTime, + null, + null, + null, + null, + null) + } + + return [completionTime: completionTime, exception: th] + } } // what to do when the command ends diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy index fe9d3c6b..4f8a6efd 100644 --- a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/DbCommandExecution.groovy @@ -39,7 +39,7 @@ public class DbCommandExecution commandId(unique: true, nullable: false, blank: false) command(nullable: false, blank: false) commandType(nullable: false) - endTime(nullable: true) + completionTime(nullable: true) fabric(nullable: false, blank: false) agent(nullable: false, blank: false) username(nullable: false, blank: false) @@ -80,15 +80,15 @@ public class DbCommandExecution /** * time the command ended */ - Long endTime + Long completionTime /** * @return the duration */ Timespan getDuration() { - if(endTime) - return new Timespan(endTime - startTime) + if(completionTime) + return new Timespan(completionTime - startTime) else return null } diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy new file mode 100644 index 00000000..8022b76d --- /dev/null +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/main/groovy/org/linkedin/glu/orchestration/engine/commands/MemoryCommandExecutionStorage.groovy @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2012 Yan Pujante + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.linkedin.glu.orchestration.engine.commands + +import org.linkedin.glu.utils.collections.EvictingWithLRUPolicyMap +import org.linkedin.util.annotations.Initializable +import org.linkedin.util.annotations.Initializer +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils + +/** + * Mostly used for testing purposes... + * + * @author yan@pongasoft.com */ +public class MemoryCommandExecutionStorage implements CommandExecutionStorage +{ + public static final String MODULE = MemoryCommandExecutionStorage.class.getName(); + public static final Logger log = LoggerFactory.getLogger(MODULE); + + @Initializable + int maxResults = 25 + + private int _maxNumberOfElements = 25 + + @Initializable(required=true) + Map memory = + new EvictingWithLRUPolicyMap(_maxNumberOfElements) + + int getMaxNumberOfElements() + { + return _maxNumberOfElements + } + + @Initializer + void setMaxNumberOfElements(int maxNumberOfElements) + { + _maxNumberOfElements = maxNumberOfElements + memory = new EvictingWithLRUPolicyMap(maxNumberOfElements) + } + + @Override + synchronized DbCommandExecution startExecution(String fabric, + String agent, + String username, + String command, + boolean redirectStderr, + byte[] stdinFirstBytes, + Long stdinTotalBytesCount, + String commandId, + DbCommandExecution.CommandType commandType, + long startTime) + { + DbCommandExecution res = new DbCommandExecution(fabric: fabric, + agent: agent, + username: username, + command: command, + redirectStderr: redirectStderr, + stdinFirstBytes: stdinFirstBytes, + stdinTotalBytesCount: stdinTotalBytesCount, + commandId: commandId, + commandType: commandType, + startTime: startTime) + + memory[commandId] = res + + return res + } + + @Override + synchronized DbCommandExecution endExecution(String commandId, + long endTime, + byte[] stdoutFirstBytes, + Long stdoutTotalBytesCount, + byte[] stderrFirstBytes, + Long stderrTotalBytesCount, + String exitValue) + { + DbCommandExecution execution = memory[commandId] + if(!execution) + { + log.warn("could not find command execution ${commandId}") + } + else + { + execution.completionTime = endTime + execution.stdoutFirstBytes = stdoutFirstBytes + execution.stdoutTotalBytesCount = stdoutTotalBytesCount + execution.stderrFirstBytes = stderrFirstBytes + execution.stderrTotalBytesCount = stderrTotalBytesCount + execution.exitValue = exitValue + } + + return execution + } + + @Override + synchronized DbCommandExecution findCommandExecution(String fabric, String commandId) + { + DbCommandExecution execution = memory[commandId] + if(execution?.fabric == fabric) + return execution + else + return null + } + + @Override + synchronized Map findCommandExecutions(String fabric, String agent, def params) + { + params = GluGroovyCollectionUtils.subMap(params ?: [:], ['offset', 'max', 'sort', 'order']) + + params.offset = params.offset?.toInteger() ?: 0 + params.max = Math.min(params.max ? params.max.toInteger() : maxResults, maxResults) + params.sort = params.sort ?: 'startTime' + params.order = params.order ?: 'desc' + + def ces = memory.values().findAll { DbCommandExecution ce -> + ce.fabric == fabric && (agent == null || ce.agent == agent) + } + + def count = ces.size() + + // paginate + ces = GluGroovyCollectionUtils.paginate(ces, params.max, params.offset) + + return [commandExecutions: ces, count: count] + } +} \ No newline at end of file diff --git a/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy new file mode 100644 index 00000000..9a0b0a95 --- /dev/null +++ b/orchestration/org.linkedin.glu.orchestration-engine/src/test/groovy/test/orchestration/engine/commands/TestCommandsServiceImpl.groovy @@ -0,0 +1,537 @@ +/* + * Copyright (c) 2012 Yan Pujante + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package test.orchestration.engine.commands + +import groovy.mock.interceptor.MockFor +import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage +import org.linkedin.glu.commands.impl.StreamType +import org.linkedin.glu.groovy.utils.GluGroovyLangUtils +import org.linkedin.glu.groovy.utils.io.InputGeneratorStream +import org.linkedin.glu.orchestration.engine.agents.AgentsService +import org.linkedin.glu.orchestration.engine.authorization.AuthorizationService +import org.linkedin.glu.orchestration.engine.commands.CommandsServiceImpl +import org.linkedin.glu.orchestration.engine.commands.MemoryCommandExecutionStorage +import org.linkedin.glu.orchestration.engine.fabric.Fabric +import org.linkedin.glu.utils.io.MultiplexedInputStream +import org.linkedin.util.clock.SettableClock + +import java.util.concurrent.Executors +import org.linkedin.groovy.util.json.JsonUtils +import org.linkedin.groovy.util.collections.GroovyCollectionsUtils +import org.linkedin.util.concurrent.ThreadControl +import org.linkedin.util.clock.Timespan +import org.linkedin.glu.orchestration.engine.commands.DbCommandExecution +import org.linkedin.glu.commands.impl.CommandExecution +import org.linkedin.util.lang.MemorySize + +/** + * @author yan@pongasoft.com */ +public class TestCommandsServiceImpl extends GroovyTestCase +{ + SettableClock clock = new SettableClock() + + MemoryCommandExecutionStorage storage = new MemoryCommandExecutionStorage() + MemoryCommandExecutionIOStorage ioStorage = new MemoryCommandExecutionIOStorage(clock: clock) + + + CommandsServiceImpl service = + new CommandsServiceImpl(clock: clock, + commandExecutionStorage: storage, + commandExecutionIOStorage: ioStorage, + commandExecutionFirstBytesSize: MemorySize.parse("5"), + defaultSynchronousWaitTimeout: null) + + def shutdownSequence = [] + + @Override + protected void setUp() + { + super.setUp() + shutdownSequence << { super.tearDown() } + + clock.setCurrentTimeMillis(100000000) + + def executorService = Executors.newCachedThreadPool() + shutdownSequence << { executorService.shutdownNow() } + service.executorService = executorService + + + } + + @Override + protected void tearDown() + { + GluGroovyLangUtils.onlyOneException(shutdownSequence.reverse()) + } + + /** + * Test for the basic case + */ + public void testHappyPath() + { + def tc = new ThreadControl(Timespan.parse('30s')) + + def f1 = new Fabric(name: "f1") + + withAuthorizationService { + + def agentsServiceMock = new MockFor(AgentsService) + + def commandId = null + + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) + + def copyOfArgs = [*:args] + commandId = copyOfArgs.remove("id") + + assertEqualsIgnoreType([ + command: 'uptime', + type: 'shell', + fabric: 'f1', + agent: 'a1', + username: 'u1', + redirectStderr:false + ], + copyOfArgs) + + clock.addDuration(Timespan.parse("10s")) + + tc.block("executeShellCommand.end") + } + + // stream result (this will be called asynchronously!) + agentsServiceMock.demand.streamCommandResults { Fabric fabric, + String agentName, + args, + Closure commandResultProcessor -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) + + assertEqualsIgnoreType([ + id: commandId, + exitValueStream: true, + exitValueStreamTimeout: 0, + stdoutStream:true, + username: 'u1', + stderrStream:true + ], + args) + + def streams = [:] + + InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) + streams[StreamType.EXIT_VALUE.multiplexName] = exitValueInputStream + + streams[StreamType.STDOUT.multiplexName] = new ByteArrayInputStream("O123456789".bytes) + streams[StreamType.STDERR.multiplexName] = new ByteArrayInputStream("E123456789".bytes) + + commandResultProcessor([stream: new MultiplexedInputStream(streams)]) + } + + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, "a1", [command: "uptime"]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + // we make sure that we got the same commandId + assertEquals(cid, commandId) + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertFalse(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + // we can now unblock the call + tc.unblock("executeShellCommand.end") + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + stdoutStream: true, + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + def streams = + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.EXIT_VALUE.multiplexName, + StreamType.STDOUT.multiplexName + ] as Set, + null) + + assertEquals("14", streams[StreamType.EXIT_VALUE.multiplexName]) + assertEquals("O123456789", streams[StreamType.STDOUT.multiplexName]) + } + + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertFalse(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertNull(dbCommandExecution.stdinFirstBytes) + assertNull(dbCommandExecution.stdinTotalBytesCount) + assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) + assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) + assertEquals("E1234", new String(dbCommandExecution.stderrFirstBytes)) + assertEquals(10, dbCommandExecution.stderrTotalBytesCount) + assertEquals("14", dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + } + } + + /** + * Test with stdin and redirectStderr + */ + public void testStdinAndRedirectStderr() + { + def tc = new ThreadControl(Timespan.parse('30s')) + + def f1 = new Fabric(name: "f1") + + withAuthorizationService { + + def agentsServiceMock = new MockFor(AgentsService) + + def commandId = null + + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) + + def copyOfArgs = [*:args] + commandId = copyOfArgs.remove("id") + copyOfArgs.stdin = args.stdin.text + + assertEqualsIgnoreType([ + command: 'uptime', + type: 'shell', + fabric: 'f1', + agent: 'a1', + username: 'u1', + redirectStderr:true, + stdin: "I123456789" + ], + copyOfArgs) + + clock.addDuration(Timespan.parse("10s")) + + tc.block("executeShellCommand.end") + } + + // stream result (this will be called asynchronously!) + agentsServiceMock.demand.streamCommandResults { Fabric fabric, + String agentName, + args, + Closure commandResultProcessor -> + assertEquals(f1, fabric) + assertEquals("a1", agentName) + + assertEqualsIgnoreType([ + id: commandId, + exitValueStream: true, + exitValueStreamTimeout: 0, + stdoutStream:true, + username: 'u1', + ], + args) + + def streams = [:] + + InputStream exitValueInputStream = new InputGeneratorStream({ 14 }) + streams[StreamType.EXIT_VALUE.multiplexName] = exitValueInputStream + + streams[StreamType.STDOUT.multiplexName] = new ByteArrayInputStream("O123456789".bytes) + + commandResultProcessor([stream: new MultiplexedInputStream(streams)]) + } + + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + stdin: new ByteArrayInputStream("I123456789".bytes), + redirectStderr: true + ]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + // we make sure that we got the same commandId + assertEquals(cid, commandId) + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + // we can now unblock the call + tc.unblock("executeShellCommand.end") + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + stdoutStream: true, + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + def streams = + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.EXIT_VALUE.multiplexName, + StreamType.STDOUT.multiplexName + ] as Set, + null) + + assertEquals("14", streams[StreamType.EXIT_VALUE.multiplexName]) + assertEquals("O123456789", streams[StreamType.STDOUT.multiplexName]) + } + + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertEquals("O1234", new String(dbCommandExecution.stdoutFirstBytes)) + assertEquals(10, dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertEquals("14", dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + } + } + + /** + * Test with failure when the command execute (note that typical use case here would be + * that we cannot talk to the agent...) + */ + public void testWithFailureInCommandExecution() + { + def tc = new ThreadControl(Timespan.parse('30s')) + + def f1 = new Fabric(name: "f1") + + withAuthorizationService { + + def agentsServiceMock = new MockFor(AgentsService) + + // executeShellCommand + agentsServiceMock.demand.executeShellCommand { fabric, agentName, args -> + clock.addDuration(Timespan.parse("10s")) + tc.blockWithException("executeShellCommand.end") + } + + AgentsService agentsService = agentsServiceMock.proxyInstance() + service.agentsService = agentsService + + long startTime = clock.currentTimeMillis() + + // execute the shell command + String cid = service.executeShellCommand(f1, + "a1", + [ + command: "uptime", + stdin: new ByteArrayInputStream("I123456789".bytes), + redirectStderr: true + ]) + + // we wait until the shell command is "executing" + tc.waitForBlock("executeShellCommand.end") + + long completionTime = clock.currentTimeMillis() + + CommandExecution ce = service._currentCommandExecutions[cid] + assertTrue(ce.command.isExecuting) + + def dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertNull(dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertTrue(dbCommandExecution.isExecuting) + + // test bulk methods + def ces = service.findCommandExecutions(f1, null, null) + assertEquals(1, ces.count) + assertEquals(1, ces.commandExecutions.size()) + assertTrue(ces.commandExecutions[0].isExecuting) + + def exception = new Exception("failure => agentsService.executeShellCommand") + + // we can now unblock the call + tc.unblock("executeShellCommand.end", exception) + + // get the results + service.withCommandExecutionAndWithOrWithoutStreams(f1, + cid, + [ + exitValueStream: true, + exitValueStreamTimeout: 0 + ]) { args -> + + try + { + MultiplexedInputStream.demultiplexToString(args.stream, + [ + StreamType.EXIT_VALUE.multiplexName, + ] as Set, + null) + fail("should have failed!") + } + catch(IOException e) + { + assertEquals(exception, e.cause) + } + } + + agentsServiceMock.verify(agentsService) + + assertNull("command is completed", service._currentCommandExecutions[cid]) + dbCommandExecution = service.findCommandExecution(f1, cid) + assertEquals(cid, dbCommandExecution.commandId) + assertEquals("uptime", dbCommandExecution.command) + assertTrue(dbCommandExecution.redirectStderr) + assertEquals(DbCommandExecution.CommandType.SHELL, dbCommandExecution.commandType) + assertEquals(startTime, dbCommandExecution.startTime) + assertEquals(completionTime, dbCommandExecution.completionTime) + assertEquals(f1.name, dbCommandExecution.fabric) + assertEquals("a1", dbCommandExecution.agent) + assertEquals("I1234", new String(dbCommandExecution.stdinFirstBytes)) + assertEquals(10, dbCommandExecution.stdinTotalBytesCount) + assertNull(dbCommandExecution.stdoutFirstBytes) + assertNull(dbCommandExecution.stdoutTotalBytesCount) + assertNull(dbCommandExecution.stderrFirstBytes) + assertNull(dbCommandExecution.stderrTotalBytesCount) + assertNull(dbCommandExecution.exitValue) + assertFalse(dbCommandExecution.isExecuting) + } + } + + private void withAuthorizationService(Closure closure) + { + def authorizationServiceMock = new MockFor(AuthorizationService) + authorizationServiceMock.demand.getExecutingPrincipal { "u1" } + + AuthorizationService authorizationService = authorizationServiceMock.proxyInstance() + service.authorizationService = authorizationService + + closure() + + authorizationServiceMock.verify(authorizationService) + } + + /** + * Convenient call to compare and ignore type + */ + void assertEqualsIgnoreType(o1, o2) + { + assertEquals(JsonUtils.prettyPrint(o1), JsonUtils.prettyPrint(o2)) + assertTrue("expected <${o1}> but was <${o2}>", GroovyCollectionsUtils.compareIgnoreType(o1, o2)) + } + +} \ No newline at end of file diff --git a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/collections/GluGroovyCollectionUtils.groovy b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/collections/GluGroovyCollectionUtils.groovy index 920ffd44..bc795a99 100644 --- a/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/collections/GluGroovyCollectionUtils.groovy +++ b/utils/org.linkedin.glu.utils/src/main/groovy/org/linkedin/glu/groovy/utils/collections/GluGroovyCollectionUtils.groovy @@ -65,4 +65,23 @@ public class GluGroovyCollectionUtils extends GroovyCollectionsUtils subMap(map, newKeys) } + + /** + * Paginates a collection: return how many elements you want (which also represent the number of + * elements per "page" and an optional offset representing at which "page" to start + * + * @return the paginated collection + */ + static Collection paginate(Collection c, int max, int offset = 0) + { + if(c == null) + return null + + int firstIndex = max * offset + if(firstIndex >= c.size()) + return [] + int lastIndex = Math.min(firstIndex + max, c.size()) + + return c[firstIndex.. causes) thr */ public static void throwIfExceptions(String message, Collection causes) throws Throwable + { + Throwable throwable = createIfExceptions(message, causes); + if(throwable != null) + throw throwable; + } + + /** + * Convenient call to create a multiple exception or not depending on the collection + */ + public static Throwable createIfExceptions(Collection causes) + { + return createIfExceptions(null, causes); + } + + /** + * Convenient call to create a multiple exception or not depending on the collection + */ + public static Throwable createIfExceptions(String message, + Collection causes) { if(causes == null) - return; + return null; if(causes.isEmpty()) - return; + return null; if(causes.size() == 1) - throw causes.iterator().next(); + return causes.iterator().next(); - throw new MultipleExceptions(message, causes); + return new MultipleExceptions(message, causes); } } diff --git a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java index 952d2d77..0f6b3fe5 100644 --- a/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java +++ b/utils/org.linkedin.glu.utils/src/main/java/org/linkedin/glu/utils/io/MultiplexedInputStream.java @@ -16,6 +16,7 @@ package org.linkedin.glu.utils.io; +import org.linkedin.glu.utils.exceptions.MultipleExceptions; import org.linkedin.util.io.IOUtils; import org.linkedin.util.lang.MemorySize; import org.slf4j.Logger; @@ -322,18 +323,19 @@ public int read(byte[] b, int off, int len) throws IOException throw new IOException("closed"); // some exceptions were generated... - if(!_exceptions.isEmpty()) + Throwable th = + MultipleExceptions.createIfExceptions("Exceptions while processing some input streams", + _exceptions); + if(th != null) { - Throwable exceptionToThrow = null; - for(Throwable throwable : _exceptions) + if(th instanceof IOException) + throw (IOException) th; + else { - if(exceptionToThrow == null) - exceptionToThrow = throwable; - else - log.warn("Multiple exception detected. This one is ignored", throwable); + throw new IOException(th); } - throw new IOException(exceptionToThrow); } + // nothing else to read... reach end of all streams! if(_multiplexedBuffer.position() == 0) return -1; diff --git a/utils/org.linkedin.glu.utils/src/test/groovy/test/utils/collections/TestGluGroovyCollectionUtils.groovy b/utils/org.linkedin.glu.utils/src/test/groovy/test/utils/collections/TestGluGroovyCollectionUtils.groovy index ac5844d3..92e65eee 100644 --- a/utils/org.linkedin.glu.utils/src/test/groovy/test/utils/collections/TestGluGroovyCollectionUtils.groovy +++ b/utils/org.linkedin.glu.utils/src/test/groovy/test/utils/collections/TestGluGroovyCollectionUtils.groovy @@ -32,4 +32,17 @@ public class TestGluGroovyCollectionUtils extends GroovyTestCase assertEquals([:], GluGroovyCollectionUtils.subMap([a: 1, b: 2, c: 3], [])) assertEquals([:], GluGroovyCollectionUtils.subMap([a: 1, b: 2, c: 3], null)) } + + public void testPaginate() + { + assertNull(GluGroovyCollectionUtils.paginate(null, 2)) + assertEquals([], GluGroovyCollectionUtils.paginate([], 2)) + assertEquals([1], GluGroovyCollectionUtils.paginate([1], 2)) + assertEquals([1, 2], GluGroovyCollectionUtils.paginate([1, 2], 2)) + assertEquals([1, 2], GluGroovyCollectionUtils.paginate([1, 2, 3], 2)) + assertEquals([3], GluGroovyCollectionUtils.paginate([1, 2, 3], 2, 1)) + assertEquals([2], GluGroovyCollectionUtils.paginate([1, 2, 3], 1, 1)) + assertEquals([3], GluGroovyCollectionUtils.paginate([1, 2, 3], 1, 2)) + assertEquals([], GluGroovyCollectionUtils.paginate([1, 2, 3], 1, 3)) + } } \ No newline at end of file