Add CB naive chat (#644)
Co-authored-by: Anastasiia Pnevskaia <anastasiia.pnevskaia@intel.com>
Wovchena and popovaan authored Jul 24, 2024
1 parent 12f8e44 commit f9e45e1
Showing 14 changed files with 411 additions and 50 deletions.
20 changes: 18 additions & 2 deletions src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -10,6 +10,8 @@
#include "openvino/genai/tokenizer.hpp"
#include "openvino/genai/generation_config.hpp"
#include "openvino/genai/generation_handle.hpp"
#include "openvino/genai/llm_pipeline.hpp"
#include "openvino/genai/streamer_base.hpp"
#include "openvino/genai/visibility.hpp"

namespace ov::genai {
@@ -56,13 +58,27 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {

PipelineMetrics get_metrics() const;

GenerationHandle add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params);
GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params);
GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params);

void step();

bool has_non_finished_requests();

// more high level interface, which can process multiple prompts in continuous batching manner
std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, std::vector<ov::genai::GenerationConfig> sampling_params);
std::vector<EncodedGenerationResult> generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});
std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});

/**
* @brief start chat with keeping history in kv cache.
*
* @param system_message optional system message.
*/
void start_chat(const std::string& system_message = "");

/**
* @brief finish chat and clear kv cache.
*/
void finish_chat();
};
}
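
To illustrate the chat API introduced above, here is a minimal usage sketch (not part of the commit). It assumes a chat-tuned model already converted to OpenVINO IR at the placeholder path ./TinyLlama-1.1B-Chat-v1.0, and that the pipeline keeps its existing (models_path, SchedulerConfig, device) constructor.

#include <iostream>
#include <string>
#include <vector>

#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;  // defaults now give a 1 GB KV cache with 32-token blocks
    ov::genai::ContinuousBatchingPipeline pipe("./TinyLlama-1.1B-Chat-v1.0", scheduler_config, "CPU");

    ov::genai::GenerationConfig config;
    config.max_new_tokens = 128;

    pipe.start_chat("You are a helpful assistant.");  // the system message is stored in the chat history
    for (std::string prompt : {"What is OpenVINO?", "Summarize that in one sentence."}) {
        // In chat mode exactly one prompt per call is accepted; the history is re-applied internally.
        std::vector<ov::genai::GenerationResult> results = pipe.generate({prompt}, {config});
        std::cout << results.at(0).m_generation_ids.at(0) << "\n";
    }
    pipe.finish_chat();  // drops the accumulated history and clears the KV cache
    return 0;
}

Internally, each generate() call in chat mode appends the user message to the stored ChatHistory, applies the tokenizer's chat template, and routes the full history through the encoded generate() path, as shown in continuous_batching_pipeline.cpp below.
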
15 changes: 15 additions & 0 deletions src/cpp/include/openvino/genai/generation_handle.hpp
@@ -18,6 +18,20 @@ enum class GenerationStatus {
DROPPED_BY_HANDLE = 4 // Status set when generation handle is dropped
};

struct EncodedGenerationResult {
// request ID - obsolete when handle API is approved as handle will connect results with prompts.
uint64_t m_request_id;

// in a generic case we have multiple generation results per initial prompt
// depending on sampling parameters (e.g. beam search or parallel sampling)
std::vector<std::vector<int64_t>> m_generation_ids;
// scores
std::vector<float> m_scores;

// Status of generation
GenerationStatus m_status = GenerationStatus::RUNNING;
};

struct GenerationResult {
// request ID - obsolete when handle API is approved as handle will connect results with prompts.
uint64_t m_request_id;
@@ -60,6 +74,7 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl {

bool can_read();

GenerationOutputs back();
// Reads result of a generation for single iteration
GenerationOutputs read();
// Reads all generated tokens for all sequences
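
A hedged sketch of consuming the new EncodedGenerationResult through the tensor-based generate() overload follows (not part of the commit). The model path is a placeholder, and it assumes a Tokenizer can be constructed from the same model directory as the pipeline.

#include <iostream>
#include <string>
#include <vector>

#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    const std::string models_path = "./TinyLlama-1.1B-Chat-v1.0";  // placeholder
    ov::genai::ContinuousBatchingPipeline pipe(models_path, ov::genai::SchedulerConfig{}, "CPU");
    ov::genai::Tokenizer tokenizer(models_path);

    ov::genai::GenerationConfig config;
    config.max_new_tokens = 64;

    // One input_ids tensor per prompt: a batch is expressed as multiple tensors, not one batched tensor.
    ov::Tensor input_ids = tokenizer.encode("What is OpenVINO?").input_ids;
    std::vector<ov::genai::EncodedGenerationResult> results = pipe.generate({input_ids}, {config});

    for (const ov::genai::EncodedGenerationResult& res : results) {
        for (size_t i = 0; i < res.m_generation_ids.size(); ++i) {
            // m_generation_ids holds token IDs in the encoded result; decode them back to text.
            std::cout << res.m_scores.at(i) << ": " << tokenizer.decode(res.m_generation_ids.at(i)) << "\n";
        }
    }
    return 0;
}
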
2 changes: 1 addition & 1 deletion src/cpp/include/openvino/genai/llm_pipeline.hpp
@@ -14,7 +14,7 @@
namespace ov {
namespace genai {

// Return flag corresponds whether generation should be stopped: false means continue generation, true means stop.
// Return flag corresponds whether generation should be stopped: false means continue generation, true means stop.
using StreamerVariant = std::variant<std::function<bool(std::string)>, std::shared_ptr<StreamerBase>, std::monostate>;
using OptionalGenerationConfig = std::optional<GenerationConfig>;
using EncodedInputs = std::variant<ov::Tensor, TokenizedInputs>;
2 changes: 1 addition & 1 deletion src/cpp/include/openvino/genai/scheduler_config.hpp
@@ -16,7 +16,7 @@ struct SchedulerConfig {
std::size_t num_kv_blocks = 0;

// total size of KV cache in GB
std::size_t cache_size = 0;
std::size_t cache_size = 1;

// block size for KV cache
std::size_t block_size = 32;
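
The default above changes the KV cache reservation from 0 to 1 GB. A short, hedged sketch of filling SchedulerConfig by hand, using only the fields visible in this diff; how num_kv_blocks and cache_size interact is not shown here, so that part is left as a comment rather than asserted.

#include "openvino/genai/scheduler_config.hpp"

ov::genai::SchedulerConfig make_scheduler_config() {
    ov::genai::SchedulerConfig config;
    config.cache_size = 2;   // total KV cache size in GB; the default shown above is now 1 instead of 0
    config.block_size = 32;  // KV cache block size, matching the struct's default
    // num_kv_blocks is left at its default of 0 here; setting it is an alternative way to
    // size the cache, but the interaction between the two fields is not covered by this diff.
    return config;
}
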
146 changes: 121 additions & 25 deletions src/cpp/src/continuous_batching_pipeline.cpp
@@ -6,16 +6,21 @@
#include <memory>

#include "openvino/genai/continuous_batching_pipeline.hpp"
#include "openvino/genai/generation_handle.hpp"
#include "openvino/genai/tokenizer.hpp"
#include "cache_manager.hpp"
#include "sampler.hpp"
#include "model_runner.hpp"
#include "scheduler.hpp"
#include "text_callback_streamer.hpp"
#include "timer.hpp"
#include "debug_utils.hpp"

using namespace ov::genai;

template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

void apply_paged_attention_transformations(std::shared_ptr<ov::Model> model, DeviceConfig& device_config);

class ContinuousBatchingPipeline::Impl {
@@ -51,6 +56,8 @@ class ContinuousBatchingPipeline::Impl {
std::vector<SequenceGroup::Ptr> m_awaiting_requests;
// Mutex protecting access to m_awaiting_requests, so add_request and step methods can be called from different threads
std::mutex m_awaiting_requests_mutex;
bool m_is_chat_conversation = false;
ChatHistory m_history;


void _free_non_running_requests() {
@@ -120,18 +127,9 @@ class ContinuousBatchingPipeline::Impl {
return m_tokenizer;
}

GenerationHandle add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params) {
GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, ov::genai::GenerationConfig sampling_params) {
sampling_params.set_eos_token_id(m_tokenizer.get_eos_token_id());
sampling_params.validate();

ov::Tensor input_ids;
{
static ManualTimer timer("tokenize");
timer.start();
input_ids = m_tokenizer.encode(prompt).input_ids;
timer.end();
}

SequenceGroup::Ptr sequence_group = std::make_shared<SequenceGroup>(request_id, input_ids,
sampling_params, m_scheduler->get_config().block_size);
{
@@ -141,6 +139,14 @@
return std::make_unique<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params);
}

GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) {
static ManualTimer timer("tokenize");
timer.start();
ov::Tensor input_ids = m_tokenizer.encode(prompt).input_ids;
timer.end();
return add_request(request_id, input_ids, sampling_params);
}

void step() {
static ManualTimer step_timer("step()");
step_timer.start();
@@ -238,25 +244,47 @@
return !m_awaiting_requests.empty() || !m_requests.empty();
}

std::vector<GenerationResult> generate(const std::vector<std::string> prompts, std::vector<ov::genai::GenerationConfig> sampling_params) {
std::vector<EncodedGenerationResult> generate(const std::vector<ov::Tensor>& input_ids, const std::vector<GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
OPENVINO_ASSERT(prompts.size() == sampling_params.size());
OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
[](std::monostate) -> std::shared_ptr<StreamerBase> {
return nullptr;
},
[](const std::shared_ptr<StreamerBase>& streamer) {
return streamer;
},
[this](const std::function<bool(std::string)>& streamer) -> std::shared_ptr<StreamerBase> {
return std::make_unique<TextCallbackStreamer>(m_tokenizer, streamer);
}
}, streamer);

std::vector<GenerationHandle> generations;
for (size_t request_id = 0; request_id < prompts.size(); ++request_id) {
generations.push_back(add_request(request_id, prompts[request_id], sampling_params[request_id]));
for (size_t request_id = 0; request_id < input_ids.size(); ++request_id) {
OPENVINO_ASSERT(1 == input_ids[request_id].get_shape().at(0), "Use multiple tensors to pass a batch.");
generations.push_back(add_request(request_id, input_ids[request_id], sampling_params[request_id]));
}

std::vector<GenerationResult> results;
std::vector<EncodedGenerationResult> results;
results.reserve(m_awaiting_requests.size());

while (has_non_finished_requests()) {
bool continue_generation = true;
while (has_non_finished_requests() && continue_generation) {
step();
if (streamer_ptr) {
std::unordered_map<uint64_t, GenerationOutput> token = generations.at(0).get()->back();
OPENVINO_ASSERT(1 == token.size());
OPENVINO_ASSERT(1 == token.begin()->second.generated_token_ids.size());
continue_generation = !streamer_ptr->put(token.begin()->second.generated_token_ids.at(0));
}
}
if (streamer_ptr) {
streamer_ptr->end();
}

for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) {
const auto& generation = generations[generation_idx];
GenerationResult result;
EncodedGenerationResult result;
result.m_request_id = 1;
std::vector<GenerationOutput> generation_outputs = generation->read_all();
std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (GenerationOutput& r1, GenerationOutput& r2) {
@@ -266,17 +294,69 @@
auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size());
for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) {
const auto& generation_output = generation_outputs[generation_output_idx];
std::string output_text = m_tokenizer.decode(generation_output.generated_token_ids);
result.m_generation_ids.push_back(output_text);
result.m_generation_ids.push_back(std::move(generation_output.generated_token_ids));
result.m_scores.push_back(generation_output.score);
}
result.m_status = generation->get_status();
results.push_back(result);
results.push_back(std::move(result));
}

OPENVINO_ASSERT(results.size() == prompts.size());
OPENVINO_ASSERT(results.size() == input_ids.size());
return results;
}

std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, std::vector<ov::genai::GenerationConfig> sampling_params, const StreamerVariant& streamer) {
std::vector<ov::Tensor> input_ids;
static ManualTimer timer("tokenize");
if (m_is_chat_conversation) {
OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts");
m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}});
constexpr bool add_generation_prompt = true;
std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt);
timer.start();
input_ids.push_back(m_tokenizer.encode(history).input_ids);
timer.end();
} else {
input_ids.reserve(prompts.size());
for (const std::string& prompt : prompts) {
timer.start();
input_ids.push_back(m_tokenizer.encode(prompt).input_ids);
timer.end();
}
}
std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
std::vector<GenerationResult> decoded;
decoded.reserve(encoded.size());
for (EncodedGenerationResult& res : encoded) {
std::vector<std::string> generated;
generated.reserve(res.m_generation_ids.size());
for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) {
generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx)));
if (m_is_chat_conversation && 0 == idx) {
m_history.push_back({{"role", "assistant"}, {"content", generated.back()}});
}
}
decoded.push_back(GenerationResult{
res.m_request_id,
std::move(generated),
std::move(res.m_scores),
res.m_status
});
}
return decoded;
}

void start_chat(const std::string& system_message) {
if (!system_message.empty()) {
m_history.push_back({{"role", "system"}, {"content", system_message}});
}
m_is_chat_conversation = true;
};

void finish_chat() {
m_is_chat_conversation = false;
m_history.clear();
};
};

ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path,
Expand Down Expand Up @@ -307,10 +387,14 @@ PipelineMetrics ContinuousBatchingPipeline::get_metrics() const{
return m_impl->get_metrics();
}

GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params) {
GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params) {
return m_impl->add_request(request_id, prompt, sampling_params);
}

GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params) {
return m_impl->add_request(request_id, input_ids, sampling_params);
}

void ContinuousBatchingPipeline::step() {
m_impl->step();
}
@@ -319,6 +403,18 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() {
return m_impl->has_non_finished_requests();
}

std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::vector<std::string>& prompts, std::vector<ov::genai::GenerationConfig> sampling_params) {
return m_impl->generate(prompts, sampling_params);
}
std::vector<EncodedGenerationResult> ContinuousBatchingPipeline::generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
return m_impl->generate(input_ids, sampling_params, streamer);
}

std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const StreamerVariant& streamer) {
return m_impl->generate(prompts, sampling_params, streamer);
}

void ContinuousBatchingPipeline::start_chat(const std::string& system_message) {
m_impl->start_chat(system_message);
};

void ContinuousBatchingPipeline::finish_chat() {
m_impl->finish_chat();
};
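
For completeness, here is a hedged sketch of the lower-level flow: add a request, drive it with step(), peek at progress through the new back() accessor (which returns the stream's most recent output, while read() pulls and consumes), then collect the finished sequences with read_all(). The model path is a placeholder; this mirrors the streaming loop inside generate() above rather than documenting an official pattern.

#include <cstdint>
#include <iostream>
#include <string>

#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    const std::string models_path = "./TinyLlama-1.1B-Chat-v1.0";  // placeholder
    ov::genai::SchedulerConfig scheduler_config;
    ov::genai::ContinuousBatchingPipeline pipe(models_path, scheduler_config, "CPU");
    ov::genai::Tokenizer tokenizer(models_path);

    ov::genai::GenerationConfig config;
    config.max_new_tokens = 32;

    ov::genai::GenerationHandle handle = pipe.add_request(/*request_id=*/0, "What is OpenVINO?", config);

    while (pipe.has_non_finished_requests()) {
        pipe.step();
        // back() peeks at the most recent output without consuming the stream; the high-level
        // generate() uses it the same way to feed freshly generated tokens to a streamer.
        ov::genai::GenerationOutputs latest = handle->back();
        if (!latest.empty())
            std::cout << "newest output holds " << latest.begin()->second.generated_token_ids.size()
                      << " token(s)\r" << std::flush;
    }

    // Once the request has finished, read_all() returns every generated sequence.
    for (ov::genai::GenerationOutput output : handle->read_all())
        std::cout << "\n" << output.score << ": " << tokenizer.decode(output.generated_token_ids) << "\n";
    return 0;
}
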
4 changes: 4 additions & 0 deletions src/cpp/src/generation_handle.cpp
@@ -20,6 +20,10 @@ bool GenerationHandleImpl::can_read() {
return m_generation_stream->can_read();
}

std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::back() {
return m_generation_stream->back();
}

std::unordered_map<uint64_t, GenerationOutput> GenerationHandleImpl::read() {
return m_generation_stream->read();
}
3 changes: 3 additions & 0 deletions src/cpp/src/generation_stream.hpp
@@ -31,6 +31,9 @@ class GenerationStream {
}

// Retrieving vector of pairs <sequence_id, token_id> as we can generate multiple outputs for a single prompt
GenerationOutputs back() {
return m_output_queue.back();
}
GenerationOutputs read() {
return m_output_queue.pull();
}