-
Notifications
You must be signed in to change notification settings - Fork 13
/
org_memberships.py
115 lines (91 loc) · 4.41 KB
/
org_memberships.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Module for Terraform Enterprise/Cloud Migration Worker: Org Memberships.
"""
from terrasnek import exceptions
from .base_worker import TFCMigratorBaseWorker
class OrgMembershipsWorker(TFCMigratorBaseWorker):
    """
    A class to represent the worker that will migrate all org memberships from
    one TFC/E org to another TFC/E org.
    """

    _api_module_used = "org_memberships"
    _required_entitlements = []

    def migrate(self, teams_map):
        """
        Function to migrate all org memberships from one TFC/E org to another TFC/E org.

        Only source memberships returned by the "status = active" filter are
        considered. A source member whose email already appears among the
        target org's memberships is not re-invited; every other member is
        invited to the target org with their team IDs translated through
        teams_map.

        Args:
            teams_map: Mapping of source team ID to target team ID. Every team
                referenced by a member that gets invited must be present,
                otherwise a KeyError is raised.

        Returns:
            A dict keyed by source user ID. The value is:
              * the target org membership ID when the email already existed
                in the target org,
              * the user ID reported by the invite response when the member
                was newly invited,
              * None when the invite call failed.

            NOTE(review): the first two cases store different kinds of ID
            (membership ID vs. user ID) -- confirm that callers expect this.
        """
        self._logger.info("Migrating org memberships...")

        # Set proper membership filters
        active_member_filter = [
            {
                "keys": ["status"],
                "value": "active"
            }
        ]

        source_org_members = self._api_source.org_memberships.list_all_for_org(
            filters=active_member_filter)["data"]
        target_org_members = self._api_target.org_memberships.list_all_for_org()["data"]

        # Index the target memberships by email so existing members are found
        # with a single dict lookup.
        target_org_members_data = {
            target_org_member["attributes"]["email"]: target_org_member["id"]
            for target_org_member in target_org_members
        }

        org_membership_map = {}

        for source_org_member in source_org_members:
            source_org_member_email = source_org_member["attributes"]["email"]
            source_org_member_id = source_org_member["relationships"]["user"]["data"]["id"]

            if source_org_member_email in target_org_members_data:
                org_membership_map[source_org_member_id] = \
                    target_org_members_data[source_org_member_email]
                # TODO: should the team membership be checked for an existing org member
                # and updated to match the source_org value if different?
                self._logger.info("Org member: %s, exists. Skipped.", source_org_member_email)
                continue

            # Translate the team IDs into a fresh list rather than rewriting
            # the source API response in place.
            target_teams = [
                {**team, "id": teams_map[team["id"]]}
                for team in source_org_member["relationships"]["teams"]["data"]
            ]

            # Build the new user invite payload
            new_user_invite_payload = {
                "data": {
                    "attributes": {
                        "email": source_org_member_email
                    },
                    "relationships": {
                        "teams": {
                            "data": target_teams
                        },
                    },
                    "type": "organization-memberships"
                }
            }

            # try statement required in case a user account tied to the email address does
            # not yet exist. Exception (not a bare except) keeps the best-effort
            # behaviour while letting KeyboardInterrupt/SystemExit propagate.
            try:
                target_org_member = self._api_target.org_memberships.invite(
                    new_user_invite_payload)["data"]
            except Exception as invite_error:  # pylint: disable=broad-except
                org_membership_map[source_org_member_id] = None
                self._logger.info(
                    "User account for email: %s does not exist. Skipped.", source_org_member_email)
                # Keep the underlying reason available for troubleshooting.
                self._logger.debug(
                    "Invite for %s failed: %s", source_org_member_email, invite_error)
                continue

            new_user_id = target_org_member["relationships"]["user"]["data"]["id"]
            org_membership_map[source_org_member_id] = new_user_id

        self._logger.info("Org memberships migrated.")
        return org_membership_map

    def delete_all_from_target(self):
        """
        Function to delete all org memberships from the target TFC/E org.

        A TFCHTTPForbidden error whose message contains "remove the last owner"
        is tolerated and that member is left in place; any other
        TFCHTTPForbidden is re-raised.
        """
        self._logger.info("Deleting organization members...")

        org_members = self._api_target.org_memberships.list_all_for_org()["data"]

        for org_member in org_members:
            try:
                self._api_target.org_memberships.remove(org_member["id"])
                self._logger.info("Org member: %s, deleted.", org_member["attributes"]["email"])
            except exceptions.TFCHTTPForbidden as forbidden:
                # Rather than add some complicated logic, if we get the error message saying
                # we can't delete ourselves from a group we own, just skip it. Otherwise
                # raise the exception. You will still see an error in the logs.
                if "remove the last owner" not in str(forbidden):
                    # Bare raise re-raises the active exception unchanged.
                    raise

        self._logger.info("Organization members deleted.")