Skip to content

Commit

Permalink
#166: added filesystem storage for commands in agent
Browse files Browse the repository at this point in the history
  • Loading branch information
ypujante committed Nov 7, 2012
1 parent 98199cd commit cb4b76b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import org.linkedin.glu.agent.rest.resources.CommandStreamsResource
import org.linkedin.glu.agent.impl.command.CommandManager
import org.linkedin.glu.utils.core.DisabledFeatureProxy
import org.linkedin.glu.agent.impl.command.CommandManagerImpl
import org.linkedin.glu.commands.impl.MemoryCommandExecutionIOStorage
import org.linkedin.glu.commands.impl.FileSystemCommandExecutionIOStorage

/**
* This is the main class to start the agent.
Expand Down Expand Up @@ -439,7 +439,7 @@ class AgentMain implements LifecycleListener, Configurable
[
rootShell: rootShell,
shellForScripts: createShell(rootShell, "${prefix}.agent.scriptRootDir"),
commandManager: createCommandsManager(),
commandManager: createCommandsManager(rootShell),
agentLogDir: rootShell.toResource(Config.getRequiredString(_config, "${prefix}.agent.logDir")),
storage: _storage,
sigar: _sigar,
Expand Down Expand Up @@ -617,12 +617,35 @@ class AgentMain implements LifecycleListener, Configurable
agentProperties: _agentProperties)
}

protected CommandManager createCommandsManager()
protected CommandManager createCommandsManager(ShellImpl rootShell)
{
if(Config.getOptionalBoolean(_config, "${prefix}.agent.features.commands.enabled", true))
{
log.info "Feature [commands] => [enabled]"

def storageType = Config.getOptionalString(_config,
"${prefix}.agent.commands.storageType",
"filesystem")

def storage

switch(storageType)
{
case "filesystem":
def commandsDir = Config.getRequiredString(_config,
"${prefix}.agent.commands.filesystem.dir")
def filesystem = rootShell.fileSystem.newFileSystem(commandsDir)
storage = new FileSystemCommandExecutionIOStorage(commandExecutionFileSystem: filesystem)
break

default:
throw new IllegalArgumentException("unsupported storageType [${storageType}]")
}

storage.clock = _agent.clock

new CommandManagerImpl(agentContext: _agent,
storage: new MemoryCommandExecutionIOStorage(clock: _agent.clock))
storage: storage)
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ glu.agent.homeDir=${fileSystem.toResource("/agent/server/home").file.canonicalPa
glu.agent.scriptRootDir=\${glu.agent.apps}
glu.agent.dataDir=\${glu.agent.homeDir}/data
glu.agent.commandsDir=\${glu.agent.homeDir}/commands
glu.agent.logDir=\${glu.agent.dataDir}/logs
glu.agent.tempDir=\${glu.agent.dataDir}/tmp
glu.agent.scriptStateDir=\${glu.agent.dataDir}/scripts/state
Expand All @@ -74,6 +75,10 @@ glu.agent.port=${agentPort}
glu.agent.zkConnectString=localhost:${zkClientPort}
glu.agent.zookeeper.root=/org/glu
glu.agent.features.commands.enabled=true
glu.agent.commands.storageType=filesystem
glu.agent.commands.filesystem.dir=\${glu.agent.dataDir}/commands
# security
glu.agent.sslEnabled=true
glu.agent.keystorePath=${devKeysDir.canonicalPath}/agent.keystore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ glu.agent.scriptRootDir=${glu.agent.apps}
# the directory where the data (non version specific) is stored
glu.agent.dataDir=${glu.agent.homeDir}/data

# the directory where the data (non version specific) is stored
# the directory where the logs (non version specific) is stored
glu.agent.logDir=${glu.agent.dataDir}/logs

# This is the temporary directory for the agent
Expand All @@ -37,6 +37,18 @@ glu.agent.rest.nonSecure.port=12907
# this file will store the properties used so that they become 'default' values for the next run
glu.agent.persistent.properties=${glu.agent.dataDir}/config/agent.properties

################################
# Commands configuration

# is commands feature enabled?
glu.agent.features.commands.enabled=true

# currently 1 value: filesystem
glu.agent.commands.storageType=filesystem

# the directory where the commands (non version specific) are stored (if filesystem) (if undefined => use temp folder)
glu.agent.commands.filesystem.dir=${glu.agent.dataDir}/commands

################################
# ZooKeeper configuration:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import org.linkedin.groovy.util.config.Config

/**
* @author yan@pongasoft.com */
public class CommandExecution
public class CommandExecution<T>
{
private final String _commandId
private final def _args
def command
T command

synchronized long startTime = 0L
synchronized long completionTime = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,15 @@ public class CommandsServiceImpl implements CommandsService

/**
* The commands that are currently executing */
private final Map<String, CommandExecution> _currentCommandExecutions = [:]
private final Map<String, CommandExecution<DbCommandExecution>> _currentCommandExecutions = [:]

@Override
Map<String, DbCommandExecution> findCurrentCommandExecutions(Collection<String> commandIds)
{
synchronized(_currentCommandExecutions)
{
GluGroovyCollectionUtils.subMap(_currentCommandExecutions, commandIds)
def map = GluGroovyCollectionUtils.subMap(_currentCommandExecutions, commandIds)
GluGroovyCollectionUtils.collectKey(map, [:]) { k, v -> v.command }
}
}

Expand All @@ -164,7 +165,7 @@ public class CommandsServiceImpl implements CommandsService

synchronized(_currentCommandExecutions)
{
commandExecution = _currentCommandExecutions[commandId]
commandExecution = _currentCommandExecutions[commandId]?.command
if(commandExecution?.fabric != fabric.name)
commandExecution = null
}
Expand All @@ -184,7 +185,7 @@ public class CommandsServiceImpl implements CommandsService
{
// replace db by current running
map.commandExecutions = map.commandExecutions?.collect { DbCommandExecution ce ->
def current = _currentCommandExecutions[ce.commandId]
def current = _currentCommandExecutions[ce.commandId]?.command
if(current)
return current
else
Expand Down

0 comments on commit cb4b76b

Please sign in to comment.