This repository was archived by the owner on Jul 4, 2025. It is now read-only.

fix engine race condition #181

Merged · 2 commits · Aug 8, 2024
examples/server/server.cc (2 changes: 1 addition & 1 deletion)
@@ -241,7 +241,7 @@ int main(int argc, char** argv) {
 
   LOG_INFO << "HTTP server listening: " << hostname << ":" << port;
   svr->new_task_queue = [] {
-    return new httplib::ThreadPool(5);
+    return new httplib::ThreadPool(64);
  };
   // run the HTTP server in a thread - see comment below
   std::thread t([&]() {
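The jump from 5 to 64 HTTP workers matters because cpp-httplib runs every request handler on the task queue returned by new_task_queue; with only 5 threads, concurrent clients stack up at the HTTP layer before they ever reach the model's decoding slots. Below is a minimal sketch (not code from this PR) of the same hook with the pool size expressed relative to the expected client concurrency; the kExpectedConcurrentClients constant and the /healthz route are made up for illustration.

    // Sketch only: size the cpp-httplib worker pool from the expected
    // concurrency instead of a bare constant. Names here are illustrative.
    #include <algorithm>
    #include <httplib.h>

    constexpr int kExpectedConcurrentClients = 40;  // e.g. TOTAL_USERS in scripts/benchmark.py

    int main() {
      httplib::Server svr;
      svr.new_task_queue = [] {
        // Keep a margin above the expected concurrency so HTTP dispatch never
        // becomes the bottleneck in front of the inference slots.
        return new httplib::ThreadPool(std::max(64, kExpectedConcurrentClients));
      };
      svr.Get("/healthz", [](const httplib::Request&, httplib::Response& res) {
        res.set_content("ok", "text/plain");
      });
      return svr.listen("127.0.0.1", 3928) ? 0 : 1;
    }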
scripts/benchmark.py (90 changes: 61 additions & 29 deletions)
@@ -7,10 +7,14 @@
 
 start_time = time.time()
 SERVER_ENDPOINT = "http://localhost:3928"
-TOTAL_REQUESTS = 16
-N_PARALLEL = 4
-MAX_CTX_FOR_ONE_SEQUENCE = 512
-N_CTX = MAX_CTX_FOR_ONE_SEQUENCE*N_PARALLEL # this number related to reserve GPU memory for kv cache
+TOTAL_USERS = 40
+NUM_ROUNDS = 10
+MAX_TOKENS = 500
+N_PARALLEL = 32
+MAX_CTX_FOR_ONE_SEQUENCE = 1000
+# this number related to reserve GPU memory for kv cache
+N_CTX = MAX_CTX_FOR_ONE_SEQUENCE*N_PARALLEL
 
 
 def start_server():
     import subprocess
@@ -21,80 +25,108 @@ def start_server():
 
 def load_model():
     headers = {"Content-Type": "application/json"}
-    data = {"llama_model_path": "/mnt/nas/gguf-models/meta-llama3.1-8b-instruct-q4km.gguf", "model_alias": "meta-llama3.1-8b-instruct",
-            "model": "meta-llama3.1-8b-instruct", "ctx_len": N_CTX,"n_batch":2048, "ngl": 300, "model_type": "llm", "n_parallel": N_PARALLEL}
+    data = {"llama_model_path": "/mnt/nas/gguf-models/meta-llama3.1-8b-instruct-q4km.gguf", "model_alias": "meta-llama3.1-8b-instruct","engine": "cortex.llamacpp",
+            "model": "meta-llama3.1-8b-instruct", "ctx_len": N_CTX, "ngl": 300, "model_type": "llm", "n_parallel": N_PARALLEL}
 
     result = requests.post(SERVER_ENDPOINT+"/loadmodel",
                            headers=headers, json=data)
     # result = requests.post(SERVER_ENDPOINT+"/inferences/server/loadmodel",
     #                        headers=headers, json=data)
     print(result.json())
 
 
-async def send_request(session, prompt):
+async def send_request(session, prompt,sleep = 0):
+    await asyncio.sleep(sleep)
     headers = {"Content-Type": "application/json"}
-    data = {"model": "meta-llama3.1-8b-instruct",
+    data = {"model": "meta-llama3.1-8b-instruct", "max_tokens": MAX_TOKENS, "stop": ["<|eom_id|>", "<|end_of_text|>", "<|eot_id|>"],"engine": "cortex.llamacpp",
             "messages": [{"role": "user", "content": prompt},]}
     async with session.post(SERVER_ENDPOINT+"/v1/chat/completions", headers=headers, json=data) as resp:
         result = await resp.json()
         return result
 
+async def one_user(session, prompt):
+    tasks = [send_request(session, prompt,random.random()*0.2+ i ) for i in range(NUM_ROUNDS)]
+    results = await asyncio.gather(*tasks)
+    return results
+
 
 async def send_request_sequence():
     # warm up
-    async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session:
         res = await send_request(session, "What is GPU?")
 
     start = time.time()
     total_token_processed = 0
-    async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session:
 
         tasks = []
         prompts = ["What is GPU?", "Who won the world cup 2022?", "Tell me some dad's joke",
                    "Write a quick sort function", "What is the price of Nvidia H100?", "Who won the world series in 2020?"]
-        for number in range(TOTAL_REQUESTS):
+        for number in range(TOTAL_USERS):
            res = await send_request(session, random.choice(prompts))
            if res.get("usage"):
                total_token_processed += res["usage"]["total_tokens"]
            else:
                print(res)
 
        end = time.time()
        print("Finished in", end-start, "s")
        print("Total token:", total_token_processed)
-        print("Throughput when run in sequence:", total_token_processed/(end-start), "tokens/s")
+        print("Throughput when run in sequence:",
+              total_token_processed/(end-start), "tokens/s")
        print("------------------------------------------------------------------------")
 
 
 async def main():
     # warm up
-    async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session:
         res = await send_request(session, "What is GPU?")
 
     start = time.time()
     total_token_processed = 0
-    async with aiohttp.ClientSession() as session:
+    async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session:
 
         tasks = []
-        prompts = ["What is GPU?", "Who won the world cup 2022?", "Tell me some dad's joke",
-                   "Write a quick sort function", "What is the price of Nvidia H100?", "Who won the world series in 2020?"]
-        for number in range(TOTAL_REQUESTS):
+        prompts = [
+            "What is GPU?",
+            "Who won the world cup 2022?",
+            "Tell me so many dad's joke,",
+            "Write a quick sort function,",
+            "What is the price of Nvidia H100?",
+            "Who won the world series in 2020?",
+            "Tell me a very long story,",
+            "Who is the best football player in the world?",
+            "Tell me about compiler,",
+            "Tell me about AI,"]
+        for number in range(TOTAL_USERS):
             tasks.append(asyncio.ensure_future(
-                send_request(session, random.choice(prompts))))
-
-        results = await asyncio.gather(*tasks)
-        for res in results:
-            # print(res)
-            if res.get("usage"):
-                total_token_processed += res["usage"]["total_tokens"]
-            else:
-                print(res)
+                one_user(session, random.choice(prompts))))
+
+        list_results = await asyncio.gather(*tasks)
+        for results in list_results:
+            for res in results:
+                # print(res)
+                if res.get("usage"):
+                    total_token_processed += res["usage"]["total_tokens"]
+                else:
+                    print(res)
        end = time.time()
        print("Finished in", end-start, "s")
        print("Total token:", total_token_processed)
-        print("Throughput when run parallel:", total_token_processed/(end-start), "tokens/s")
+        print("Throughput when run parallel:",
+              total_token_processed/(end-start), "tokens/s")
        print("------------------------------------------------------------------------")
+        with open("result.log","w") as writer:
+            for results in list_results:
+                for res in results:
+                    try:
+                        writer.write(res["choices"][0]["message"]["content"] + "\n\n")
+                    except:
+                        continue
 # start_server()
 load_model()
 
 asyncio.run(main())
 
-asyncio.run(send_request_sequence())
+# asyncio.run(send_request_sequence())
 print("--- %s seconds ---" % (time.time() - start_time))
src/llama_server_context.cc (71 changes: 39 additions & 32 deletions)
@@ -834,6 +834,7 @@ void LlamaServerContext::SendError(int id_task, int id_multi,
   res.stop = false;
   res.error = true;
   res.result_json = {{"content", error}};
+  LOG_ERROR << "Internel error catched " << error;
   {
     std::lock_guard<std::mutex> lock(mutex_results);
     queue_results.push_back(res);
Expand Down Expand Up @@ -1148,44 +1149,50 @@ void LlamaServerContext::ProcessTasks() {
l.unlock();
break;
}

TaskServer task = queue_tasks.front();
queue_tasks.erase(queue_tasks.begin());
l.unlock();
switch (task.type) {
case TaskType::kCompletionTask: {
LlamaClientSlot* slot = GetSlot(json_value(task.data, "slot_id", -1));
if (slot == nullptr) {
LOG_WARN << "slot unavailable";
// send error result
SendError(task, "slot unavailable");
return;
}

if (task.data.contains("system_prompt")) {
ProcessSystemPromptData(task.data["system_prompt"]);
if (task.type == TaskType::kCancelTask) {
queue_tasks.erase(queue_tasks.begin());

for (auto& slot : slots) {
if (slot.task_id == task.target_id) {
slot.Release();
break;
}
}
l.unlock();
} else if (task.type == TaskType::kCompletionTask) {
LlamaClientSlot* slot = GetSlot(json_value(task.data, "slot_id", -1));
if (slot == nullptr) {
l.unlock();
return;
}
queue_tasks.erase(queue_tasks.begin());
l.unlock();
if (slot == nullptr) {
LOG_WARN << "slot unavailable";
// send error result
SendError(task, "slot unavailable");
return;
}

slot->Reset();
if (task.data.contains("system_prompt")) {
ProcessSystemPromptData(task.data["system_prompt"]);
}

slot->infill = task.infill_mode;
slot->embedding = task.embedding_mode;
slot->task_id = task.id;
slot->multitask_id = task.multitask_id;
slot->Reset();

if (!LaunchSlotWithData(slot, task.data)) {
// send error result
SendError(task, "internal_error");
break;
}
} break;
case TaskType::kCancelTask: { // release slot linked with the task id
for (auto& slot : slots) {
if (slot.task_id == task.target_id) {
slot.Release();
break;
}
}
} break;
slot->infill = task.infill_mode;
slot->embedding = task.embedding_mode;
slot->task_id = task.id;
slot->multitask_id = task.multitask_id;

if (!LaunchSlotWithData(slot, task.data)) {
// send error result
SendError(task, "internal_error");
return;
}
}
}

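The ProcessTasks rewrite above is the substance of the engine-side fix: instead of popping a task off the shared queue and releasing the lock before anything is known about how it will be handled, the new flow reads the front task while still holding mutex_tasks, and a completion task is only erased once GetSlot has returned a usable slot. If no slot is free, the function unlocks and returns, so the task stays in the queue instead of being erased and answered with a "slot unavailable" error as before. A minimal, self-contained sketch of that check-before-dequeue pattern follows; the Task, Slot, and Worker types are made up for illustration and are not the engine's real classes.

    // Sketch of the check-before-dequeue pattern: a task leaves the shared queue
    // only after a free slot has been secured for it, all under one lock.
    #include <array>
    #include <cstdio>
    #include <mutex>
    #include <optional>
    #include <vector>

    struct Task { int id; };
    struct Slot { bool busy = false; };

    class Worker {
     public:
      void Push(Task t) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(t);
      }

      // Returns a task to run, or nothing if the queue is empty or no slot is
      // free; in the latter case the task stays queued and can be retried later
      // instead of being dropped with an error.
      std::optional<Task> TryDequeue() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        Slot* slot = FindFreeSlot();
        if (slot == nullptr) return std::nullopt;  // keep the task in the queue
        Task t = queue_.front();
        queue_.erase(queue_.begin());              // erase only after a slot is secured
        slot->busy = true;
        return t;
      }

     private:
      Slot* FindFreeSlot() {
        for (auto& s : slots_) {
          if (!s.busy) return &s;
        }
        return nullptr;
      }

      std::mutex mutex_;
      std::vector<Task> queue_;
      std::array<Slot, 4> slots_{};
      };

    int main() {
      Worker w;
      w.Push({1});
      w.Push({2});
      while (auto task = w.TryDequeue()) {
        std::printf("dispatching task %d\n", task->id);  // hand off to a decode loop
      }
      return 0;
    }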