This repository has been archived by the owner on Apr 5, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
main.py
117 lines (101 loc) · 3.63 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from network.iaaa import IAAAClient
from network.epe import EpeClient
import time
import random
import configparser
# Load runtime settings from config.ini; the "loop" and "target" sections are read here.
config = configparser.ConfigParser()
config.read("config.ini")

_loop_cfg = config["loop"]
# User-agent string handed to the HTTP clients via set_user_agent().
ua = _loop_cfg["ua"]

_target_cfg = config["target"]
# NOTE(review): eval() executes whatever expression is stored in config.ini.
# That is only safe while the file is fully trusted; if the values are always
# plain literals, ast.literal_eval would be a safer parser -- confirm and switch.
prefer_site = eval(_target_cfg["prefer_site"])
prefer_time = eval(_target_cfg["prefer_time"])
# Passed to searchForAppropSite() as the number of consecutive slots wanted.
game_interval = int(_target_cfg["interval"])

# Base sleep (seconds) after a failed lookup, and when no site is available.
slt = float(_loop_cfg["sleepTime"])
sltNA = float(_loop_cfg["sleepTimeIfNotAvail"])
def asciiTime():
    """Return the current local time as a ``time.asctime`` formatted string."""
    now_seconds = time.time()
    local_now = time.localtime(now_seconds)
    return time.asctime(local_now)
def searchForAppropSite(r, date, ts, sites, length):
    """Find preferred sites that are free for ``length`` consecutive time slots.

    Args:
        r: Response-like object whose ``json()`` returns a mapping with a
            ``"data"`` entry containing ``"spaceTimeInfo"`` and
            ``"reservationDateSpaceInfo"``.
        date: Key selecting one day inside ``"reservationDateSpaceInfo"``.
        ts: Keys/indices into ``"spaceTimeInfo"`` for the preferred start
            slots; each entry's integer ``"id"`` is used as the start slot id.
        sites: Keys of the preferred sites inside that day's space map.
        length: Number of consecutive slot ids that must all be free.

    Returns:
        A list of ``(site_id, [slot_id_str, ...])`` tuples, ordered by start
        slot first and by site second. A window that runs past the slots
        present for a site is skipped instead of raising ``KeyError``.
    """
    data = r.json()["data"]
    time_map = data["spaceTimeInfo"]
    space_map = data["reservationDateSpaceInfo"][date]

    def is_free(site_slots, slot_id):
        # A slot that is absent from the site's map (e.g. past the last slot
        # of the day) is treated as not bookable. Status 1 is the only value
        # this function accepts as "free".
        slot = site_slots.get(str(slot_id))
        return slot is not None and slot["reservationStatus"] == 1

    start_ids = [time_map[t]["id"] for t in ts]
    avails = []
    for start_id in start_ids:
        window = range(start_id, start_id + length)
        for site in sites:
            site_slots = space_map[site]
            # The first slot is always checked, even when ``length`` < 1, so a
            # site is never reported unless its start slot is free.
            if not is_free(site_slots, start_id):
                continue
            if all(is_free(site_slots, slot_id) for slot_id in window[1:]):
                avails.append((site_slots["id"], [str(i) for i in window]))
    return avails
# Epe login to get cgAuthorization.
# Sequence: EPE venue redirect -> IAAA OAuth login -> EPE ticket -> role login.
epe = EpeClient(1, timeout=1000)
epe.set_user_agent(ua)
# Return value is not used below; presumably the call primes the EPE session
# before the OAuth round-trip -- TODO confirm against EpeClient.
epe.redirectVenue()

iaaa = IAAAClient(timeout=30)
iaaa.set_user_agent(ua)
# Return value is not used below; kept because the login call follows it.
iaaa.oauth_home()
r1 = iaaa.oauth_login(config["user"]["user_id"], config["user"]["passwd"])

# Without a token nothing below can work, so stop here with a clear message
# instead of continuing into a NameError on the undefined `token`.
try:
    token = r1.json()["token"]
except (KeyError, TypeError, ValueError) as e:
    print(r1)
    raise SystemExit("get token error") from e

# Exchange the IAAA token for an EPE ticket; the client's session cookies are
# then expected to hold "sso_pku_token".
epe.get_ticket(token)
sso_pku_token = epe._session.cookies.get_dict()["sso_pku_token"]

r3 = epe.beforeRoleLogin(sso_pku_token)
access_token = r3.json()["data"]["token"]["access_token"]

r = epe.roleLogin(access_token)
# Authorization value passed to every later infoLookup/makeOrder/submit call.
cgAuth = r.json()["data"]["token"]["access_token"]
## Start looking for available fields and try to book one.
# The logout runs in `finally` so the session is closed even when the loop is
# interrupted (Ctrl+C) or an unexpected error escapes.
try:
    while True:
        # The lookup sits inside the try so that a transient request/JSON
        # error is logged and retried like a non-"OK" answer, not fatal.
        try:
            r = epe.infoLookup(cgAuth, (config["target"]["gym"], config["target"]["date"]))
            if r.json()["message"] != "OK":
                raise Exception(r.text)
        except Exception as e:
            print(asciiTime(), ": ", e)
            # Random jitter keeps the polling interval from being perfectly regular.
            time.sleep(slt + random.random())
            continue

        avails = searchForAppropSite(r, config["target"]["date"], prefer_time, prefer_site, game_interval)
        if not avails:
            print(asciiTime(), " ", "所选时段暂无可用场地")
            time.sleep(sltNA + random.random())
            continue

        # Try each candidate in order until one is ordered and submitted.
        for space_id, time_ids in avails:
            orderLs = [
                {
                    "spaceId": str(space_id),
                    "timeId": tId,
                    "venueSpaceGroupId": None,
                }
                for tId in time_ids
            ]
            try:
                r2 = epe.makeOrder(cgAuth, [config["target"]["gym"], config["target"]["date"]], orderLs)
                if r2.json()["message"] != "OK":
                    raise Exception(r2.text)
                r3 = epe.submit(cgAuth, [config["target"]["gym"], config["target"]["date"]], orderLs, config["user"]["phonenumber"])
                if r3.json()["message"] != "OK":
                    raise Exception(r3.text)
            except Exception as e:
                print(asciiTime(), " ", e)
                # Short pause before moving on to the next candidate.
                time.sleep(0.5)
                continue
            print("预订成功!记得去智慧场馆付款\n", r3.text)
            break

        # NOTE(review): polling stops after the first round of booking
        # attempts, even if every attempt failed -- confirm this is intended.
        break
finally:
    epe.logout()