Skip to content

Commit

Permalink
Merge pull request #831 from shtripat/expand-cluster-with-detected-peers
Browse files Browse the repository at this point in the history
Added flow ExpandCluserWithDetectedPeers
  • Loading branch information
r0h4n authored Feb 26, 2018
2 parents 8660bbf + 223a239 commit 089a6b3
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 0 deletions.
144 changes: 144 additions & 0 deletions tendrl/commons/flows/expand_cluster_with_detected_peers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import time
import json
import uuid

import etcd

from tendrl.commons import flows
from tendrl.commons.flows.exceptions import FlowExecutionFailedError
from tendrl.commons.objects.job import Job
from tendrl.commons.utils import etcd_utils
from tendrl.commons.utils import log_utils as logger


class ExpandClusterWithDetectedPeers(flows.BaseFlow):
def __init__(self, *args, **kwargs):
super(
ExpandClusterWithDetectedPeers,
self
).__init__(*args, **kwargs)

def run(self):
integration_id = self.parameters['TendrlContext.integration_id']
_cluster = NS.tendrl.objects.Cluster(
integration_id=integration_id
).load()
if _cluster.status is not None and _cluster.status != "" and \
_cluster.status in ["importing", "unmanaging", "expanding"]:
raise FlowExecutionFailedError(
"Another job in progress for cluster, please wait till "
"the job finishes (job_id: %s) (integration_id: %s) " % (
_cluster.current_job['job_id'],
integration_id
)
)
_lock_details = {
'node_id': NS.node_context.node_id,
'fqdn': NS.node_context.fqdn,
'tags': NS.node_context.tags,
'type': NS.type,
'job_name': self.__class__.__name__,
'job_id': self.job_id
}
_cluster.locked_by = _lock_details
_cluster.status = "expanding"
_cluster.current_job = {
'job_id': self.job_id,
'job_name': self.__class__.__name__,
'status': 'in_progress'
}
_cluster.save()

try:
integration_id_index_key = \
"indexes/tags/tendrl/integration/%s" % integration_id
node_ids = etcd_utils.read(
integration_id_index_key).value
node_ids = json.loads(node_ids)
except etcd.EtcdKeyNotFound:
raise FlowExecutionFailedError("Cluster with "
"integration_id "
"(%s) not found, cannot "
"import" % integration_id)

job_ids = []
for node_id in node_ids:
_cnc = NS.tendrl.objects.ClusterNodeContext(
node_id=node_id
).load()
if _cnc.is_managed.lower() == "yes":
continue

params = {
'TendrlContext.integration_id': integration_id,
'Node[]': [node_id],
'Cluster.enable_volume_profiling':
_cluster.enable_volume_profiling
}
payload = {
"tags": ["tendrl/node_%s" % node_id],
"run": "tendrl.flows.ImportCluster",
"status": "new",
"parent": self.parameters['job_id'],
"parameters": params,
"type": "node"
}
_job_id = str(uuid.uuid4())
Job(job_id=_job_id, status="new", payload=payload).save()
logger.log(
"info",
NS.publisher_id,
{
"message": "Importing (job: %s) Node %s "
"to cluster %s" % (
_job_id,
node_id,
integration_id
)
},
job_id=self.parameters['job_id']
)
job_ids.append(_job_id)

loop_count = 0
# Wait for (no of nodes) * 6 minutes for import to complete
wait_count = len(node_ids) * 36
while True:
if loop_count >= wait_count:
logger.log(
"info",
NS.publisher_id,
{
"message": "Import jobs not yet complete "
"on all new nodes. Timing out. (%s, %s)" %
(str(node_ids), integration_id)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id']
)
return False
time.sleep(10)
finished = True
for job_id in job_ids:
job = Job(job_id=job_id).load()
if job.status != "finished":
finished = False
break
if finished:
break
else:
loop_count += 1
continue

_cluster = NS.tendrl.objects.Cluster(
integration_id=integration_id
).load()
_cluster.status = ""
_cluster.locked_by = {}
_cluster.current_job = {
'status': "finished",
'job_name': self.__class__.__name__,
'job_id': self.job_id
}
_cluster.save()
return True
12 changes: 12 additions & 0 deletions tendrl/commons/objects/definition/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ namespace.tendrl:
type: Update
uuid: 2f94a48a-05d7-408c-b400-e27827f4efed
version: 1
ExpandClusterWithDetectedPeers:
tags:
- "provisioner/$TendrlContext.integration_id"
help: "Expand an existing cluster with newly detected peers"
enabled: true
inputs:
mandatory:
- TendrlContext.integration_id
run: tendrl.flows.ExpandClusterWithDetectedPeers
type: Update
uuid: 2f94a48a-05d7-408c-b400-27827f4edcae
version: 1

objects:
Cluster:
Expand Down

0 comments on commit 089a6b3

Please sign in to comment.