-
Notifications
You must be signed in to change notification settings - Fork 71
/
utils.py
199 lines (150 loc) · 6.54 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
from collections import Counter
import ctypes
import datetime
import logging
import os
import platform
import threading
from enum import Enum
import prettytable as pt
from fastHan import FastHan
model = FastHan()
model.set_cws_style('wtb')
def is_windows() -> bool:
plat_sys = platform.system()
return plat_sys == "Windows"
if is_windows():
import winreg
def get_log_level(config: dict) -> int:
if config.get('root', {}).get('logger', {}).get('log_level','DEBUG') == 'DEBUG':
return logging.DEBUG
if config.get('root', {}).get('logger', {}).get('log_level','DEBUG') == 'INFO':
return logging.INFO
if config.get('root', {}).get('logger', {}).get('log_level','DEBUG') == 'WARN':
return logging.WARN
if config.get('root', {}).get('logger', {}).get('log_level','DEBUG') == 'ERROR':
return logging.ERROR
return logging.INFO
def check_and_create_dir(dirs: str) -> None:
if not os.path.exists(dirs):
os.mkdir(dirs)
def init_data_dirs(root_dir: str = os.getcwd()) -> None:
check_and_create_dir(os.path.join(root_dir, 'data'))
check_and_create_dir(os.path.join(root_dir, 'data', 'records'))
check_and_create_dir(os.path.join(root_dir, 'data', 'merged'))
check_and_create_dir(os.path.join(root_dir, 'data', 'merge_confs'))
check_and_create_dir(os.path.join(root_dir, 'data', 'danmu'))
check_and_create_dir(os.path.join(root_dir, 'data', 'outputs'))
check_and_create_dir(os.path.join(root_dir, 'data', 'splits'))
check_and_create_dir(os.path.join(root_dir, 'data', 'cred'))
def init_record_dir(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
dirs = os.path.join(root_dir, 'data', 'records',
f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}")
check_and_create_dir(dirs)
return dirs
def init_danmu_log_dir(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
log_dir = os.path.join(
root_dir, 'data', 'danmu', f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}")
check_and_create_dir(log_dir)
return log_dir
def generate_filename(room_id: str) -> str:
return f"{room_id}_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.flv"
def get_global_start_from_records(record_dir: str) -> datetime.datetime:
base = os.path.basename(record_dir)
return datetime.datetime.strptime(" ".join(base.split("_")[1:3]), '%Y-%m-%d %H-%M-%S')
def get_merged_filename(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
filename = os.path.join(root_dir, 'data', 'merged',
f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}_merged.mp4")
return filename
def init_outputs_dir(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
dirs = os.path.join(root_dir, 'data', 'outputs',
f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}")
check_and_create_dir(dirs)
return dirs
def init_splits_dir(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
dirs = os.path.join(root_dir, 'data', 'splits',
f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}")
check_and_create_dir(dirs)
return dirs
def get_merge_conf_path(room_id: str, global_start: datetime.datetime, root_dir: str = os.getcwd()) -> str:
filename = os.path.join(root_dir, 'data', 'merge_confs',
f"{room_id}_{global_start.strftime('%Y-%m-%d_%H-%M-%S')}_merge_conf.txt")
return filename
def get_cred_filename(room_id: str, root_dir: str = os.getcwd()) -> str:
filename = os.path.join(root_dir, 'data', 'cred',
f"{room_id}_cred.json")
return filename
def del_files_and_dir(dirs: str) -> None:
for filename in os.listdir(dirs):
os.remove(os.path.join(dirs, filename))
os.rmdir(dirs)
def get_rough_time(hour: int) -> str:
if 0 <= hour < 6:
return "凌晨"
elif 6 <= hour < 12:
return "上午"
elif 12 <= hour < 18:
return "下午"
else:
return "晚上"
def refresh_reg() -> None:
HWND_BROADCAST = 0xFFFF
WM_SETTINGCHANGE = 0x1A
SMTO_ABORTIFHUNG = 0x0002
result = ctypes.c_long()
SendMessageTimeoutW = ctypes.windll.user32.SendMessageTimeoutW
SendMessageTimeoutW(HWND_BROADCAST, WM_SETTINGCHANGE, 0,
u'Environment', SMTO_ABORTIFHUNG, 5000, ctypes.byref(result))
def add_path(path: str) -> None:
abs_path = os.path.abspath(path)
path_key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
'Environment', 0, winreg.KEY_ALL_ACCESS)
path_value = winreg.QueryValueEx(path_key, 'Path')
if path_value[0].find(abs_path) == -1:
winreg.SetValueEx(path_key, "Path", 0,
winreg.REG_EXPAND_SZ, path_value[0]+(";" if path_value[0][-1] != ";" else "")+abs_path+";")
refresh_reg()
class state(Enum):
ERROR = -1
WAITING_FOR_LIVE_START = 0
LIVE_STARTED = 1
PROCESSING_RECORDS = 2
UPLOADING_TO_BILIBILI = 3
UPLOADING_TO_BAIDUYUN = 4
def __str__(self):
if self.value == -1:
return "错误!"
if self.value == 0:
return "摸鱼中"
if self.value == 1:
return "正在录制"
if self.value == 2:
return "正在处理视频"
if self.value == 3:
return "正在上传至Bilibili"
if self.value == 4:
return "正在上传至百度网盘"
def __int__(self):
return self.value
def print_log(runner_list: list) -> str:
tb = pt.PrettyTable()
tb.field_names = ["TID", "平台", "房间号", "直播状态", "程序状态", "状态变化时间"]
for runner in runner_list.values():
tb.add_row([runner.native_id, runner.mr.bl.site_name, runner.mr.bl.room_id, "是" if runner.mr.bl.live_status else "否",
str(state(runner.mr.current_state.value)), datetime.datetime.fromtimestamp(runner.mr.state_change_time.value)])
print(
f" DDRecorder 当前时间:{datetime.datetime.now()} 正在工作线程数:{threading.activeCount()}\n")
print(tb)
print("\n")
def get_words(txts, topK=5):
seg_list = []
for txt in txts:
seg_list.extend(model(txt)[0])
c = Counter()
for x in seg_list: # 进行词频统计
if len(x) > 1 and x != '\r\n' and x != '\n':
c[x] += 1
try:
return list(list(zip(*c.most_common(topK)))[0])
except IndexError:
return []