diff --git a/src/main/resources/web/js/console.js b/src/main/resources/web/js/console.js index 6eeeae7..865147c 100644 --- a/src/main/resources/web/js/console.js +++ b/src/main/resources/web/js/console.js @@ -29,6 +29,18 @@ jQuery(document).ready(function($) { else if (command == 'show twirl-template') { sendMessage('[command]\nshow twirl-template') } + else if (command.startsWith('show kafka-config ')) { + sendMessage('[command]\nshow kafka-config\n[name]\n'+command.split(" ").pop(2)+'\n') + } + else if (command.startsWith('show generator-config')) { + sendMessage('[command]\nshow generator-config\n[name]\n'+command.split(" ").pop(2)+'\n') + } + else if (command.startsWith('show avro-config')) { + sendMessage('[command]\nshow avro-config\n[name]\n'+command.split(" ").pop(2)+'\n') + } + else if (command.startsWith('show twirl-template')) { + sendMessage('[command]\nshow twirl-template\n[name]\n'+command.split(" ").pop(2)+'\n') + } else { term.echo("unknown command " + command); } @@ -143,7 +155,7 @@ function setupWebSocket(endpoint, name, term) { ws.onmessage = function(event) { console.log(event); data = event.data; - if (data.indexOf("-") != -1) + if (data.indexOf("|") != -1) term.echo(parseLs(event.data)) else term.echo(parseOkResponse(event.data)); diff --git a/src/main/scala/com/stratio/khermes/cluster/collector/CommandCollectorActor.scala b/src/main/scala/com/stratio/khermes/cluster/collector/CommandCollectorActor.scala index c376e72..8d789ab 100644 --- a/src/main/scala/com/stratio/khermes/cluster/collector/CommandCollectorActor.scala +++ b/src/main/scala/com/stratio/khermes/cluster/collector/CommandCollectorActor.scala @@ -33,7 +33,7 @@ import com.stratio.khermes.commons.constants.AppConstants import com.stratio.khermes.commons.implicits.AppImplicits._ import scala.concurrent.duration._ -import scala.util.Try +import scala.util.{Failure, Success, Try} class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] with ActorLogging { @@ -72,17 +72,17 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] case WSProtocolMessage(WsProtocolCommand.CreateAvroConfig, args) => createConfig(args, WsProtocolCommand.CreateAvroConfig, AppConstants.AvroConfigPath) - case WSProtocolMessage(WsProtocolCommand.ShowTwirlTemplate, _) => - showConfig(AppConstants.TwirlTemplatePath) + case WSProtocolMessage(WsProtocolCommand.ShowTwirlTemplate, args) => + showConfig(args, WsProtocolCommand.ShowTwirlTemplate, AppConstants.TwirlTemplatePath) - case WSProtocolMessage(WsProtocolCommand.ShowGeneratorConfig, _) => - showConfig(AppConstants.GeneratorConfigPath) + case WSProtocolMessage(WsProtocolCommand.ShowGeneratorConfig, args) => + showConfig(args, WsProtocolCommand.ShowGeneratorConfig, AppConstants.GeneratorConfigPath) - case WSProtocolMessage(WsProtocolCommand.ShowKafkaConfig, _) => - showConfig(AppConstants.KafkaConfigPath) + case WSProtocolMessage(WsProtocolCommand.ShowKafkaConfig, args) => + showConfig(args, WsProtocolCommand.ShowKafkaConfig, AppConstants.KafkaConfigPath) - case WSProtocolMessage(WsProtocolCommand.ShowAvroConfig, _) => - showConfig(AppConstants.AvroConfigPath) + case WSProtocolMessage(WsProtocolCommand.ShowAvroConfig, args) => + showConfig(args, WsProtocolCommand.ShowAvroConfig, AppConstants.AvroConfigPath) case result: NodeSupervisorActor.Result => collectResult(result) @@ -115,11 +115,11 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] val argsAvroConfigOption = args.get(WsProtocolCommand.ArgsAvroConfig) val nodeIds = args.get(WsProtocolCommand.ArgsNodeIds).map(value => value.split(" ")).toSeq.flatten - val twirlTemplate = configDAO.read(s"${AppConstants.TwirlTemplatePath}/${argsTwirlTemplate}") - val kafkaConfig = configDAO.read(s"${AppConstants.KafkaConfigPath}/${argsKafkaConfig}") - val generatorConfig = configDAO.read(s"${AppConstants.GeneratorConfigPath}/${argsGeneratorConfig}") + val twirlTemplate = configDAO.read(s"${AppConstants.TwirlTemplatePath}/$argsTwirlTemplate") + val kafkaConfig = configDAO.read(s"${AppConstants.KafkaConfigPath}/$argsKafkaConfig") + val generatorConfig = configDAO.read(s"${AppConstants.GeneratorConfigPath}/$argsGeneratorConfig") val avroConfig = argsAvroConfigOption.map( - argsAvroConfig => configDAO.read(s"${AppConstants.AvroConfigPath}/${argsAvroConfig}")) + argsAvroConfig => configDAO.read(s"${AppConstants.AvroConfigPath}/$argsAvroConfig")) mediator ! Publish("content", NodeSupervisorActor.Start(nodeIds, AppConfig(generatorConfig, kafkaConfig, twirlTemplate, avroConfig))) @@ -132,19 +132,31 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] self ! Result("OK", s"Sending Stop signal to nodes ${nodeIds.mkString(" ")}") } - def createConfig(args: Map[String,String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = { + def createConfig(args: Map[String, String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = { val name = args.get(WsProtocolCommand.ArgsName).getOrElse( throw new IllegalArgumentException(s"Not found name for ${protocolCommand.toString}")) val content = args.get(WsProtocolCommand.ArgsContent).getOrElse( throw new IllegalArgumentException(s"not found content for ${protocolCommand.toString}")) - configDAO.create(s"${basePath}/${name}", content) - self ! Result("OK", s"Created node in ZK: ${basePath}/${name}") + configDAO.create(s"$basePath/$name", content) + self ! Result("OK", s"Created node in ZK: $basePath/$name") } - def showConfig(basePath: String): Unit ={ - val list = configDAO.list(s"$basePath") - self ! Result(s"OK \n$list", s"Show config of $basePath") + def showConfig(args: Map[String, String], protocolCommand: WsProtocolCommandValue, basePath: String): Unit = { + val name = args.getOrElse(WsProtocolCommand.ArgsName, "") + if (name == "") { + val list = Try(configDAO.list(s"$basePath")) match { + case Success(list) => s"OK \n$list" + case Failure(e) => s"There is none $basePath stored." + } + self ! Result(s"$list", s"Show config of $basePath") + } else { + val read = Try(configDAO.read(s"$basePath/$name")) match { + case Success(config) => s"OK \n$config" + case Failure(e) => s"$name config does not exist." + } + self ! Result(s"$read", s"Show config of $basePath/$name") + } } def checkCommandHasEnd(): Unit = { @@ -159,7 +171,7 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] } def performOnNext(message: CommandCollectorActor.Result): Unit = { - if(totalDemand > 0 && isActive) { + if (totalDemand > 0 && isActive) { onNext(message) } } @@ -169,7 +181,10 @@ class CommandCollectorActor extends ActorPublisher[CommandCollectorActor.Result] } object CommandCollectorActor { + case object CheckCommandHasEnd + case class Result(value: String) + def props: Props = Props[CommandCollectorActor] }