tools.py
import os
import re
from typing import List, Dict

import yaml
from jinja2.exceptions import TemplateError
import torch
from torch import Tensor
from transformers import AutoTokenizer, AutoModelForCausalLM


def create_run(top_k):
    """Turn a faiss (distances, indices) result into a run dict; scores are 1 - distance, so higher is better."""
    run = {"query": {f"q_{idx}": 1 - score for idx, score in zip(top_k[1][0], top_k[0][0])}}
    return run
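
# Illustrative sketch (not executed): faiss returns (distances, indices) as 2-D arrays,
# one row per query, so a single-query result such as
#
#   top_k = (np.array([[0.12, 0.34]]), np.array([[5, 2]]))   # assumes numpy imported as np
#
# would be converted to {"query": {"q_5": 0.88, "q_2": 0.66}}.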


def last_token_pool(last_hidden_states: Tensor,
                    attention_mask: Tensor) -> Tensor:
    """Pool the hidden state of the last non-padding token of each sequence."""
    # If the batch is left-padded, the last position is always a real token.
    left_padding = (attention_mask[:, -1].sum() == attention_mask.shape[0])
    if left_padding:
        return last_hidden_states[:, -1]
    else:
        # Otherwise pick, for each sequence, the hidden state at its true last token.
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = last_hidden_states.shape[0]
        return last_hidden_states[torch.arange(batch_size, device=last_hidden_states.device), sequence_lengths]
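
# Illustrative sketch (not executed): the tensors below are dummies showing the
# expected shapes; any transformers model exposing `last_hidden_state` works the same way.
#
#   hidden = torch.randn(2, 4, 8)                # (batch, seq_len, hidden_dim)
#   mask = torch.tensor([[1, 1, 1, 0],
#                        [1, 1, 1, 1]])          # right-padded attention mask
#   pooled = last_token_pool(hidden, mask)       # shape: (2, 8)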


def _load_model(model_name: str, bit4: bool = False) -> tuple:
    """Load a causal LM and its tokenizer from the Hugging Face hub."""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name, load_in_4bit=bit4, trust_remote_code=True)
    model.eval()
    return model, tokenizer
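
# Illustrative usage (not executed): the model name below is only a placeholder,
# any causal LM available on the Hugging Face hub would work.
#
#   model, tokenizer = _load_model("mistralai/Mistral-7B-Instruct-v0.2", bit4=True)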


CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config")


def load_config(config_name):
    """Load every YAML document contained in CONFIG_PATH/config_name as a list."""
    with open(os.path.join(CONFIG_PATH, config_name)) as file:
        configs = yaml.safe_load_all(file)
        # safe_load_all returns a generator, so it must be consumed while the file is open
        list_config = [config for config in configs]
    return list_config
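
# Illustrative usage (not executed): "retrieval.yaml" is only a placeholder name,
# the function expects any YAML file stored in the package's config/ directory.
#
#   configs = load_config("retrieval.yaml")   # one dict per YAML document in the file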


def search(query: str,
           model,
           index,
           collection: List[str],
           k: int = 2,
           tokenizer=None,
           rankx: bool = False) -> List[str]:
    """
    Parameters
    ----------
    query: str
        the query used to retrieve documents from the index
    model: sentence_transformers.SentenceTransformer
        the encoding model used to encode the query. It should be the same
        model used to build the faiss index. Currently only
        SentenceTransformer is supported
    index: faiss.Index
        the faiss index from which to retrieve the documents
    collection: list
        the list containing the documents stored in the index
    k: int
        how many documents to retrieve
    tokenizer: transformers.Tokenizer
        if using a transformers model, pass its tokenizer so the query
        is encoded via the transformers API
    rankx: bool
        if True, return the result as a rankx run dict
    Returns
    --------
    retrieval: list
        the documents retrieved from the index (or a run dict when rankx is True)
    """
    k = len(collection) if rankx else k  # when evaluating, rank the whole collection
    if tokenizer is None:
        query_vector = model.encode([query])
    else:
        # Encode with the transformers API and pool the last non-padding token.
        batch_dict = tokenizer([query], padding=True, truncation=True, return_tensors="pt")
        batch_dict = {key: v.to(model.device) for key, v in batch_dict.items()}
        outputs = model(**batch_dict)
        query_vector = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
        query_vector = query_vector.detach().cpu().numpy()
    top_k = index.search(query_vector, k)  # retrieve the k nearest documents
    if rankx:
        return create_run(top_k)
    return [collection[_id] for _id in top_k[1].tolist()[0]]
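
# Illustrative sketch (not executed), assuming a SentenceTransformer bi-encoder and a
# flat inner-product faiss index; the model name and the documents are placeholders.
#
#   import faiss
#   from sentence_transformers import SentenceTransformer
#
#   docs = ["Doc about climate policy.", "Doc about energy taxation."]
#   encoder = SentenceTransformer("all-MiniLM-L6-v2")
#   embeddings = encoder.encode(docs)                  # float32 array, shape (n_docs, dim)
#   idx = faiss.IndexFlatIP(embeddings.shape[1])
#   idx.add(embeddings)
#   top_docs = search("carbon tax", encoder, idx, docs, k=1)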


def summarise_question(
        model,
        tokenizer,
        question: str,
        prompt: str,
        argument: str,
        device: str = "cpu",
        no_arg: bool = False,
        no_role: bool = True,
        dir_gen: bool = False,
        max_len: int = 5000
) -> tuple:
    """
    code mostly from https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.2
    Parameters
    ----------
    model: transformers.AutoModel
        the LLM to be used for answering the question
    tokenizer: transformers.AutoTokenizer
        the tokenizer associated with the above LLM
    question: str
        the text to answer (text row from the test dataset)
    prompt: str
        the prompt, i.e. how you ask the LLM to answer. LLMs are
        extremely sensitive to how they are asked to solve a task
        and different LLMs answer differently to the same prompt.
    argument: str
        the topic added to the prompt (unless no_arg is True)
    device: str
        the device to use: "cpu" for CPU or "cuda" for GPU. The
        smallest LLMs (7B) need at least 16GB of GPU memory (i.e. a single
        A100 or 2 T4). If using Google Colab with a single T4, use CPU
        instead (expect roughly 6 seconds per question).
    no_arg: bool
        if True, do not add the topic to the prompt. Use for ablation studies.
    no_role: bool
        if True, do not add the role to the prompt. Use for ablation studies.
    dir_gen: bool
        if True, do not add any additional element to the prompt (the
        "question" argument is ignored)
    max_len: int
        the maximum number of new tokens to generate
    Returns
    --------
    decoded: str
        the answer as output by the LLM.
    prompt: str
        the prompt after inclusion of the additional elements (i.e. role and topic)
    status: str
        one of "success" or "fail", according to whether generation ran correctly
    """
    question = re.sub("\n", " ", question)
    if dir_gen:
        messages = [
            {"role": "user", "content": prompt}
        ]
    elif no_role and no_arg:
        messages = [
            {"role": "user", "content": f"{prompt}\nText: {question}"}
        ]
    elif no_role:
        messages = [
            {"role": "user", "content": f"{prompt} with respect to the topic: {argument}\nText: {question}"}
        ]
    elif no_arg:
        messages = [
            {"role": "system", "content": "You are an assistant to policy-makers."},
            {"role": "user", "content": f"{prompt}\nText: {question}"}
        ]
    else:
        role_message = "You are an assistant to policy-makers."
        messages = [
            {"role": "system", "content": role_message},
            {"role": "user", "content": f"{prompt} with respect to the topic: {argument}\nText: {question}"}
        ]
    try:
        prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        if no_role:
            # Qwen 1.5 automatically adds a system role; if you don't want it, delete it
            prompt = re.sub(r"<\|im_start\|>system\nYou are a helpful assistant<\|im_end\|>\n",
                            "",
                            prompt)
    except TemplateError:
        # if the chat template does not support a system message, keep only the user turn
        prompt = tokenizer.apply_chat_template([messages[-1]], tokenize=False, add_generation_prompt=True)
    encodeds = tokenizer.encode(prompt, add_special_tokens=False, return_tensors="pt")
    prompt = tokenizer.batch_decode(encodeds)[0]
    model_inputs = encodeds.to(device)
    try:
        model.to(device)
    except ValueError:
        # models loaded in 4-bit/8-bit cannot be moved across devices
        pass
    try:
        generated_ids = model.generate(model_inputs, max_new_tokens=max_len, do_sample=False)
        decoded = tokenizer.batch_decode(generated_ids)[0]
        status = "success"
    except Exception:
        decoded = prompt + "\nFAILED!"
        status = "fail"
    return decoded, prompt, status
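
# Illustrative usage (not executed), reusing _load_model above; the model name,
# prompt and topic are placeholders:
#
#   model, tokenizer = _load_model("mistralai/Mistral-7B-Instruct-v0.2")
#   answer, full_prompt, status = summarise_question(
#       model, tokenizer,
#       question="Long policy document to be summarised ...",
#       prompt="Summarise the following text",
#       argument="climate change",
#       device="cpu",
#       no_role=False,
#   )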


def azure_summarise(
        client,
        model: str,
        question: str,
        prompt: str,
        argument: str,
        pricing: Dict[str, List],
        no_arg: bool = False,
        no_role: bool = True,
        max_len: int = 10000
) -> tuple:
    """
    code mostly from https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.2
    Parameters
    ----------
    client: openai.AzureOpenAI
        the instance of the Azure client class from which the API
        call is made.
    model: str
        the model name, among the ones available from the Azure API
    question: str
        the text to answer (text row from the test dataset)
    prompt: str
        the prompt, i.e. how you ask the LLM to answer. LLMs are
        extremely sensitive to how they are asked to solve a task
        and different LLMs answer differently to the same prompt.
    argument: str
        the topic added to the prompt (unless no_arg is True)
    pricing: dict
        dictionary of the form {"model_name": [input_price, output_price]}
        (per-token prices) used for tracking the overall expenditure.
    no_arg: bool
        if True, do not add the topic to the prompt. Use for ablation studies.
    no_role: bool
        if True, do not add the role to the prompt. Use for ablation studies.
    max_len: int
        currently not used by the API call
    Returns
    --------
    decoded: str
        the answer as output by the LLM.
    prompt: str
        the prompt after inclusion of the additional elements (i.e. role and topic)
    price: float
        the total price of the API call
    """
    question = re.sub("\n", " ", question)
    if no_role and no_arg:
        messages = [
            {"role": "user", "content": f"{prompt}\nText: {question}"}
        ]
    elif no_role:
        messages = [
            {"role": "user", "content": f"{prompt} with respect to the topic: {argument}\nText: {question}"}
        ]
    elif no_arg:
        messages = [
            {"role": "system", "content": "You are an assistant to policy-makers."},
            {"role": "user", "content": f"{prompt}\nText: {question}"}
        ]
    else:
        role_message = "You are an assistant to policy-makers."
        messages = [
            {"role": "system", "content": role_message},
            {"role": "user", "content": f"{prompt} with respect to the topic: {argument}\nText: {question}"}
        ]
    response = client.chat.completions.create(
        model=model,
        messages=messages)
    prompt = " ".join([msg["content"] for msg in messages])
    decoded = response.choices[0].message.content
    # input tokens are billed at the input price, output tokens at the output price
    in_price = pricing[model][0] * response.usage.prompt_tokens
    out_price = pricing[model][1] * response.usage.completion_tokens
    price = in_price + out_price
    return decoded, prompt, price
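
# Illustrative usage (not executed): the endpoint, deployment name and prices below
# are placeholders, not real values.
#
#   from openai import AzureOpenAI
#
#   client = AzureOpenAI(api_key="...", api_version="2024-02-01",
#                        azure_endpoint="https://<your-resource>.openai.azure.com")
#   pricing = {"gpt-4o": [2.5e-06, 1.0e-05]}   # hypothetical per-token prices
#   answer, full_prompt, cost = azure_summarise(
#       client, "gpt-4o",
#       question="Long policy document ...",
#       prompt="Summarise the following text",
#       argument="climate change",
#       pricing=pricing,
#   )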