-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.py
57 lines (50 loc) · 1.42 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from ip_proxy.setting import *
from multiprocessing import Process
from ip_proxy.api import app
from ip_proxy.getter import Getter
from ip_proxy.tester import Tester
import time
class Scheduler:
    """Launch the proxy pool's tester, getter and API, each in its own process."""

    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Run the proxy tester repeatedly, pausing between rounds.

        :param cycle: seconds to sleep after each tester run
        :return: None (the loop has no exit condition)
        """
        proxy_tester = Tester()
        while True:
            print('测试器开始执行')
            proxy_tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Run the proxy getter repeatedly, pausing between rounds.

        :param cycle: seconds to sleep after each getter run
        :return: None (the loop has no exit condition)
        """
        proxy_getter = Getter()
        while True:
            print('开始抓取代理')
            proxy_getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API by calling ``app.run`` with ``API_HOST`` and ``API_PORT``.

        :return: None
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        """
        Start one child process per component whose ``*_ENABLED`` flag is truthy.

        Components are launched in the order tester, getter, API. The
        processes are started but not joined, so this method returns as
        soon as the last one has been launched.

        :return: None
        """
        print('代理池开始运行')
        # (enabled flag, process target) pairs, listed in launch order.
        components = (
            (TESTER_ENABLED, self.schedule_tester),
            (GETTER_ENABLED, self.schedule_getter),
            (API_ENABLED, self.schedule_api),
        )
        for enabled, target in components:
            if enabled:
                Process(target=target).start()
if __name__ == "__main__":
    # Script entry point: launch the enabled components in child processes.
    scheduler = Scheduler()
    scheduler.run()
    # run() does not join its children, so pause the parent process here
    # for SLEEP_TIME seconds before falling off the end of the script.
    time.sleep(SLEEP_TIME)