forked from tl-its-umich-edu/kartograafr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
arcgisUM.py
217 lines (163 loc) · 8.1 KB
/
arcgisUM.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# Wrapper around calls to arcgis. Helps with testing and future changes.
import json
from io import StringIO
import datetime
import logging
logger = logging.getLogger(__name__)
import arcgis
import dateutil.tz
import config
# secrets really is used (during import, to change sensitive properties).
import secrets # @UnusedImport
import util
##### Improved code tracebacks for exceptions
import traceback
def handleError(self, record):  # @UnusedVariable
    """Print the current call stack when a logging handler hits an error.

    Written with the signature of ``logging.Handler.handleError`` so it can
    be installed in its place.  Neither argument is used.

    :param self: The handler instance the method is invoked on
    :param record: The log record that was being handled
    """
    traceback.print_stack()


# Install the replacement on the base class so it applies to all handlers
# that do not override handleError themselves.
logging.Handler.handleError = handleError
#####
# UTC timezone object, used to make the run start time timezone-aware.
TIMEZONE_UTC = dateutil.tz.tzutc()
# Timezone-aware (UTC) timestamp taken once, when this module is imported.
RUN_START_TIME = datetime.datetime.now(tz=TIMEZONE_UTC)
# The run start time as a 14-digit string: YYYYmmddHHMMSS.
RUN_START_TIME_FORMATTED = RUN_START_TIME.strftime('%Y%m%d%H%M%S')
# Hold parsed options
# (starts as None; nothing in the visible part of this module assigns it)
options = None
# TODO: required in this module?
# NOTE(review): neither dict is read or written in the visible part of this
# module; presumably they map courses to log handlers/loggers - confirm.
courseLogHandlers = dict()
courseLoggers = dict()
def getArcGISConnection(securityinfo):
    """
    Get a connection object for ArcGIS based on configuration options.

    :param securityinfo: Connection settings; the keys ``org_url``,
        ``username`` and ``password`` are read from it
    :type securityinfo: dict
    :return: Connection object for the ArcGIS service
    :rtype: arcgis.GIS
    :raises TypeError: if ``securityinfo`` is not a dict
    :raises KeyError: if one of the three keys above is missing
    :raises RuntimeError: if creating the ArcGIS connection raised RuntimeError
    """
    if not isinstance(securityinfo, dict):
        raise TypeError('Argument securityinfo type should be dict')

    try:
        arcGIS = arcgis.GIS(securityinfo['org_url'],
                            securityinfo['username'],
                            securityinfo['password'])
    except RuntimeError as exp:
        logger.error("RuntimeError: getArcGISConnection: {}".format(exp))
        # Chain explicitly so the original error is reported as the cause.
        raise RuntimeError('ArcGIS connection invalid: {}'.format(exp)) from exp

    return arcGIS
def getArcGISGroupByTitle(arcGISAdmin, title):
    """
    Search ArcGIS for a group by its title.

    The characters ``?`` and ``*`` in the search string are backslash-escaped
    before the search is run.

    :param arcGISAdmin: ArcGIS connection object; its ``groups.search``
        method is called with the search string
    :param title: Group title to be found
    :type title: str
    :return: The last entry of the search results, or None when the search
        returned nothing or raised RuntimeError
    """
    rawQuery = "title:" + title
    logger.debug("group search string: original: {}".format(rawQuery))

    # quote characters that are special in the group search.
    escapeTable = str.maketrans({"?": r"\?", "*": r"\*"})
    query = rawQuery.translate(escapeTable)
    logger.debug("group search string: escaped: {}".format(query))

    try:
        matches = arcGISAdmin.groups.search(query)
    except RuntimeError as exp:
        logger.error("arcGIS error finding group: {} exception: {}".format(query, exp))
        return None

    return matches.pop() if len(matches) > 0 else None
def addCanvasUsersToGroup(instructorLog, group, courseUsers):
    """Add new users to the ArcGIS group and record the outcome.

    :param instructorLog: Log text accumulated so far
    :type instructorLog: str
    :param group: ArcGIS group object; ``add_users`` and ``id`` are used
    :param courseUsers: Canvas user names to add to the group
    :return: ``instructorLog`` with a summary of this call appended, or
        unchanged when ``courseUsers`` is empty
    :rtype: str
    """
    groupLabel = util.formatNameAndID(group)

    logger.info("addCanvasUsersToGroup: enter")

    if len(courseUsers) == 0:
        logger.info('No new users to add to ArcGIS Group {}'.format(groupLabel))
        return instructorLog

    logger.info('Adding Canvas Users to ArcGIS Group {}: {}'.format(groupLabel, courseUsers))

    # ArcGIS usernames are U-M uniqnames with the ArcGIS organization name appended.
    arcGISNames = formatUsersNamesForArcGIS(courseUsers)
    logger.debug("addCanvasUsersToGroup: formatted: {}".format(arcGISNames))

    response = group.add_users(arcGISNames)
    logger.debug("adding: results: {}".format(response))

    # 'notAdded' lists the users that could not be added; it may be missing or empty.
    rejected = response.get('notAdded')
    rejectedCount = len(rejected) if rejected else 0
    addedCount = len(arcGISNames) - rejectedCount
    logger.debug("usersCount: {}".format(addedCount))

    logger.debug("aCUTG: instructorLog 1: [{}]".format(instructorLog))
    instructorLog += 'Number of users added to group: [{}]\n\n'.format(addedCount)
    logger.debug("aCUTG: instructorLog 2: [{}]".format(instructorLog))

    if rejected:
        logger.warning('Warning: Some or all users not added to ArcGIS group {}: {}'.format(groupLabel, rejected))
        bulletLines = '\n'.join(['* ' + name for name in rejected])
        groupIdNote = 'ArcGIS group ID number:\n{}\n\n'.format(group.id)
        instructorLog += ('Users not in group (these users need ArcGIS accounts created for them):\n'
                          + bulletLines + '\n\n' + groupIdNote)

    # Separator line closing this group's section of the log.
    instructorLog += '- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -\n'
    logger.debug("aCUTG: instructorLog 3: [{}]".format(instructorLog))

    logger.info("addCanvasUsersToGroup: instructorLog: [{}]".format(instructorLog))

    return instructorLog
def getCurrentArcGISMembers(group, groupNameAndID):
    """Return the ``'users'`` entry of the group's membership information.

    :param group: ArcGIS group object; ``get_members()`` is called on it
    :param groupNameAndID: Label for the group, used only in the error message
    :return: The value stored under ``'users'`` in the ``get_members()``
        result; None if that key is absent or the call raised RuntimeError
    """
    try:
        membership = group.get_members()
    except RuntimeError as exception:
        logger.error('Exception while getting users for ArcGIS group "{}": {}'.format(groupNameAndID, exception))
        # Fall back to an empty mapping so the lookup below yields None.
        membership = {}

    return membership.get('users')
def removeListOfUsersFromArcGISGroup(group, groupNameAndID, groupUsers):
    """Remove only listed users from ArcGIS group.

    :param group: ArcGIS group object; ``remove_users`` is called on it
    :param groupNameAndID: Label for the group, used in log messages
    :param groupUsers: ArcGIS user names to remove from the group
    :return: The result of ``group.remove_users`` (a warning is logged if
        its ``'notRemoved'`` entry is non-empty); None when there was
        nothing to remove or the removal raised RuntimeError
    """
    if not groupUsers:
        logger.info('No obsolete users to remove from ArcGIS Group {}'.format(groupNameAndID))
        return None

    userNames = list(groupUsers)
    logger.info('ArcGIS Users to be removed from ArcGIS Group [{}] [{}]'.format(groupNameAndID, ','.join(userNames)))

    try:
        # Use the arcgis package's Group.remove_users(), matching the
        # add_users()/get_members() calls made elsewhere in this module.
        # The previous name, removeUsersFromGroup, is the ArcREST spelling.
        results = group.remove_users(userNames)
    except RuntimeError as exception:
        logger.error('Exception while removing users from ArcGIS group "{}": {}'.format(groupNameAndID, exception))
        return None

    # Tolerate an empty/None result instead of failing on .get().
    usersNotRemoved = results.get('notRemoved') if results else None
    if usersNotRemoved:
        logger.warning('Warning: Some or all users not removed from ArcGIS group {}: {}'.format(groupNameAndID, usersNotRemoved))

    return results
def removeSomeExistingGroupMembers(groupTitle, group, instructorLog, groupUsers):
    """Remove the given ArcGIS users from an existing group, if there are any.

    :param groupTitle: Title of the group, written to the instructor log
    :param group: ArcGIS group object
    :param instructorLog: Log text accumulated so far
    :param groupUsers: ArcGIS user names to remove; may be empty or None
    :return: Tuple of the updated instructor log and the removal results
        (an empty string when there were no users to remove)
    """
    groupLabel = util.formatNameAndID(group)
    logger.info('Found ArcGIS group: {}'.format(groupLabel))
    instructorLog += 'Updating ArcGIS group: "{}"\n'.format(groupTitle)

    if groupUsers:
        removal = removeListOfUsersFromArcGISGroup(group, groupLabel, groupUsers)
        return instructorLog, removal

    logger.info('Existing ArcGIS group {} does not have users to remove.'.format(groupLabel))
    return instructorLog, ''
def createNewArcGISGroup(arcGIS, groupTags, groupTitle, instructorLog):
    """Create a new ArcGIS group. Return group and any creation messages.

    :param arcGIS: ArcGIS connection object; ``groups.create`` is called on it
    :param groupTags: Tags passed to ``groups.create`` for the new group
    :param groupTitle: Title of the new group
    :param instructorLog: Log text accumulated so far
    :return: Tuple of the new group (None if creation raised RuntimeError)
        and the updated instructor log
    """
    logger.info('Creating ArcGIS group: "{}"'.format(groupTitle))
    instructorLog += 'Creating ArcGIS group: "{}"\n'.format(groupTitle)

    try:
        createdGroup = arcGIS.groups.create(groupTitle, groupTags)
    except RuntimeError as exception:
        logger.exception('Exception while creating ArcGIS group "{}": {}'.format(groupTitle, exception))
        createdGroup = None

    return createdGroup, instructorLog
# Get ArcGIS group with this title (if it exists)
def lookForExistingArcGISGroup(arcGIS, groupTitle):
    """Find an ArcGIS group with a matching title.

    :param arcGIS: ArcGIS connection object, passed to getArcGISGroupByTitle
    :param groupTitle: Title of the group to look for
    :return: The group returned by getArcGISGroupByTitle, or None when no
        group was found or the search raised RuntimeError
    """
    logger.info('Searching for existing ArcGIS group "{}"'.format(groupTitle))

    # Bind the result up front so the return below is always defined, even
    # when the search raises.
    group = None
    try:
        group = getArcGISGroupByTitle(arcGIS, groupTitle)
    except RuntimeError as exception:
        logger.exception('Exception while searching for ArcGIS group "{}": {}'.format(groupTitle, exception))

    return group
def formatUsersNamesForArcGIS(userList):
    """Convert a list of Canvas user names to the format used in ArcGIS.

    :param userList: Canvas user names
    :return: New list with ``'_'`` and ``config.ArcGIS.ORG_NAME`` appended
        to each name
    :rtype: list
    """
    arcGISNames = []
    for canvasName in userList:
        arcGISNames.append(canvasName + '_' + config.ArcGIS.ORG_NAME)
    return arcGISNames
def updateArcGISItem(item: arcgis.gis.Item, data: dict):
    """Update an ArcGIS item with ``data`` serialized as JSON text.

    :param item: The ArcGIS item to update
    :param data: Content to serialize with ``json.dumps`` and pass to
        ``item.update`` as its ``data`` argument
    :raises AssertionError: if ``item`` is not an ``arcgis.gis.Item`` or
        ``data`` is not a dict
    """
    expectedItemType = arcgis.gis.Item
    assert isinstance(item, expectedItemType), '"item" is not type "' + str(expectedItemType) + '"'

    expectedDataType = dict
    assert isinstance(data, expectedDataType), '"data" is not type "' + str(expectedDataType) + '"'

    # The serialized text is passed straight through as a string.
    serialized = json.dumps(data)
    item.update(data=serialized)