Skip to content

Commit

Permalink
pongasoft#166: refactoring to share code console/agent
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Oct 31, 2012
1 parent 1e49c13 commit c49d303
Show file tree
Hide file tree
Showing 29 changed files with 356 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,30 +612,6 @@ def interface Shell
OutputStream stdout,
OutputStream stderr)

/**
* Runs the closure asynchronously. This call returns right away and does not wait for the closure
* to complete execution. Use the returned value to wait on the completion if necessary. Note
* that this call should be used carefully and the typical use case is to parallelize a couple of
* I/O intensive operations. But you should "wait" before the end of the phase. Example:
*
* def install = {
* def f1 = shell.async {
* skeleton = shell.fetch(params.skeleton)
* }
* def f2 = shell.async {
* war = shell.fetch(params.war)
* }
*
* // wait for both operation to complete!
* f1.get()
* f2.get()
* }
*
* @param closure
* @return the future
*/
FutureExecution async(Closure closure)

/**
* Shortcut/More efficient implementation of the more generic {@link #chmod(Object, Object)} call
*
Expand Down
1 change: 1 addition & 0 deletions agent/org.linkedin.glu.agent-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ apply plugin: 'org.linkedin.release'
dependencies {
compile project(':agent:org.linkedin.glu.agent-api')
compile project(':agent:org.linkedin.glu.agent-rest-common')
compile project(':commands:org.linkedin.glu.commands-impl')
compile project(':utils:org.linkedin.glu.utils')
compile spec.external.linkedinUtilsGroovy
compile spec.external.linkedinZookeeperImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import java.util.concurrent.ExecutionException
import org.linkedin.glu.agent.impl.command.CommandManager
import org.linkedin.glu.agent.impl.command.CommandManagerImpl
import org.linkedin.glu.agent.api.NoSuchCommandException
import org.linkedin.glu.agent.impl.command.CommandNode
import org.linkedin.glu.agent.impl.command.MemoryCommandExecutionIOStorage
import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage
import org.linkedin.glu.commands.impl.CommandExecution

/**
* The main implementation of the agent
Expand Down Expand Up @@ -91,7 +91,7 @@ def class AgentImpl implements Agent, AgentContext, Shutdownable
_scriptManager = args.scriptManager ?: new ScriptManagerImpl(agentContext: this)
_commandManager = args.commandManager ?:
new CommandManagerImpl(agentContext: this,
storage: new MemoryCommandExecutionIOStorage(agentContext: this))
storage: new MemoryCommandExecutionIOStorage(clock: clock))
def storage = args.storage
if(storage != null)
_scriptManager = new StateKeeperScriptManager(scriptManager: _scriptManager,
Expand Down Expand Up @@ -516,17 +516,17 @@ def class AgentImpl implements Agent, AgentContext, Shutdownable
if(log.isDebugEnabled())
log.debug("streamCommandResults(${args})")

def res = _commandManager.findCommandNodeAndStreams(args)
def res = _commandManager.findCommandExecutionAndStreams(args)
if(res == null)
throw new NoSuchCommandException(args.id)

CommandNode commandNode = res.remove('commandNode')
CommandExecution commandExecution = res.remove('commandExecution')

if(commandNode.startTime > 0)
res.startTime = commandNode.startTime
if(commandExecution.startTime > 0)
res.startTime = commandExecution.startTime

if(commandNode.isCompleted())
res.completionTime = commandNode.completionTime
if(commandExecution.isCompleted())
res.completionTime = commandExecution.completionTime

return res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ import org.linkedin.util.io.resource.Resource
import javax.management.Attribute
import org.linkedin.glu.agent.impl.storage.AgentProperties
import org.linkedin.util.url.QueryBuilder
import org.linkedin.glu.agent.impl.script.FutureExecutionImpl
import org.linkedin.glu.agent.api.FutureExecution
import org.linkedin.groovy.util.rest.RestException
import org.linkedin.util.reflect.ReflectUtils
import org.linkedin.glu.agent.rest.common.AgentRestUtils
Expand Down Expand Up @@ -626,33 +624,6 @@ def class ShellImpl implements Shell
return originalException
}

/**
* Runs the closure asynchronously. This call returns right away and does not wait for the closure
* to complete execution. Use the returned value to wait on the completion if necessary.
*
* @param closure
* @return the future
*/
FutureExecution async(Closure closure)
{
FutureExecutionImpl future = new FutureExecutionImpl(closure)

future.startTime = clock.currentTimeMillis()

Thread.startDaemon { ->
try
{
future.run()
}
finally
{
future.completionTime = clock.currentTimeMillis()
}
}

return future
}

/**
* Shortcut/More efficient implementation of the more generic {@link #chmod(Object, Object) call
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
package org.linkedin.glu.agent.impl.command

import org.linkedin.glu.agent.api.AgentException
import org.linkedin.glu.commands.impl.CommandExecution

/**
* @author yan@pongasoft.com */
public interface CommandManager
{
/**
* TODO HIGH YP
* {@see org.linkedin.glu.agent.api.Agent#executeShellCommand} for a description of the arguments
*/
CommandNode executeShellCommand(def args) throws AgentException
CommandExecution executeShellCommand(def args) throws AgentException

/**
* @param args.exitValue if you want the exit value to be part of the stream
Expand All @@ -52,15 +53,18 @@ public interface CommandManager
* <code>0</code> by default)
* @param args.stderrLen how many bytes to read maximum (optional, <code>int</code>,
* <code>-1</code> by default which means read all)
* @return a map with <code>commandNode</code> and <code>stream</code> or <code>null</code> if
* @return a map with <code>commandExecution</code> and <code>stream</code> or <code>null</code> if
* not found
*/
def findCommandNodeAndStreams(def args) throws AgentException
def findCommandExecutionAndStreams(def args) throws AgentException

/**
* TODO HIGH YP
* {@see org.linkedin.glu.agent.api.Agent#waitForCommand} for a description of the arguments
*/
def waitForCommand(def args) throws AgentException

/**
* {@see org.linkedin.glu.agent.api.Agent#interruptCommand} for a description of the arguments
*/
boolean interruptCommand(def args) throws AgentException
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.linkedin.glu.agent.impl.command

import org.linkedin.glu.agent.impl.script.AgentContext
import org.linkedin.glu.agent.impl.script.CallExecution
import org.linkedin.glu.groovy.utils.concurrent.CallExecution
import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand All @@ -27,10 +27,14 @@ import java.util.concurrent.Executors
import org.linkedin.util.annotations.Initializable
import org.linkedin.glu.agent.api.NoSuchCommandException
import org.linkedin.glu.agent.api.AgentException
import org.linkedin.glu.agent.impl.concurrent.FutureTaskExecution
import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution
import org.apache.commons.io.input.TeeInputStream
import java.util.concurrent.TimeoutException
import org.linkedin.groovy.util.config.Config
import org.linkedin.glu.commands.impl.CommandStreamStorage
import org.linkedin.glu.commands.impl.CommandExecutionIOStorage
import org.linkedin.glu.commands.impl.CommandExecution
import org.linkedin.glu.commands.impl.GluCommandFactory

/**
* @author yan@pongasoft.com */
Expand All @@ -48,19 +52,24 @@ public class CommandManagerImpl implements CommandManager
@Initializable(required = true)
CommandExecutionIOStorage storage

private final Map<String, CommandNode> _commands = [:]
private final Map<String, CommandExecution> _commands = [:]

/**
void setStorage(CommandExecutionIOStorage storage)
{
storage.gluCommandFactory = createGluCommand as GluCommandFactory
this.storage = storage
}
/**
* {@inheritdoc}
*/
@Override
CommandNode executeShellCommand(def args)
CommandExecution executeShellCommand(def args)
{
args = GluGroovyCollectionUtils.subMap(args, ['command', 'redirectStderr', 'stdin'])

args.type = 'shell'

CommandNodeWithStorage command = storage.createStorageForCommand(args)
CommandExecution command = storage.createStorageForCommandExecution(args)

def asyncProcessing = {
command.captureIO { CommandStreamStorage storage ->
Expand Down Expand Up @@ -129,30 +138,30 @@ public class CommandManagerImpl implements CommandManager
@Override
def waitForCommand(def args) throws AgentException
{
CommandNode node = getCommand(args.id)
CommandExecution commandExecution = getCommand(args.id)

try
{
def res = node.waitForCompletion(args.timeout)
node.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}")
def res = commandExecution.waitForCompletion(args.timeout)
commandExecution.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}")
return res
}
catch(TimeoutException e)
{
node.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): <timeout>")
commandExecution.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): <timeout>")
throw e
}
}

@Override
def findCommandNodeAndStreams(def args)
def findCommandExecutionAndStreams(def args)
{
CommandNode node = findCommand(args.id)
if(node)
node.log.info("findCommandNodeAndStreams(${GluGroovyCollectionUtils.xorMap(args, ['id'])})")
CommandExecution commandExecution = findCommand(args.id)
if(commandExecution)
commandExecution.log.info("findCommandExecutionAndStreams(${GluGroovyCollectionUtils.xorMap(args, ['id'])})")
else
log.info("findCommandNodeAndStreams(${args})): not found")
return storage.findCommandNodeAndStreams(args.id, args)
log.info("findCommandExecutionAndStreams(${args})): not found")
return storage.findCommandExecutionAndStreams(args.id, args)
}

/**
Expand All @@ -163,11 +172,11 @@ public class CommandManagerImpl implements CommandManager
{
boolean res = false

def node = findCommand(args.id)
if(node)
def commandExecution = findCommand(args.id)
if(commandExecution)
{
res = node.interruptExecution()
node.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])})")
res = commandExecution.interruptExecution()
commandExecution.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])})")
}
else
{
Expand All @@ -177,28 +186,54 @@ public class CommandManagerImpl implements CommandManager
return res
}

CommandNode findCommand(String id)
CommandExecution findCommand(String id)
{
CommandNode commandNode
CommandExecution commandExecution

// first we look to see if the command is still currently running
synchronized(_commands)
{
commandNode = _commands[id]
commandExecution = _commands[id]
}

// not found... should be completed => look in storage
if(!commandNode)
commandNode = storage.findCommandNode(id)
if(!commandExecution)
commandExecution = storage.findCommandExecution(id)

return commandNode
return commandExecution
}

CommandNode getCommand(String id)
CommandExecution getCommand(String id)
{
CommandNode command = findCommand(id)
CommandExecution command = findCommand(id)
if(!command)
throw new NoSuchCommandException(id)
return command
}

/**
* Create the correct glu command
*/
def createGluCommand = {String commandId, def args ->
if(args.type != "shell")
throw new UnsupportedOperationException("cannot create non shell commands")

def shellCommand = new ShellGluCommand()

def commandProperties = [:]

def log = LoggerFactory.getLogger("org.linkedin.glu.agent.command.${commandId}")

commandProperties.putAll(
[
getId: { commandId },
getShell: { agentContext.shellForCommands },
getLog: { log },
getSelf: { findCommand(commandId) },
])

return agentContext.mop.wrapScript(script: shellCommand,
scriptProperties: commandProperties)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,17 @@
* the License.
*/



package org.linkedin.glu.agent.impl.command
package org.linkedin.glu.agent.impl.script

/**
* This class encapsulates the storage of IO for commands, this way it is easy to "disable" the
* storage.
* Simple adapter to "bridge" the 2 apis coming from 2 different packages
*
* @author yan@pongasoft.com */
public interface CommandExecutionIOStorage
public class FutureExecutionAdapter<K> implements
org.linkedin.glu.groovy.utils.concurrent.FutureExecution<K>,
org.linkedin.glu.agent.api.FutureExecution<K>
{
def findCommandNodeAndStreams(String commandId, def args)

/**
* @return the command node of a previously run/stored command
*/
CommandNode findCommandNode(String commandId)

CommandNodeWithStorage createStorageForCommand(def args)
}
// YP Implementation note: using FutureExecution does not work with the compile (don't ask
// me why!)
@Delegate org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution<K> futureExecution
}
Loading

0 comments on commit c49d303

Please sign in to comment.