diff --git a/tendrl/commons/jobs/__init__.py b/tendrl/commons/jobs/__init__.py index cfe8b1dd..24770666 100644 --- a/tendrl/commons/jobs/__init__.py +++ b/tendrl/commons/jobs/__init__.py @@ -39,7 +39,8 @@ def run(self): _job_sync_interval = 5 NS.node_context = NS.node_context.load() NS.tendrl_context = NS.tendrl_context.load() - if "tendrl/monitor" not in NS.node_context.tags: + if "tendrl/monitor" not in NS.node_context.tags and \ + "tendrl/integration/monitoring" not in NS.node_context.tags: if NS.tendrl_context.integration_id is None or \ NS.node_context.fqdn is None: time.sleep(_job_sync_interval) @@ -144,7 +145,6 @@ def process_job(jid): _now_plus_10_epoch = (_now_plus_10 - _epoch_start).total_seconds() - time.sleep(7) job = job.load() if job.status == "new": # To avoid server and storage node do save same time @@ -209,7 +209,7 @@ def process_job(jid): obj_name, flow_name) else: runnable_flow = current_ns.ns.get_flow(flow_name) - + time.sleep(3) job = job.load() lock_info = dict(node_id=NS.node_context.node_id, fqdn=NS.node_context.fqdn, diff --git a/tendrl/commons/objects/cluster/atoms/import_cluster/__init__.py b/tendrl/commons/objects/cluster/atoms/import_cluster/__init__.py index 53b1dc14..103ed7fb 100644 --- a/tendrl/commons/objects/cluster/atoms/import_cluster/__init__.py +++ b/tendrl/commons/objects/cluster/atoms/import_cluster/__init__.py @@ -74,6 +74,7 @@ def run(self): # create same flow for each node in node list except # $this payload = {"tags": ["tendrl/node_%s" % node], + "node_id": node, "run": "tendrl.flows.ImportCluster", "status": "new", "parameters": new_params, @@ -115,7 +116,8 @@ def run(self): logger.log( "error", NS.publisher_id, - {"message": "Failed to detect underlying cluster version"}, + {"message": "Failed to detect underlying cluster " + "version. 
Error: %s" % err}, job_id=self.parameters['job_id'], flow_id=self.parameters['flow_id'] ) @@ -220,15 +222,6 @@ def run(self): job_id=self.parameters['job_id'] ).load() if loop_count >= wait_count: - logger.log( - "error", - NS.publisher_id, - {"message": "Import jobs on cluster(%s) not yet " - "complete on all nodes(%s). Timing out." % - (_cluster.short_name, str(node_list))}, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'] - ) # Marking child jobs as failed which did not complete # as the parent job has timed out. This has to be done # explicitly because these jobs will still be processed @@ -239,10 +232,41 @@ def run(self): job_id=child_job_id ).load() if child_job.status not in ["finished", "failed"]: + if child_job.status in ["new", ""]: + node_id = child_job.payload.get( + "node_id", "" + ) + node_obj = NS.tendrl.objects.NodeContext( + node_id = node_id + ).load() + logger.log( + "error", + NS.publisher_id, + {"message": "Import child job %s is " + "not yet picked by %s, Either node is" + " down or tendrl-node-agent service " + "is not running" % ( + child_job.job_id, + node_obj.fqdn + )}, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) child_job.status = "failed" child_job.save() + + logger.log( + "error", + NS.publisher_id, + {"message": "Import jobs on cluster(%s) not yet " + "complete on all nodes(%s). Timing out." 
% + (_cluster.short_name, str(node_list))}, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) + return False - time.sleep(10) + time.sleep(5) completed = True for child_job_id in parent_job.children: child_job = NS.tendrl.objects.Job( diff --git a/tendrl/commons/objects/cluster/atoms/setup_cluster_alias/__init__.py b/tendrl/commons/objects/cluster/atoms/setup_cluster_alias/__init__.py index 2bbdf70e..dacc2f02 100644 --- a/tendrl/commons/objects/cluster/atoms/setup_cluster_alias/__init__.py +++ b/tendrl/commons/objects/cluster/atoms/setup_cluster_alias/__init__.py @@ -35,14 +35,24 @@ def run(self): wait_count = 24 while True: child_job_failed = False + job = Job(job_id=_job_id).load() if loop_count >= wait_count: + if job.status in ["new", ""]: + msg = ("Child job %s for setting up cluster alias not yet " + "picked up by the server, Service " + "tendrl-monitoring-integration may be down. " + "Timing out. (%s)" % ( + job.job_id, integration_id + )) + else: + msg = ("Child job %s for setting up cluster alias not yet " + "completed by the server. Timing out. (%s)" % ( + job.job_id, integration_id)) logger.log( "error", NS.publisher_id, { - "message": "Setting up cluster alias" - "not yet complete. Timing out. 
(%s)" % - integration_id + "message": msg }, job_id=self.parameters['job_id'], flow_id=self.parameters['flow_id'], @@ -50,7 +60,6 @@ def run(self): return False time.sleep(5) finished = True - job = Job(job_id=_job_id).load() if job.status not in ["finished", "failed"]: finished = False elif job.status == "failed": diff --git a/tendrl/commons/objects/definition/master.yaml b/tendrl/commons/objects/definition/master.yaml index d762d20c..ab707557 100644 --- a/tendrl/commons/objects/definition/master.yaml +++ b/tendrl/commons/objects/definition/master.yaml @@ -68,6 +68,7 @@ namespace.tendrl: optional: - Cluster.short_name pre_run: + - tendrl.objects.Node.atoms.CheckServiceStatus - tendrl.objects.Cluster.atoms.CheckClusterNodesUp - tendrl.objects.Node.atoms.IsNodeTendrlManaged - tendrl.objects.Cluster.atoms.ValidImportClusterParams @@ -83,6 +84,7 @@ namespace.tendrl: UnmanageCluster: tags: - "tendrl/monitor" + - "tendrl/integration/monitoring" atoms: - tendrl.objects.Cluster.atoms.SetClusterUnmanaged - tendrl.objects.Cluster.atoms.StopMonitoringServices @@ -97,6 +99,7 @@ namespace.tendrl: optional: - Cluster.delete_telemetry_data pre_run: + - tendrl.objects.Node.atoms.CheckServiceStatus - tendrl.objects.Cluster.atoms.CheckClusterNodesUp post_run: - tendrl.objects.Cluster.atoms.IsClusterImportReady @@ -378,12 +381,16 @@ namespace.tendrl: type: Create uuid: eda0b13a-7362-48d5-b5ca-4b6d6533a5ab attrs: + node_id: + type: String running: type: String exists: type: String service: type: String + error: + type: List enabled: true list: nodes/$NodeContext.node_id/Services help: "Service" @@ -938,6 +945,13 @@ namespace.tendrl: run: tendrl.objects.Node.atoms.Cmd type: Create uuid: dc8fff3a-34d9-4786-9282-55eff6abb6c3 + CheckServiceStatus: + help: Check all necessary services are running to import a cluster + enabled: true + run: tendrl.objects.Node.atoms.CheckServiceStatus + type: check + uuid: aec1d8b6-0689-4b14-a7cb-d085a0e1d10c + version: 1 attrs: node_id: type: String diff 
--git a/tendrl/commons/objects/node/atoms/check_service_status/__init__.py b/tendrl/commons/objects/node/atoms/check_service_status/__init__.py new file mode 100644 index 00000000..800bff69 --- /dev/null +++ b/tendrl/commons/objects/node/atoms/check_service_status/__init__.py @@ -0,0 +1,142 @@ +import etcd + +from tendrl.commons import objects +from tendrl.commons.objects import AtomExecutionFailedError +from tendrl.commons.utils import etcd_utils +from tendrl.commons.utils import log_utils as logger + + +TENDRL_SERVICES = { + "server": [ + "tendrl-node-agent", + "tendrl-monitoring-integration", + "tendrl-api", + ], + "storage_node": [ + "tendrl-node-agent", + "glusterd", + ] } + + +class CheckServiceStatus(objects.BaseAtom): + def __init__(self, *args, **kwargs): + super(CheckServiceStatus, self).__init__(*args, **kwargs) + + def run(self): + node_context = NS.tendrl.objects.NodeContext().load() + tags = list(node_context.tags) + service_status = True + if "tendrl/monitor" in tags or \ + "tendrl/integration/monitoring" in tags: + # check necessary service status in server + for service_name in TENDRL_SERVICES["server"]: + service = get_service_status( + service_name + ) + if not service.running: + if len(service.error) > 0: + msg = ("Unable to find status of the service %s " + "on server-node. 
Error: %s" % ( + service_name, + service.error + )) + else: + msg = ("Service %s is not running on a server-node, " + "Please start it manually or check the log " + "file to figure out the exact problem" % service_name) + logger.log( + "error", + NS.get("publisher_id", None), + { + "message": msg + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) + service_status = False + else: + # check necessary service status in storage nodes + for service_name in TENDRL_SERVICES["storage_node"]: + service = get_service_status(service_name) + if not service.running: + if len(service.error) > 0: + msg = ("Unable to find status of the service %s " + "on %s. Error: %s" % ( + service_name, + NS.node_context.fqdn, + service.error + )) + else: + msg = ("Service %s is not running on %s, Please " + "start it manually or check the log file to " + "figure out the exact problem" % ( + service_name, + NS.node_context.fqdn + )) + logger.log( + "error", + NS.get("publisher_id", None), + { + "message": msg + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) + service_status = False + node_list = self.parameters['Node[]'] + if service_status and len(node_list) > 1: + # check monitoring integration is running on server + try: + node_arr = etcd_utils.read( + "/indexes/tags/tendrl/integration/monitoring" + ).value + node_id = eval(node_arr)[0] + service = NS.tendrl.objects.Service( + service="tendrl-monitoring-integration", + node_id=node_id + ).load() + service.error = list(service.error) + if not service.running: + if len(service.error) > 0: + msg = ("Unable to find status of the service " + "tendrl-monitoring-integration on " + "server-node. 
Error: %s" % service.error) + else: + msg = ("Service tendrl-monitoring-integration is " + "not running on a server-node, Please start" + " it manually or check the log file to " + "figure out the exact problem") + logger.log( + "error", + NS.get("publisher_id", None), + { + "message": msg + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) + service_status = False + except (etcd.EtcdKeyNotFound, IndexError): + msg = ("Service tendrl-monitoring-integration is " + "not running in a server, Please start it " + "manually or check the log message to " + "figure out the exact problem") + logger.log( + "error", + NS.get("publisher_id", None), + { + "message": msg + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'] + ) + service_status = False + return service_status + + +def get_service_status(service_name): + service = NS.tendrl.objects.Service( + service=service_name + ) + return service diff --git a/tendrl/commons/objects/node/flows/stop_services/__init__.py b/tendrl/commons/objects/node/flows/stop_services/__init__.py index 83e786ce..8f2cc7f9 100644 --- a/tendrl/commons/objects/node/flows/stop_services/__init__.py +++ b/tendrl/commons/objects/node/flows/stop_services/__init__.py @@ -1,4 +1,5 @@ from tendrl.commons import flows +from tendrl.commons.flows.exceptions import FlowExecutionFailedError from tendrl.commons.utils import cmd_utils from tendrl.commons.utils import log_utils as logger @@ -23,50 +24,64 @@ def run(self): ) srv = NS.tendrl.objects.Service(service=service) if not srv.running: - logger.log( - "debug", - NS.publisher_id, - { - "message": "%s not running on " - "%s" % (service, NS.node_context.node_id) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) + if len(srv.error) > 0: + raise FlowExecutionFailedError( + "Unable to check status of service %s " + "on %s. 
Error: %s" % ( + service, + NS.node_context.node_id, + srv.error + ) + ) + else: + logger.log( + "debug", + NS.publisher_id, + { + "message": "%s not running on " + "%s" % (service, NS.node_context.node_id) + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'], + ) continue _cmd_str = "systemctl stop %s" % service cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() + out, err, rc = cmd.run() if err: logger.log( - "error", + "debug", NS.publisher_id, { "message": "Could not stop %s" - " service on %s. Error: %s" % (service, err, - NS.node_context.node_id) + " service on %s. Error: %s" % ( + service, + NS.node_context.node_id, + err + ) }, job_id=self.parameters['job_id'], flow_id=self.parameters['flow_id'], ) - return False _cmd_str = "systemctl disable %s" % service cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() + out, err, rc = cmd.run() if err: logger.log( - "error", + "debug", NS.publisher_id, { "message": "Could not disable %s" - " service on %s. Error: %s" % (service, err, - NS.node_context.node_id) + " service on %s. 
Error: %s" % ( + service, + NS.node_context.node_id, + err + ) }, job_id=self.parameters['job_id'], flow_id=self.parameters['flow_id'], ) - return False return True diff --git a/tendrl/commons/objects/service/__init__.py b/tendrl/commons/objects/service/__init__.py index d86410dc..f952363e 100644 --- a/tendrl/commons/objects/service/__init__.py +++ b/tendrl/commons/objects/service/__init__.py @@ -4,22 +4,34 @@ class Service(objects.BaseObject): def __init__(self, service=None, running=None, exists=None, - *args, **kwargs): + node_id=None, error=None, *args, **kwargs): super(Service, self).__init__(*args, **kwargs) service_detail = self.get_service_info(service) self.list = 'nodes/{0}/Services/' self.running = running or service_detail['running'] self.service = service self.exists = exists or service_detail['exists'] + self.node_id = node_id or NS.node_context.node_id + self.error = error or service_detail['error'] self.value = 'nodes/{0}/Services/{1}' def get_service_info(self, service_name): + error = [] service = service_status.ServiceStatus(service_name) - return {"exists": service.exists(), "running": service.status()} + service_exists, err = service.exists() + if err: + error.append(err) + service_running, err = service.status() + if err: + error.append(err) + return {"exists": service_exists, + "running": service_running, + "error": error + } def render(self): self.value = self.value.format( - NS.node_context.node_id, + self.node_id, self.service.strip("@*") ) return super(Service, self).render() diff --git a/tendrl/commons/utils/ansible_module_runner.py b/tendrl/commons/utils/ansible_module_runner.py index f93e6618..7b496525 100644 --- a/tendrl/commons/utils/ansible_module_runner.py +++ b/tendrl/commons/utils/ansible_module_runner.py @@ -108,7 +108,10 @@ def run(self): stderr=subprocess.PIPE ) out, err = cmd.communicate() - result = json.loads(out) + if err: + result = {'stderr': repr(err)} + else: + result = json.loads(out) except 
(subprocess.CalledProcessError, ValueError) as ex: result = {'stderr': repr(ex)} diff --git a/tendrl/commons/utils/service_status.py b/tendrl/commons/utils/service_status.py index 536c212c..f6ec5609 100644 --- a/tendrl/commons/utils/service_status.py +++ b/tendrl/commons/utils/service_status.py @@ -24,14 +24,14 @@ def exists(self): ) ) if rc == 0 and stdout.find('LoadState=loaded') >= 0: - return True + return True, stderr else: - return False + return False, stderr def status(self): stdout, stderr, rc = self._execute_service_command('is-active') if "inactive" in stdout: - return False + return False, stderr elif "active" in stdout: - return True - return False + return True, stderr + return False, stderr