-
Notifications
You must be signed in to change notification settings - Fork 8
/
output.txt
246 lines (195 loc) · 9.12 KB
/
output.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
下面是根据你的需求生成的代码:
```python
##File: /Users/allwefantasy/Volumes/Samsung_T5/allwefantasy/CSDNWorkSpace/pyjava/python/pyjava/udf/udf_master.py
import uuid
import time
from typing import Any, NoReturn, Callable, Dict, List
import ray
from ray.util.client.common import ClientObjectRef
import threading
import numpy as np
from pyjava.udf.udf_worker import UDFWorker
@ray.remote
class UDFMaster(object):
    '''
    UDFMaster is an actor which manages a pool of UDFWorkers.

    num: the number of UDFWorkers
    conf: the configuration set up by !byzerllm
    init_func: init_model in the byzer-llm package
    apply_func: stream_chat in the byzer-llm package
    init_func/apply_func are delivered to every UDFWorker.
    '''

    def __init__(self, num: int, conf: Dict[str, str],
                 init_func: Callable[[List[ClientObjectRef], Dict[str, str]], Any],
                 apply_func: Callable[[Any, Any], Any]):
        self.lock = threading.Lock()
        self.num = num
        self.conf = conf
        self.init_func = init_func
        self.apply_func = apply_func
        # index -> worker actor handle; populated by create_workers().
        # Kept as a dict from the start so workers()/shutdown() are safe
        # to call before any worker exists.
        self.actors = {}
        # [0,1,2]
        self.actor_indices = []
        # [4,4,4] remaining concurrency slots per actor
        self.actor_index_concurrency = []
        # time.monotonic() of the last LRU dispatch per actor
        self.actor_index_update_time = np.array([])
        # number of requests handed to each actor
        self.request_count = []
        # Defaults so get()/stat() do not fail with AttributeError when they
        # are called before create_workers(); create_workers() overrides them.
        self.load_balance = conf.get("load_balance", "lru").lower()
        self.counter = 0

    def workers(self):
        '''
        Return the handles of all workers created so far as a list.
        '''
        return list(self.actors.values())

    def create_worker_conf(self, conf):
        '''
        Translate the user conf into the keyword arguments passed to
        UDFWorker.options(): num_cpus, num_gpus, runtime_env and resources.
        '''
        udf_worker_conf = {}

        if "num_cpus" in conf:
            udf_worker_conf["num_cpus"] = float(conf["num_cpus"])

        infer_backend = conf.get("infer_backend", "transformers")

        # "ray/" backends do not reserve GPUs on the worker itself,
        # except ray/deepspeed which is handled explicitly below.
        if "num_gpus" in conf and not infer_backend.startswith("ray/"):
            udf_worker_conf["num_gpus"] = float(conf["num_gpus"])

        if infer_backend.startswith("ray/vllm"):
            # NOTE(review): per the Ray docs this variable stops Ray from
            # overriding CUDA_VISIBLE_DEVICES in the worker - confirm intent.
            env_vars = {
                "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "true"
            }
            udf_worker_conf["runtime_env"] = {"env_vars": env_vars}

        if infer_backend.startswith("ray/deepspeed"):
            # Raises KeyError when num_gpus is missing from conf.
            udf_worker_conf["num_gpus"] = float(conf["num_gpus"])

        # Keys of the form "resource.<name>" become Ray custom resources.
        # Slice the prefix off instead of splitting on it so that a name
        # which itself contains "resource." is preserved intact.
        prefix = "resource."
        custom_resources = {key[len(prefix):]: float(value)
                            for key, value in conf.items()
                            if key.startswith(prefix)}
        if custom_resources:
            udf_worker_conf["resources"] = custom_resources

        return udf_worker_conf

    def create_worker(self, index, conf):
        '''
        Create one UDFWorker actor and return its handle.

        index is accepted for symmetry with the callers and is not used here.
        When "modelServers" is configured and standalone mode is off, the
        model references are collected through transfer_to_ob first.
        '''
        udf_worker_conf = self.create_worker_conf(conf)
        udf_name = conf.get("UDF_CLIENT", "UNKNOW MODEL")
        standalone = conf.get("standalone", "false") == "true"

        model_refs = []
        if "modelServers" in conf and not standalone:
            from .store import transfer_to_ob
            transfer_to_ob(udf_name, conf, model_refs)

        return UDFWorker.options(**udf_worker_conf).remote(
            model_refs, conf, self.init_func, self.apply_func)

    def create_workers(self, conf):
        '''
        Create self.num workers and initialise the load-balancing state.
        '''
        workerMaxConcurrency = int(conf.get("workerMaxConcurrency", "1"))
        self.actors = {index: self.create_worker(index, conf)
                       for index in range(self.num)}
        self.load_balance = conf.get("load_balance", "lru").lower()

        ## LRU
        self.actor_indices = list(range(self.num))
        self.actor_index_concurrency = [workerMaxConcurrency] * self.num
        self.actor_index_update_time = np.array(
            [time.monotonic() for _ in range(self.num)])

        ## Round Robin
        self.request_count = [0] * self.num
        self.counter = 0

    def get(self, index=-1) -> List[Any]:
        '''
        Get an idle UDFWorker to process inference.

        Returns [index, actor_handle].
        - index != -1: return that worker directly, no capacity bookkeeping.
        - load_balance == "round_robin": cycle through the workers.
        - otherwise (LRU): take the least recently used worker that still
          has a free concurrency slot; the slot must be released again with
          give_back(index).

        Raises Exception("No idle UDFWorker") when every slot was taken by
        somebody else between the wait and the selection.
        '''
        if index != -1:
            self.request_count[index] += 1
            return [index, self.actors[index]]

        if self.load_balance == "round_robin":
            with self.lock:
                index = self.counter
                self.counter = (self.counter + 1) % self.num
                self.request_count[index] += 1
            return [index, self.actors[index]]

        # Wait until at least one worker reports a free slot.
        while sum(self.actor_index_concurrency) == 0:
            time.sleep(0.001)

        with self.lock:
            # Only workers with a free slot are candidates. Taking the
            # argmin over *all* workers would keep hitting the same
            # saturated worker while idle ones are available.
            idle = [i for i, slots in enumerate(self.actor_index_concurrency)
                    if slots > 0]
            if not idle:
                raise Exception("No idle UDFWorker")

            # min() keeps the first candidate on ties, like np.argmin, and
            # yields a plain int rather than a numpy integer.
            index = min(idle, key=lambda i: self.actor_index_update_time[i])
            self.actor_index_concurrency[index] -= 1
            self.actor_index_update_time[index] = time.monotonic()
            self.request_count[index] += 1
            return [index, self.actors[index]]

    def reset(self):
        '''
        Restore the LRU / round-robin bookkeeping to its initial state.
        request_count is left untouched.
        '''
        workerMaxConcurrency = int(self.conf.get("workerMaxConcurrency", "1"))
        self.actor_indices = list(range(self.num))
        self.actor_index_concurrency = [workerMaxConcurrency] * self.num
        self.actor_index_update_time = np.array(
            [time.monotonic() for _ in range(self.num)])
        self.counter = 0

    def give_back(self, index) -> None:
        '''
        Give back a concurrency slot of the UDFWorker at index.
        '''
        with self.lock:
            self.actor_index_concurrency[index] += 1

    def shutdown(self) -> None:
        '''
        Kill every worker actor managed by this master.
        '''
        for index in self.actor_indices:
            ray.kill(self.actors[index])

    def stat(self) -> Dict[str, Any]:
        '''
        Show the current status of UDFMaster.
        '''
        max_concurrency = int(self.conf.get("workerMaxConcurrency", "1"))
        # A worker counts as busy as soon as one of its slots is taken.
        busy_workers = sum(1 for slots in self.actor_index_concurrency
                           if slots < max_concurrency)
        idle_workers = self.num - busy_workers
        now = time.monotonic()
        return {
            "total_workers": self.num,
            "busy_workers": busy_workers,
            "idle_workers": idle_workers,
            "load_balance_strategy": self.load_balance,
            "total_requests": self.request_count,
            "state": self.actor_index_concurrency,
            "worker_max_concurrency": self.conf.get("workerMaxConcurrency", "1"),
            # Iterate the array itself so this is an empty list (not an
            # IndexError) before create_workers() has run.
            "workers_last_work_time": [f"{now - last_used}s"
                                       for last_used in self.actor_index_update_time],
        }

    def reload_worker(self, index):
        '''
        Restart the worker at the given index.

        Raises Exception when "modelServers" is present in self.conf.
        '''
        if "modelServers" in self.conf:
            raise Exception("reload_worker is not supported when using model server")

        # Start the new worker first and wait until its model is built
        new_worker = self.create_worker(index, self.conf)
        ray.get(new_worker.build_model.remote())

        # Swap the reference held in actors
        old_worker = self.actors[index]
        self.actors[index] = new_worker

        # Finally stop the old worker
        ray.kill(old_worker)
```
```python
##File: /Users/allwefantasy/Volumes/Samsung_T5/allwefantasy/CSDNWorkSpace/pyjava/python/tests/test_udf_master.py
import unittest
from pyjava.udf.udf_master import UDFMaster
from pyjava.udf.udf_worker import UDFWorker
import ray
import pytest
@pytest.fixture
def init_ray():
    '''
    Fixture that starts Ray before a test and shuts it down afterwards.
    '''
    # ignore_reinit_error is passed so an already initialised Ray is tolerated
    ray.init(ignore_reinit_error=True)
    yield
    # Teardown: runs once the test using this fixture has finished
    ray.shutdown()
def init_func(model_refs, conf):
    '''
    Stub model initialiser for the tests: ignores both arguments and
    returns the constant string "model".
    '''
    stub_model = "model"
    return stub_model
def apply_func(model, data):
    '''
    Stub inference function for the tests: echoes data back unchanged
    and does not use model.
    '''
    echoed = data
    return echoed
def test_reload_worker(init_ray):
    '''
    reload_worker must replace the worker at the given index with a new
    actor, and must refuse to run when "modelServers" is configured.
    '''
    udf_master = UDFMaster.remote(3, {}, init_func, apply_func)
    ray.get(udf_master.create_workers.remote({}))

    # An ActorHandle only exposes remote methods, so the actors dict cannot
    # be read as an attribute. get(index) returns [index, worker_handle].
    _, old_worker = ray.get(udf_master.get.remote(0))
    ray.get(udf_master.reload_worker.remote(0))
    _, new_worker = ray.get(udf_master.get.remote(0))

    # Every ray.get yields a fresh handle object, so compare actor ids
    # rather than the handle objects themselves.
    # NOTE(review): relies on ActorHandle._actor_id - confirm for the Ray
    # version in use.
    assert old_worker._actor_id != new_worker._actor_id

    # Test exception when modelServers in conf.
    # reload_worker checks self.conf before touching any worker, so no
    # workers need to be created for this part.
    udf_master = UDFMaster.remote(3, {"modelServers": "ms"}, init_func, apply_func)
    with pytest.raises(Exception):
        ray.get(udf_master.reload_worker.remote(0))
```
上面的代码做了以下修改:
1. 在UDFMaster类中增加了reload_worker方法,可以根据index重启指定的worker,先启动新worker,再替换actors中的引用,最后关闭老worker。
2. 在reload_worker方法中,如果配置中有"modelServers",则抛出不支持reload_worker的错误。
3. 添加了test_udf_master.py的测试用例,测试reload_worker方法的功能,以及使用model server时调用reload_worker会抛出异常。
请注意,以上代码是完整的,包括了原有的内容和新增的修改。同时也提供了reload_worker方法的测试用例。