Skip to content

Commit

Permalink
[minor_change] Replaced the ndo modules current value with actual API…
Browse files Browse the repository at this point in the history
… response value
  • Loading branch information
sajagana committed Dec 18, 2024
1 parent 8fa10c2 commit 8c27fc9
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 217 deletions.
2 changes: 1 addition & 1 deletion plugins/module_utils/mso.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ def query_objs(self, path, key=None, api_version="v1", **kwargs):
found = []
objs = self.request(path, api_version=api_version, method="GET")

if objs == {} or objs == []:
if objs == {} or objs == [] or not objs:
return found

if key is None:
Expand Down
50 changes: 35 additions & 15 deletions plugins/modules/ndo_dhcp_option_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,17 @@ def main():
mso_template.validate_template("tenantPolicy")

path = "/tenantPolicyTemplate/template/dhcpOptionPolicies"
existing_dhcp_option_policies = mso_template.template.get("tenantPolicyTemplate", {}).get("template", {}).get("dhcpOptionPolicies", [])
if option_policy:
object_description = "DHCP Option Policy"
if option_policy_uuid:
match = mso_template.get_object_by_uuid(object_description, existing_dhcp_option_policies, option_policy_uuid)
else:
kv_list = [KVPair("name", option_policy)]
match = mso_template.get_object_by_key_value_pairs(object_description, existing_dhcp_option_policies, kv_list)
if match:
mso.existing = mso.previous = copy.deepcopy(match.details)
else:
mso.existing = mso.previous = existing_dhcp_option_policies

if state == "present":
match = get_dhcp_option_policy(mso_template, option_policy_uuid, option_policy)

if option_policy_uuid or option_policy:
if match:
mso.existing = mso.previous = copy.deepcopy(match.details) # Query a specific object
elif match:
mso.existing = match # Query all objects

if state == "present":
if match:
if module.params.get("options") is not None and len(options) == 0:
mso.fail_json(msg=err_message_min_options)

Expand All @@ -207,7 +201,6 @@ def main():
mso.sanitize(match.details)

else:

if not options:
mso.fail_json(msg=err_message_min_options)

Expand All @@ -227,11 +220,38 @@ def main():
mso.existing = {}

if not module.check_mode and ops:
mso.request(mso_template.template_path, method="PATCH", data=ops)
mso_template.template = mso.request(mso_template.template_path, method="PATCH", data=ops)
match = get_dhcp_option_policy(mso_template, option_policy_uuid, option_policy)
if match:
mso.existing = match.details # When the state is present
else:
mso.existing = {} # When the state is absent
elif module.check_mode and state != "query": # When the state is present/absent with check mode
mso.existing = mso.proposed if state == "present" else {}

mso.exit_json()


def get_dhcp_option_policy(mso_template, uuid=None, name=None, fail_module=False):
"""
Get the DHCP Option Policy by UUID or Name.
:param uuid: UUID of the DHCP Option Policy to search for -> Str
:param name: Name of the DHCP Option Policy to search for -> Str
:param fail_module: When match is not found fail the ansible module -> Bool
:return: Dict | None | List[Dict] | List[]: The processed result which could be:
When the UUID | Name is existing in the search list -> Dict
When the UUID | Name is not existing in the search list -> None
When both UUID and Name are None, and the search list is not empty -> List[Dict]
When both UUID and Name are None, and the search list is empty -> List[]
"""
existing_dhcp_option_policies = mso_template.template.get("tenantPolicyTemplate", {}).get("template", {}).get("dhcpOptionPolicies", [])
if uuid or name: # Query a specific object
return mso_template.get_object_by_key_value_pairs(
"DHCP Option Policy", existing_dhcp_option_policies, [KVPair("uuid", uuid) if uuid else KVPair("name", name)], fail_module
)
return existing_dhcp_option_policies # Query all objects


def get_options_payload(options):
payload = []
for option in options:
Expand Down
49 changes: 32 additions & 17 deletions plugins/modules/ndo_dhcp_relay_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,16 @@ def main():
mso_template.validate_template("tenantPolicy")

path = "/tenantPolicyTemplate/template/dhcpRelayPolicies"
existing_dhcp_relay_policies = mso_template.template.get("tenantPolicyTemplate", {}).get("template", {}).get("dhcpRelayPolicies", [])
if relay_policy:
object_description = "DHCP Relay Policy"
if relay_policy_uuid:
match = mso_template.get_object_by_uuid(object_description, existing_dhcp_relay_policies, relay_policy_uuid)
else:
kv_list = [KVPair("name", relay_policy)]
match = mso_template.get_object_by_key_value_pairs(object_description, existing_dhcp_relay_policies, kv_list)
match = get_dhcp_relay_policy(mso_template, relay_policy_uuid, relay_policy)

if relay_policy_uuid or relay_policy:
if match:
mso.existing = mso.previous = copy.deepcopy(match.details)
else:
mso.existing = mso.previous = existing_dhcp_relay_policies
mso.existing = mso.previous = copy.deepcopy(match.details) # Query a specific object
elif match:
mso.existing = match # Query all objects

if state == "present":

if match:

if module.params.get("providers") is not None and len(providers) == 0:
mso.fail_json(msg=err_message_min_providers)

Expand All @@ -238,7 +231,6 @@ def main():
mso.sanitize(match.details)

else:

if not providers:
mso.fail_json(msg=err_message_min_providers)

Expand All @@ -258,19 +250,42 @@ def main():
mso.existing = {}

if not module.check_mode and ops:
mso.request(mso_template.template_path, method="PATCH", data=ops)
mso_template.template = mso.request(mso_template.template_path, method="PATCH", data=ops)
match = get_dhcp_relay_policy(mso_template, relay_policy_uuid, relay_policy)
if match:
mso.existing = match.details # When the state is present
else:
mso.existing = {} # When the state is absent
elif module.check_mode and state != "query": # When the state is present/absent with check mode
mso.existing = mso.proposed if state == "present" else {}

mso.exit_json()


def get_providers_payload(mso, providers):
def get_dhcp_relay_policy(mso_template, uuid=None, name=None, fail_module=False):
"""
Get the DHCP Relay Policy by UUID or Name.
:param uuid: UUID of the DHCP Relay Policy to search for -> Str
:param name: Name of the DHCP Relay Policy to search for -> Str
:param fail_module: When match is not found fail the ansible module -> Bool
:return: Dict | None | List[Dict] | List[]: The processed result which could be:
When the UUID | Name is existing in the search list -> Dict
When the UUID | Name is not existing in the search list -> None
When both UUID and Name are None, and the search list is not empty -> List[Dict]
When both UUID and Name are None, and the search list is empty -> List[]
"""
match = mso_template.template.get("tenantPolicyTemplate", {}).get("template", {}).get("dhcpRelayPolicies", [])
if uuid or name: # Query a specific object
return mso_template.get_object_by_key_value_pairs("DHCP Relay Policy", match, [KVPair("uuid", uuid) if uuid else KVPair("name", name)], fail_module)
return match # Query all objects


def get_providers_payload(mso, providers):
# Cache used to reduce the number of schema queries done by MSOSchema function.
schema_cache = {}

payload = []
for provider in providers:

schema = provider.get("schema")
template = provider.get("template")
anp = provider.get("anp")
Expand Down
50 changes: 32 additions & 18 deletions plugins/modules/ndo_l3_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,17 @@ def main():
mso_template.validate_template("fabricPolicy")

path = "/fabricPolicyTemplate/template/l3Domains"
existing_l3_domains = mso_template.template.get("fabricPolicyTemplate", {}).get("template", {}).get("l3Domains", [])
if l3_domain:
object_description = "L3 Domain"
if l3_domain_uuid:
match = mso_template.get_object_by_uuid(object_description, existing_l3_domains, l3_domain_uuid)
else:
kv_list = [KVPair("name", l3_domain)]
match = mso_template.get_object_by_key_value_pairs(object_description, existing_l3_domains, kv_list)
match = get_l3_domain(mso_template, l3_domain_uuid, l3_domain)
if l3_domain_uuid or l3_domain:
if match:
if match.details.get("pool"):
match.details["pool"] = mso_template.get_vlan_pool_name(match.details.get("pool"))
mso.existing = mso.previous = copy.deepcopy(match.details)
else:
mso.existing = mso.previous = existing_l3_domains
mso.existing = mso.previous = copy.deepcopy(match.details) # Query a specific object
elif match:
mso.existing = match # Query all objects

if state == "present":

mso.existing = {}

if match:

if l3_domain and match.details.get("name") != l3_domain:
ops.append(dict(op="replace", path="{0}/{1}/name".format(path, match.index), value=l3_domain))
match.details["name"] = l3_domain
Expand All @@ -186,15 +176,14 @@ def main():

if pool and match.details.get("pool") != pool:
ops.append(dict(op="replace", path="{0}/{1}/pool".format(path, match.index), value=mso_template.get_vlan_pool_uuid(pool)))
match.details["pool"] = pool
match.details["pool"] = mso_template.get_vlan_pool_uuid(pool)
elif pool == "" and match.details.get("pool"):
ops.append(dict(op="remove", path="{0}/{1}/pool".format(path, match.index)))
match.details.pop("pool")

mso.sanitize(match.details)

else:

payload = {"name": l3_domain, "templateId": mso_template.template.get("templateId"), "schemaId": mso_template.template.get("schemaId")}
if description:
payload["description"] = description
Expand All @@ -216,10 +205,35 @@ def main():
mso.existing = {}

if not module.check_mode and ops:
mso.request(mso_template.template_path, method="PATCH", data=ops)
mso_template.template = mso.request(mso_template.template_path, method="PATCH", data=ops)
match = get_l3_domain(mso_template, l3_domain_uuid, l3_domain)
if match:
mso.existing = match.details # When the state is present
else:
mso.existing = {} # When the state is absent
elif module.check_mode and state != "query": # When the state is present/absent with check mode
mso.existing = mso.proposed if state == "present" else {}

mso.exit_json()


def get_l3_domain(mso_template, uuid=None, name=None, fail_module=False):
"""
Get the L3 Domain by UUID or Name.
:param uuid: UUID of the L3 Domain to search for -> Str
:param name: Name of the L3 Domain to search for -> Str
:param fail_module: When match is not found fail the ansible module -> Bool
:return: Dict | None | List[Dict] | List[]: The processed result which could be:
When the UUID | Name is existing in the search list -> Dict
When the UUID | Name is not existing in the search list -> None
When both UUID and Name are None, and the search list is not empty -> List[Dict]
When both UUID and Name are None, and the search list is empty -> List[]
"""
match = mso_template.template.get("fabricPolicyTemplate", {}).get("template", {}).get("l3Domains", [])
if uuid or name: # Query a specific object
return mso_template.get_object_by_key_value_pairs("L3 Domain", match, [KVPair("uuid", uuid) if uuid else KVPair("name", name)], fail_module)
return match # Query all objects


if __name__ == "__main__":
main()
49 changes: 32 additions & 17 deletions plugins/modules/ndo_physical_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,18 @@ def main():
mso_template.validate_template("fabricPolicy")

path = "/fabricPolicyTemplate/template/domains"
existing_physical_domains = mso_template.template.get("fabricPolicyTemplate", {}).get("template", {}).get("domains", [])
if physical_domain:
object_description = "Physical Domain"
if physical_domain_uuid:
match = mso_template.get_object_by_uuid(object_description, existing_physical_domains, physical_domain_uuid)
else:
kv_list = [KVPair("name", physical_domain)]
match = mso_template.get_object_by_key_value_pairs(object_description, existing_physical_domains, kv_list)
match = get_physical_domain(mso_template, physical_domain_uuid, physical_domain)

if physical_domain_uuid or physical_domain:
if match:
if match.details.get("pool"):
match.details["pool"] = mso_template.get_vlan_pool_name(match.details.get("pool"))
mso.existing = mso.previous = copy.deepcopy(match.details)
else:
mso.existing = mso.previous = existing_physical_domains
mso.existing = mso.previous = copy.deepcopy(match.details) # Query a specific object
elif match:
mso.existing = match # Query all objects

if state == "present":

mso.existing = {}

if match:

if physical_domain and match.details.get("name") != physical_domain:
ops.append(dict(op="replace", path="{0}/{1}/name".format(path, match.index), value=physical_domain))
match.details["name"] = physical_domain
Expand All @@ -194,7 +185,6 @@ def main():
mso.sanitize(match.details)

else:

payload = {"name": physical_domain, "templateId": mso_template.template.get("templateId"), "schemaId": mso_template.template.get("schemaId")}
if description:
payload["description"] = description
Expand All @@ -216,10 +206,35 @@ def main():
mso.existing = {}

if not module.check_mode and ops:
mso.request(mso_template.template_path, method="PATCH", data=ops)
mso_template.template = mso.request(mso_template.template_path, method="PATCH", data=ops)
match = get_physical_domain(mso_template, physical_domain_uuid, physical_domain)
if match:
mso.existing = match.details # When the state is present
else:
mso.existing = {} # When the state is absent
elif module.check_mode and state != "query": # When the state is present/absent with check mode
mso.existing = mso.proposed if state == "present" else {}

mso.exit_json()


def get_physical_domain(mso_template, uuid=None, name=None, fail_module=False):
"""
Get the Physical Domain by UUID or Name.
:param uuid: UUID of the Physical Domain to search for -> Str
:param name: Name of the Physical Domain to search for -> Str
:param fail_module: When match is not found fail the ansible module -> Bool
:return: Dict | None | List[Dict] | List[]: The processed result which could be:
When the UUID | Name is existing in the search list -> Dict
When the UUID | Name is not existing in the search list -> None
When both UUID and Name are None, and the search list is not empty -> List[Dict]
When both UUID and Name are None, and the search list is empty -> List[]
"""
match = mso_template.template.get("fabricPolicyTemplate", {}).get("template", {}).get("domains", [])
if uuid or name: # Query a specific object
return mso_template.get_object_by_key_value_pairs("Physical Domain", match, [KVPair("uuid", uuid) if uuid else KVPair("name", name)], fail_module)
return match # Query all objects


if __name__ == "__main__":
main()
Loading

0 comments on commit 8c27fc9

Please sign in to comment.