Skip to content

Commit

Permalink
#166: added interrupt handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Nov 8, 2012
1 parent ecb8396 commit b722a82
Show file tree
Hide file tree
Showing 17 changed files with 454 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ public interface Agent
*/
def streamCommandResults(def args) throws NoSuchCommandException, AgentException

/**
* Interrupts the command.
*
* @param args.id the id of the command to interrupt
* @return <code>true</code> if the command was interrupted properly or <code>false</code> if
* there was no such command or already completed
*/
boolean interruptCommand(args) throws AgentException

/********************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public class CommandManagerImpl implements CommandManager

try
{
def res = commandExecution.waitForCompletion(args.timeout)
def res = commandExecution.getExitValue(args.timeout)
commandExecution.log.info("waitForCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}")
return res
}
Expand Down Expand Up @@ -173,7 +173,7 @@ public class CommandManagerImpl implements CommandManager
if(commandExecution)
{
res = commandExecution.interruptExecution()
commandExecution.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])})")
commandExecution.log.info("interruptCommand(${GluGroovyCollectionUtils.xorMap(args, ['id'])}): ${res}")
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService
import org.linkedin.glu.groovy.utils.concurrent.FutureTaskExecution
import org.slf4j.Logger
import org.linkedin.groovy.util.config.Config
import java.util.concurrent.TimeoutException

/**
* @author yan@pongasoft.com */
Expand Down Expand Up @@ -78,15 +79,18 @@ public class CommandExecution<T>
completionTime > 0
}

def waitForCompletion(def timeout)
void waitForCompletion(def timeout)
{
try
if(!isCompleted())
{
futureExecution.get(timeout)
}
catch(ExecutionException e)
{
throw e.cause
try
{
futureExecution.get(timeout)
}
catch(ExecutionException e)
{
// ok we just want to wait until the command completes but no more than the timeout
}
}
}

Expand All @@ -98,25 +102,67 @@ public class CommandExecution<T>
def getExitValue()
{
if(isCompleted())
{
try
{
futureExecution.get()
}
catch(ExecutionException e)
{
throw e.cause
}
}
else
return null
}

def getExitValue(timeout)
{
try
{
futureExecution.get()
futureExecution.get(timeout)
}
catch(ExecutionException e)
{
throw e.cause
}
}

def getExitValue(timeout)
/**
* Completion value either return the result of the call if succeeded or the exception
* if an exception was thrown. Does not throw an exception! Does not wait!
*/
def getCompletionValue()
{
if(isCompleted())
{
try
{
futureExecution.get()
}
catch(ExecutionException e)
{
e.cause
}
}
else
return null
}

/**
* Completion value either return the result of the call if succeeded or the exception
* if an exception was thrown. Throws only the <code>TimeoutException</code> if cannot get
* a result in the timeout provided
*/
def getCompletionValue(timeout) throws TimeoutException
{
try
{
futureExecution.get(timeout)
}
catch(ExecutionException e)
{
throw e.cause
e.cause
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,14 @@ interface AgentsService
args,
Closure commandResultProcessor)

/**
* Interrupts the command.
*
* @param args.id the id of the command to interrupt
* @return <code>true</code> if the command was interrupted properly or <code>false</code> if
* there was no such command or already completed
*/
boolean interruptCommand(Fabric fabric,
String agentName,
args)
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ class AgentsServiceImpl implements AgentsService, AgentURIProvider
}
}

@Override
boolean interruptCommand(Fabric fabric, String agentName, def args)
{
withRemoteAgent(fabric, agentName) { Agent agent ->
agent.interruptCommand(args)
} as boolean
}

/**
* Create the system entry for the given agent and mountpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,11 @@ public class AuditedAgentsService implements AgentsService
res?.id?.toString())
}
}

@Override
boolean interruptCommand(Fabric fabric, String agentName, def args)
{
auditLogService.audit('agent.interruptCommand', "${agentName} / ${fabric.name} / ${args.id}")
agentsService.interruptCommand(fabric, agentName, args)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public interface CommandExecutionStorage
Long stderrTotalBytesCount,
String exitValue)

DbCommandExecution endExecution(String commandId,
long endTime,
byte[] stdoutFirstBytes,
Long stdoutTotalBytesCount,
byte[] stderrFirstBytes,
Long stderrTotalBytesCount,
Throwable exception)

DbCommandExecution findCommandExecution(String fabric, String commandId)

Map findCommandExecutions(String fabric, String agent, def params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.linkedin.util.annotations.Initializable
import org.linkedin.glu.groovy.utils.collections.GluGroovyCollectionUtils
import org.linkedin.util.lang.MemorySize
import org.linkedin.glu.utils.io.LimitedOutputStream

/**
* @author yan@pongasoft.com */
Expand All @@ -32,6 +34,9 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
@Initializable
int maxResults = 25

@Initializable
MemorySize stackTraceMaxSize = MemorySize.parse('255')

@Override
DbCommandExecution startExecution(String fabric,
String agent,
Expand Down Expand Up @@ -71,6 +76,25 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
byte[] stderrFirstBytes,
Long stderrTotalBytesCount,
String exitValue)
{
endExecution(commandId,
endTime,
stdoutFirstBytes,
stdoutTotalBytesCount,
stderrFirstBytes,
stderrTotalBytesCount,
exitValue,
false)
}

DbCommandExecution endExecution(String commandId,
long endTime,
byte[] stdoutFirstBytes,
Long stdoutTotalBytesCount,
byte[] stderrFirstBytes,
Long stderrTotalBytesCount,
String exitValue,
boolean isException)
{
DbCommandExecution.withTransaction {
DbCommandExecution execution = DbCommandExecution.findByCommandId(commandId)
Expand All @@ -86,6 +110,7 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
execution.stderrFirstBytes = stderrFirstBytes
execution.stderrTotalBytesCount = stderrTotalBytesCount
execution.exitValue = exitValue
execution.isException = isException

if(!execution.save())
{
Expand All @@ -96,6 +121,29 @@ public class CommandExecutionStorageImpl implements CommandExecutionStorage
}
}

@Override
DbCommandExecution endExecution(String commandId,
long endTime,
byte[] stdoutFirstBytes,
Long stdoutTotalBytesCount,
byte[] stderrFirstBytes,
Long stderrTotalBytesCount,
Throwable exception)
{
def baos = new ByteArrayOutputStream()

def os = new PrintStream(new LimitedOutputStream(baos, stackTraceMaxSize))
os.withStream { exception.printStackTrace(it) }
endExecution(commandId,
endTime,
stdoutFirstBytes,
stdoutTotalBytesCount,
stderrFirstBytes,
stderrTotalBytesCount,
new String(baos.toByteArray()),
true)
}

@Override
DbCommandExecution findCommandExecution(String fabric, String commandId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ public interface CommandsService
DbCommandExecution findCommandExecution(Fabric fabric, String commandId)

Map findCommandExecutions(Fabric fabric, String agentName, def params)

/**
* Interrupts the command.
*
* @param args.id the id of the command to interrupt
* @return <code>true</code> if the command was interrupted properly or <code>false</code> if
* there was no such command or already completed
*/
boolean interruptCommand(Fabric fabric,
String agentName,
String commandId)
}
Loading

0 comments on commit b722a82

Please sign in to comment.