Added flow ExpandClusterWithDetectedPeers #831

Merged 7 commits on Feb 26, 2018
123 changes: 123 additions & 0 deletions tendrl/commons/flows/expand_cluster_with_detected_peers/__init__.py
@@ -0,0 +1,123 @@
import time
import uuid

from tendrl.commons import flows
from tendrl.commons.flows.exceptions import FlowExecutionFailedError
from tendrl.commons.objects.job import Job
from tendrl.commons.utils import log_utils as logger


class ExpandClusterWithDetectedPeers(flows.BaseFlow):
    def __init__(self, *args, **kwargs):
        super(
            ExpandClusterWithDetectedPeers,
            self
        ).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']
        _cluster = NS.tendrl.objects.Cluster(
            integration_id=integration_id
        ).load()
        # Refuse to run if another lifecycle job already holds the cluster
        if _cluster.status is not None and _cluster.status != "" and \
                _cluster.status in ["importing", "unmanaging", "expanding"]:
            raise FlowExecutionFailedError(
                "Another job in progress for cluster, please wait till "
                "the job finishes (job_id: %s) (integration_id: %s) " % (
                    _cluster.current_job['job_id'],
                    integration_id
                )
            )
        # Take the cluster lock and mark the expand job as in progress
        _lock_details = {
            'node_id': NS.node_context.node_id,
            'fqdn': NS.node_context.fqdn,
            'tags': NS.node_context.tags,
            'type': NS.type,
            'job_name': self.__class__.__name__,
            'job_id': self.job_id
        }
        _cluster.locked_by = _lock_details
        _cluster.status = "expanding"
        _cluster.current_job = {
            'job_id': self.job_id,
            'job_name': self.__class__.__name__,
            'status': 'in_progress'
        }
        _cluster.save()

        node_ids = self.parameters.get("Node[]", [])
Contributor:

We should be able to detect which nodes are new (see https://github.com/Tendrl/commons/blob/master/tendrl/commons/flows/import_cluster/__init__.py#L40).

Also, add an "is_managed" flag to the cluster node context, set it from the tendrl-gluster-integration service, and use it to figure out which nodes are new (ClusterNodeContext.is_managed wouldn't exist for such nodes).

Member Author:

I thought the detection code would be separate, and after detection it would invoke this flow. Anyway, I will try to introduce this is_managed flag on the cluster node context object and use it.

Member Author:

I still feel the flow should be independent of the detection code, and it should simply expect a list of nodes for executing the cluster expansion. If the flow did its own detection, we could just have formed and saved the jobs during detection itself, so why have a separate flow defined for this?

Contributor:

I am not talking about detecting the sds or making detected_cluster_id. This flow will be called only after the new nodes have the same TendrlContext (i.e. the same integration_id). So all you need to do is read the list of nodes from https://github.com/Tendrl/commons/blob/master/tendrl/commons/flows/import_cluster/__init__.py#L38 and then import those nodes for which ClusterNodeContext.is_managed is "no".
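The reviewer's suggestion can be sketched as a small filter. This is illustrative only: the ClusterNodeContext lookup is simulated with a plain dict, and both the function name and the "is_managed" values are assumptions, not existing Tendrl APIs.

```python
# Hypothetical sketch: narrow the peer list down to nodes not yet managed.
# In Tendrl the flag would live on the cluster node context object, set by
# the tendrl-gluster-integration service as the reviewer suggests.

def select_new_nodes(all_node_ids, node_contexts):
    """Return node ids whose context is missing or not marked managed."""
    new_nodes = []
    for node_id in all_node_ids:
        ctx = node_contexts.get(node_id)
        # A missing context, or is_managed != "yes", marks a newly
        # detected peer that still needs an ImportCluster run
        if ctx is None or ctx.get("is_managed") != "yes":
            new_nodes.append(node_id)
    return new_nodes
```

With this in place, the flow could feed only `select_new_nodes(...)` into the per-node import loop instead of the full "Node[]" list.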

        # Spawn one ImportCluster job per node, targeted via the node tag
        job_ids = []
        for node_id in node_ids:
            params = {
                'TendrlContext.integration_id': integration_id,
                'Node[]': [node_id],
                'Cluster.enable_volume_profiling':
                    _cluster.enable_volume_profiling
            }
            payload = {
                "tags": ["tendrl/node_%s" % node_id],
                "run": "tendrl.flows.ImportCluster",
                "status": "new",
                "parent": self.parameters['job_id'],
                "parameters": params,
                "type": "node"
            }
            _job_id = str(uuid.uuid4())
            Job(job_id=_job_id, status="new", payload=payload).save()
            logger.log(
                "info",
                NS.publisher_id,
                {
                    "message": "Importing (job: %s) Node %s "
                               "to cluster %s" % (
                                   _job_id,
                                   node_id,
                                   integration_id
                               )
                },
                job_id=self.parameters['job_id']
            )
            job_ids.append(_job_id)

        loop_count = 0
        # Wait for (number of nodes) * 6 minutes for the imports to complete
        wait_count = len(node_ids) * 36
Member Author:

@r0h4n node_ids is not the list of nodes with which the cluster is expanded. Rather, this is the full list of nodes, as a few skips might happen at L#70. We should wait only for the number of nodes that actually executed the import cluster flow here.

Contributor:

Please send a fix.
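A minimal sketch of the fix being discussed: derive the poll budget from the jobs that were actually spawned rather than from the full node list. The function name and defaults here are hypothetical, not part of the patch.

```python
# Sketch, assuming a 10-second poll interval and a 6-minute budget per node:
# 6 * 60 / 10 = 36 polls per node, applied to the spawned job list.

def compute_wait_count(job_ids, minutes_per_node=6, poll_seconds=10):
    """Number of polls to allow before timing out the expand flow."""
    polls_per_node = (minutes_per_node * 60) // poll_seconds  # 36 per node
    return len(job_ids) * polls_per_node
```

In the flow above this would amount to `wait_count = len(job_ids) * 36` in place of `len(node_ids) * 36`.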

        # Poll every 10 seconds until every spawned import job finishes,
        # or give up once the wait budget is exhausted
        while True:
            if loop_count >= wait_count:
                logger.log(
                    "info",
                    NS.publisher_id,
                    {
                        "message": "Import jobs not yet complete "
                                   "on all new nodes. Timing out. (%s, %s)" %
                                   (str(node_ids), integration_id)
                    },
                    job_id=self.parameters['job_id'],
                    flow_id=self.parameters['flow_id']
                )
                return False
            time.sleep(10)
            finished = True
            for job_id in job_ids:
                job = Job(job_id=job_id).load()
                if job.status != "finished":
                    finished = False
                    break
            if finished:
                break
            else:
                loop_count += 1
                continue

        # All imports done: release the lock and mark the job finished
        _cluster = NS.tendrl.objects.Cluster(
            integration_id=integration_id
        ).load()
        _cluster.status = ""
        _cluster.locked_by = {}
        _cluster.current_job = {
            'status': "finished",
            'job_name': self.__class__.__name__,
            'job_id': self.job_id
        }
        _cluster.save()
        return True
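The wait loop in this flow follows a generic poll-until-all-finished pattern. A standalone sketch, with `Job(job_id=...).load()` simulated by a status-lookup callable; the names are illustrative, not Tendrl APIs.

```python
import time


def wait_for_jobs(job_ids, load_status, wait_count, poll_seconds=0):
    """Poll job statuses until all report "finished" or the budget runs out.

    load_status is any callable mapping a job id to its status string,
    standing in for Job(job_id=...).load().status in the real flow.
    """
    for _ in range(wait_count):
        time.sleep(poll_seconds)
        if all(load_status(job_id) == "finished" for job_id in job_ids):
            return True
    # Budget exhausted with at least one job unfinished
    return False
```

The real flow would pass `poll_seconds=10` and the computed `wait_count`, then release the cluster lock only on a `True` return.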
13 changes: 13 additions & 0 deletions tendrl/commons/objects/definition/master.yaml
@@ -95,6 +95,19 @@ namespace.tendrl:
      type: Update
      uuid: 2f94a48a-05d7-408c-b400-e27827f4efed
      version: 1
    ExpandClusterWithDetectedPeers:
      tags:
        - "provisioner/$TendrlContext.integration_id"
      help: "expanding an existing cluster with newly detected peers"
      enabled: true
      inputs:
        mandatory:
          - "Node[]"
Contributor:

Node[] is not required.
          - TendrlContext.integration_id
      run: tendrl.flows.ExpandClusterWithDetectedPeers
      type: Update
      uuid: 2f94a48a-05d7-408c-b400-27827f4edcae
      version: 1

  objects:
    Cluster:
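For reference, a parent job that triggers the flow per the definition above might carry a payload like this sketch. The integration id and node names are made up; only the run target and the two mandatory inputs come from the master.yaml entry, and the tag follows its "provisioner/$TendrlContext.integration_id" pattern.

```python
# Illustrative job payload for ExpandClusterWithDetectedPeers
# (values are hypothetical; structure mirrors the per-node payloads
# built inside the flow itself)
payload = {
    "tags": ["provisioner/abcd-1234"],  # hypothetical integration id
    "run": "tendrl.flows.ExpandClusterWithDetectedPeers",
    "status": "new",
    "type": "node",
    "parameters": {
        "TendrlContext.integration_id": "abcd-1234",
        "Node[]": ["node-1", "node-2"]
    }
}
```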