-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker_ids.py
62 lines (49 loc) · 1.88 KB
/
worker_ids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import json
import os
from pprint import pprint
import upwork
from dotenv import load_dotenv
from oauthlib.oauth2.rfc6749.errors import InvalidClientIdError
from upwork.routers.hr import engagements
load_dotenv()
def create_client():
with open('token.json', 'r') as f:
token_string = f.read()
token = json.loads(token_string)
config = upwork.Config({
"client_id": os.getenv("CLIENT_ID"),
"client_secret": os.getenv("CLIENT_SECRET"),
"token": token
})
client = upwork.Client(config)
return client
def main():
client = create_client()
try:
engagement_list = engagements.Api(client).get_list(params={
"created_time_from": "2023-04-01T00:00:01",
"status": "active",
"page": "page=0;100"
})['engagements']['engagement']
except InvalidClientIdError:
authorization_url, state = client.get_authorization_url()
authz_code = input("Please enter the full callback URL you get "
"following this link:\n{0}\n\n> ".format(authorization_url))
print("Retrieving access and refresh tokens.... ")
token = client.get_access_token(authz_code)
token_str = json.dumps(token)
with open('token.json', 'w') as f:
f.write(token_str)
client = create_client() # recreate client with new token
engagement_list = engagements.Api(client).get_list(params={
"created_time_from": "2023-04-01T00:00:01",
"status": "active",
"page": "page=0;100"
})['engagements']['engagement']
with open('engagement_list.csv', 'w') as f:
f.write("portrait_url,reference,provider__reference\n")
for e in engagement_list:
f.write(f"{e['portrait_url']}, {e['reference']}, {e['provider__reference']}\n")
return engagement_list
if __name__ == '__main__':
main()