From ed37e8d2340a0f049ecb8ac68213774610c0f894 Mon Sep 17 00:00:00 2001 From: nguyenhoangthuan99 Date: Thu, 8 Aug 2024 01:50:55 +0000 Subject: [PATCH 1/2] fix engine race condition --- examples/server/server.cc | 2 +- scripts/benchmark.py | 90 +++++++++++++++++++++++++------------ src/llama_server_context.cc | 80 ++++++++++++++++++++------------- 3 files changed, 110 insertions(+), 62 deletions(-) diff --git a/examples/server/server.cc b/examples/server/server.cc index acae5b7..6011254 100644 --- a/examples/server/server.cc +++ b/examples/server/server.cc @@ -241,7 +241,7 @@ int main(int argc, char** argv) { LOG_INFO << "HTTP server listening: " << hostname << ":" << port; svr->new_task_queue = [] { - return new httplib::ThreadPool(5); + return new httplib::ThreadPool(64); }; // run the HTTP server in a thread - see comment below std::thread t([&]() { diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 25c7e9e..22fc06f 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -7,10 +7,14 @@ start_time = time.time() SERVER_ENDPOINT = "http://localhost:3928" -TOTAL_REQUESTS = 16 -N_PARALLEL = 4 -MAX_CTX_FOR_ONE_SEQUENCE = 512 -N_CTX = MAX_CTX_FOR_ONE_SEQUENCE*N_PARALLEL # this number related to reserve GPU memory for kv cache +TOTAL_USERS = 40 +NUM_ROUNDS = 10 +MAX_TOKENS = 500 +N_PARALLEL = 32 +MAX_CTX_FOR_ONE_SEQUENCE = 1000 +# this number related to reserve GPU memory for kv cache +N_CTX = MAX_CTX_FOR_ONE_SEQUENCE*N_PARALLEL + def start_server(): import subprocess @@ -21,80 +25,108 @@ def start_server(): def load_model(): headers = {"Content-Type": "application/json"} - data = {"llama_model_path": "/mnt/nas/gguf-models/meta-llama3.1-8b-instruct-q4km.gguf", "model_alias": "meta-llama3.1-8b-instruct", - "model": "meta-llama3.1-8b-instruct", "ctx_len": N_CTX,"n_batch":2048, "ngl": 300, "model_type": "llm", "n_parallel": N_PARALLEL} + data = {"llama_model_path": "/mnt/nas/gguf-models/meta-llama3.1-8b-instruct-q4km.gguf", "model_alias": "meta-llama3.1-8b-instruct","engine": "cortex.llamacpp", + "model": "meta-llama3.1-8b-instruct", "ctx_len": N_CTX, "ngl": 300, "model_type": "llm", "n_parallel": N_PARALLEL} + result = requests.post(SERVER_ENDPOINT+"/loadmodel", headers=headers, json=data) + # result = requests.post(SERVER_ENDPOINT+"/inferences/server/loadmodel", + # headers=headers, json=data) print(result.json()) -async def send_request(session, prompt): +async def send_request(session, prompt,sleep = 0): + await asyncio.sleep(sleep) headers = {"Content-Type": "application/json"} - data = {"model": "meta-llama3.1-8b-instruct", + data = {"model": "meta-llama3.1-8b-instruct", "max_tokens": MAX_TOKENS, "stop": ["<|eom_id|>", "<|end_of_text|>", "<|eot_id|>"],"engine": "cortex.llamacpp", "messages": [{"role": "user", "content": prompt},]} async with session.post(SERVER_ENDPOINT+"/v1/chat/completions", headers=headers, json=data) as resp: result = await resp.json() return result +async def one_user(session, prompt): + tasks = [send_request(session, prompt,random.random()*0.2+ i ) for i in range(NUM_ROUNDS)] + results = await asyncio.gather(*tasks) + return results + async def send_request_sequence(): # warm up - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session: res = await send_request(session, "What is GPU?") start = time.time() total_token_processed = 0 - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session: tasks = [] prompts = 
["What is GPU?", "Who won the world cup 2022?", "Tell me some dad's joke", "Write a quick sort function", "What is the price of Nvidia H100?", "Who won the world series in 2020?"] - for number in range(TOTAL_REQUESTS): + for number in range(TOTAL_USERS): res = await send_request(session, random.choice(prompts)) if res.get("usage"): total_token_processed += res["usage"]["total_tokens"] else: print(res) - + end = time.time() print("Finished in", end-start, "s") print("Total token:", total_token_processed) - print("Throughput when run in sequence:", total_token_processed/(end-start), "tokens/s") + print("Throughput when run in sequence:", + total_token_processed/(end-start), "tokens/s") print("------------------------------------------------------------------------") async def main(): # warm up - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session: res = await send_request(session, "What is GPU?") start = time.time() total_token_processed = 0 - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout()) as session: tasks = [] - prompts = ["What is GPU?", "Who won the world cup 2022?", "Tell me some dad's joke", - "Write a quick sort function", "What is the price of Nvidia H100?", "Who won the world series in 2020?"] - for number in range(TOTAL_REQUESTS): + prompts = [ + "What is GPU?", + "Who won the world cup 2022?", + "Tell me so many dad's joke,", + "Write a quick sort function,", + "What is the price of Nvidia H100?", + "Who won the world series in 2020?", + "Tell me a very long story,", + "Who is the best football player in the world?", + "Tell me about compiler,", + "Tell me about AI,"] + for number in range(TOTAL_USERS): tasks.append(asyncio.ensure_future( - send_request(session, random.choice(prompts)))) - - results = await asyncio.gather(*tasks) - for res in results: - # print(res) - if res.get("usage"): - total_token_processed += res["usage"]["total_tokens"] - else: - print(res) + one_user(session, random.choice(prompts)))) + + list_results = await asyncio.gather(*tasks) + for results in list_results: + for res in results: + # print(res) + if res.get("usage"): + total_token_processed += res["usage"]["total_tokens"] + else: + print(res) end = time.time() print("Finished in", end-start, "s") print("Total token:", total_token_processed) - print("Throughput when run parallel:", total_token_processed/(end-start), "tokens/s") + print("Throughput when run parallel:", + total_token_processed/(end-start), "tokens/s") print("------------------------------------------------------------------------") + with open("result.log","w") as writer: + for results in list_results: + for res in results: + try: + writer.write(res["choices"][0]["message"]["content"] + "\n\n") + except: + continue # start_server() load_model() asyncio.run(main()) -asyncio.run(send_request_sequence()) +# asyncio.run(send_request_sequence()) print("--- %s seconds ---" % (time.time() - start_time)) diff --git a/src/llama_server_context.cc b/src/llama_server_context.cc index bedb8a0..46ff330 100644 --- a/src/llama_server_context.cc +++ b/src/llama_server_context.cc @@ -834,6 +834,7 @@ void LlamaServerContext::SendError(int id_task, int id_multi, res.stop = false; res.error = true; res.result_json = {{"content", error}}; + LOG_ERROR << "Internel error catched " << error; { std::lock_guard lock(mutex_results); queue_results.push_back(res); @@ -1142,50 +1143,65 @@ int 
LlamaServerContext::SplitMultipromptTask(TaskServer& multiprompt_task) { } void LlamaServerContext::ProcessTasks() { + bool has_available_slot = false; while (true) { std::unique_lock l(mutex_tasks); if (queue_tasks.empty()) { l.unlock(); break; } - TaskServer task = queue_tasks.front(); - queue_tasks.erase(queue_tasks.begin()); - l.unlock(); - switch (task.type) { - case TaskType::kCompletionTask: { - LlamaClientSlot* slot = GetSlot(json_value(task.data, "slot_id", -1)); - if (slot == nullptr) { - LOG_WARN << "slot unavailable"; - // send error result - SendError(task, "slot unavailable"); - return; - } - if (task.data.contains("system_prompt")) { - ProcessSystemPromptData(task.data["system_prompt"]); - } - - slot->Reset(); + TaskServer task = queue_tasks.front(); - slot->infill = task.infill_mode; - slot->embedding = task.embedding_mode; - slot->task_id = task.id; - slot->multitask_id = task.multitask_id; + if (task.type == TaskType::kCancelTask) { + queue_tasks.erase(queue_tasks.begin()); - if (!LaunchSlotWithData(slot, task.data)) { - // send error result - SendError(task, "internal_error"); + for (auto& slot : slots) { + if (slot.task_id == task.target_id) { + slot.Release(); break; } - } break; - case TaskType::kCancelTask: { // release slot linked with the task id - for (auto& slot : slots) { - if (slot.task_id == task.target_id) { - slot.Release(); - break; - } + } + l.unlock(); + } else if (task.type == TaskType::kCompletionTask) { + LlamaClientSlot* slot = nullptr; + int64_t t_last = ggml_time_us(); + for (LlamaClientSlot& slot_ : slots) { + if (slot_.Available() && slot_.t_last_used < t_last) { + has_available_slot = true; + slot = GetSlot(json_value(task.data, "slot_id", -1)); + break; } - } break; + } + if (!has_available_slot || slot == nullptr) { + l.unlock(); + return; + } + queue_tasks.erase(queue_tasks.begin()); + l.unlock(); + if (slot == nullptr) { + LOG_WARN << "slot unavailable"; + // send error result + SendError(task, "slot unavailable"); + return; + } + + if (task.data.contains("system_prompt")) { + ProcessSystemPromptData(task.data["system_prompt"]); + } + + slot->Reset(); + + slot->infill = task.infill_mode; + slot->embedding = task.embedding_mode; + slot->task_id = task.id; + slot->multitask_id = task.multitask_id; + + if (!LaunchSlotWithData(slot, task.data)) { + // send error result + SendError(task, "internal_error"); + return; + } } } From c0a25b0aa1187d7497ca302bdaa51d059eae698a Mon Sep 17 00:00:00 2001 From: nguyenhoangthuan99 Date: Thu, 8 Aug 2024 02:38:29 +0000 Subject: [PATCH 2/2] update logic --- src/llama_server_context.cc | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/llama_server_context.cc b/src/llama_server_context.cc index 46ff330..c3bc807 100644 --- a/src/llama_server_context.cc +++ b/src/llama_server_context.cc @@ -1143,7 +1143,6 @@ int LlamaServerContext::SplitMultipromptTask(TaskServer& multiprompt_task) { } void LlamaServerContext::ProcessTasks() { - bool has_available_slot = false; while (true) { std::unique_lock l(mutex_tasks); if (queue_tasks.empty()) { @@ -1164,16 +1163,8 @@ void LlamaServerContext::ProcessTasks() { } l.unlock(); } else if (task.type == TaskType::kCompletionTask) { - LlamaClientSlot* slot = nullptr; - int64_t t_last = ggml_time_us(); - for (LlamaClientSlot& slot_ : slots) { - if (slot_.Available() && slot_.t_last_used < t_last) { - has_available_slot = true; - slot = GetSlot(json_value(task.data, "slot_id", -1)); - break; - } - } - if (!has_available_slot || slot == nullptr) { + 
LlamaClientSlot* slot = GetSlot(json_value(task.data, "slot_id", -1)); + if (slot == nullptr) { l.unlock(); return; } @@ -1196,7 +1187,7 @@ void LlamaServerContext::ProcessTasks() { slot->embedding = task.embedding_mode; slot->task_id = task.id; slot->multitask_id = task.multitask_id; - + if (!LaunchSlotWithData(slot, task.data)) { // send error result SendError(task, "internal_error");
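
Note on the scheduling change: the core of the race-condition fix in ProcessTasks() is that a completion task is no longer popped from the queue before a slot has been secured. The task is peeked while mutex_tasks is held, a free slot is looked up, and only then is the task erased; if every slot is busy the task stays queued for the next pass instead of being answered with a "slot unavailable" error and dropped. Cancel tasks are still consumed immediately so a busy slot can be released. The sketch below is a minimal, self-contained illustration of that peek-acquire-pop pattern under a single mutex; the Task, Slot, and Scheduler types here are simplified stand-ins, not the actual cortex.llamacpp classes.

    #include <deque>
    #include <mutex>
    #include <vector>

    // Hypothetical, simplified stand-ins for the engine's task / slot types.
    struct Task { int id; };

    struct Slot {
      int task_id = -1;
      bool available = true;
      bool Acquire(int id) {
        if (!available) return false;
        available = false;
        task_id = id;
        return true;
      }
    };

    class Scheduler {
     public:
      explicit Scheduler(size_t n_slots) : slots_(n_slots) {}

      void Enqueue(Task t) {
        std::lock_guard<std::mutex> l(mutex_);
        queue_.push_back(t);
      }

      // Mirrors the patched ProcessTasks() flow: peek at the front task,
      // try to acquire a free slot while still holding the lock, and only
      // pop the task once a slot is secured. If no slot is free, the task
      // stays queued and is retried later instead of failing.
      void ProcessTasks() {
        while (true) {
          std::unique_lock<std::mutex> l(mutex_);
          if (queue_.empty()) return;

          Task task = queue_.front();       // peek, do not pop yet
          Slot* slot = FindFreeSlot();
          if (slot == nullptr || !slot->Acquire(task.id)) {
            return;                         // leave the task in the queue
          }
          queue_.pop_front();               // safe to consume the task now
          l.unlock();

          Launch(slot, task);               // heavy work runs outside the lock
        }
      }

     private:
      Slot* FindFreeSlot() {
        for (auto& s : slots_) {
          if (s.available) return &s;
        }
        return nullptr;
      }

      void Launch(Slot* /*slot*/, const Task& /*task*/) {
        // Placeholder for LaunchSlotWithData(...) in the real engine.
      }

      std::mutex mutex_;
      std::deque<Task> queue_;
      std::vector<Slot> slots_;
    };

    int main() {
      Scheduler sched(2);                   // two decoding slots
      for (int i = 0; i < 3; ++i) sched.Enqueue(Task{i});
      sched.ProcessTasks();                 // two tasks launch, one stays queued
      return 0;
    }

As in the patch, the lock is released before the slot is launched so prompt processing never serializes behind the task queue; the larger HTTP thread pool (64) and the benchmark's TOTAL_USERS x NUM_ROUNDS load pattern are what exercise this path under contention.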