-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_members.py
129 lines (111 loc) · 4.88 KB
/
get_members.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import webexteamssdk
import itertools
import time
import concurrent.futures
import tqdm
import asyncio
import aiohttp
from dotenv import load_dotenv
import os
SPACES = 100 # number of spaces to work on
MAX_WORKER = 100
load_dotenv()
ACCESS_TOKEN = os.getenv('ACCESS_TOKEN')
def get_memberships(api, spaces):
"""
Get memberships for all spaces by calling the respective API one by one
:param api: webexteamssdk api
:param spaces: list of spaces
:return: tuple list of list of memberhips, seconds
"""
#return [], 0
start = time.perf_counter()
result = list(
tqdm.tqdm(
(list(api.memberships.list(roomId=space.id)) for space in spaces),
total=len(spaces)))
diff = time.perf_counter() - start
seconds = diff
return result, seconds
def get_memberships_concurrent(api, spaces):
"""
Get memberships for all spaces by calling the respective API in multiple threads
:param api: webexteamssdk api
:param spaces: list of spaces
:return: tuple list of list of memberhips, seconds
"""
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKER) as executor:
# create a dictionary mapping futures to the space id for all spaces and schedule tasks on the fly
# each call to executor.submit() schedules a task and returns a future for that task
future_to_space_id = {executor.submit(lambda: list(api.memberships.list(roomId=space.id))): space.id for space
in spaces}
result = {}
# then iterate through the futures as they become done
# not necessarily in the same order as they were submitted
# as_completed() is only used to be able to get an interator which can be passed to tqdm to get a progress bar
# else a simple executor.map() could be used to schedule all tasks and wait for all results
for future_done in tqdm.tqdm(concurrent.futures.as_completed(future_to_space_id), total=len(spaces)):
# store the results of the future in a dictionary mapping from space id to results
result[future_to_space_id[future_done]] = future_done.result()
# finally create list of results in the order of the spaces.
result = [result[space.id] for space in spaces]
diff = time.perf_counter() - start
seconds = diff
return result, seconds
async def get_membership(space_id, throttle):
"""
Get membership list for a given space
:param space_id: ID of space to obtain membership list for
:param throttle: throttling semaphore
:return: tuple space id, membership list
"""
url = 'https://api.ciscospark.com/v1/memberships'
headers = {'Authorization': f'Bearer {ACCESS_TOKEN}'}
params = {'roomId': space_id}
try:
await throttle.acquire()
async with aiohttp.request('GET', url, headers=headers, params=params) as resp:
resp.raise_for_status()
result = await resp.json()
finally:
throttle.release()
memberships = [webexteamssdk.Membership(i) for i in result['items']]
return space_id, memberships
async def get_memberships_asyncio(spaces):
"""
Get memberships for all spaces by calling the respective API in multiple threass
:param spaces: list of spaces
:return: tuple list of list of memberhips, seconds
"""
throttle = asyncio.Semaphore(MAX_WORKER)
start = time.perf_counter()
# create list of tasks
tasks = [get_membership(space.id, throttle) for space in spaces]
result = {}
# then iterate through the futures as they become done
# not necessarily in the same order as they were submitted
# as_completed() is only used to be able to get an interator which can be passed to tqdm to get a progress bar
# else a simple asyncio.gather() could be used to schedule all tasks and wait for all results
for future_done in tqdm.tqdm(asyncio.as_completed(tasks), total=len(spaces)):
space_id, memberships = await future_done
result[space_id] = memberships
# finally create list of results in the order of the spaces.
result = [result[space.id] for space in spaces]
diff = time.perf_counter() - start
seconds = diff
return result, seconds
def main():
api = webexteamssdk.WebexTeamsAPI(access_token=ACCESS_TOKEN)
# take 1st few spaces
print(f'Getting first {SPACES} spaces...')
spaces = list(itertools.islice(api.rooms.list(max=100), SPACES))
print(f'Got first {SPACES} spaces...')
_, seconds = get_memberships(api, spaces)
_, seconds_concurrent = get_memberships_concurrent(api, spaces)
_, seconds_asyncio = asyncio.run(get_memberships_asyncio(spaces))
print(f'get_memberships() took {seconds} seconds')
print(f'get_memberships_concurrent() took {seconds_concurrent} seconds')
print(f'get_memberships_asyncio() took {seconds_asyncio} seconds')
if __name__ == '__main__':
main()