diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
index f5f8c5330..626a51c5d 100644
--- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
+++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -10,6 +10,8 @@
 #include "openvino/genai/tokenizer.hpp"
 #include "openvino/genai/generation_config.hpp"
 #include "openvino/genai/generation_handle.hpp"
+#include "openvino/genai/llm_pipeline.hpp"
+#include "openvino/genai/streamer_base.hpp"
 #include "openvino/genai/visibility.hpp"
 
 namespace ov::genai {
@@ -56,13 +58,27 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
 
     PipelineMetrics get_metrics() const;
 
-    GenerationHandle add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params);
+    GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params);
+    GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params);
 
     void step();
 
     bool has_non_finished_requests();
 
     // more high level interface, which can process multiple prompts in continuous batching manner
-    std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, std::vector<ov::genai::GenerationConfig> sampling_params);
+    std::vector<EncodedGenerationResult> generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});
+    std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});
+
+    /**
+    * @brief Start a chat; the conversation history is kept in the KV cache.
+    *
+    * @param system_message optional system message.
+    */
+    void start_chat(const std::string& system_message = "");
+
+    /**
+    * @brief Finish the chat and clear the KV cache.
+    */
+    void finish_chat();
 };
 }
diff --git a/src/cpp/include/openvino/genai/generation_handle.hpp b/src/cpp/include/openvino/genai/generation_handle.hpp
index d0ddbc3a3..8d00ae0e9 100644
--- a/src/cpp/include/openvino/genai/generation_handle.hpp
+++ b/src/cpp/include/openvino/genai/generation_handle.hpp
@@ -18,6 +18,20 @@ enum class GenerationStatus {
     DROPPED_BY_HANDLE = 4 // Status set when generation handle is dropped
 };
 
+struct EncodedGenerationResult {
+    // request ID - obsolete when handle API is approved as handle will connect results with prompts.
+    uint64_t m_request_id;
+
+    // in a generic case we have multiple generation results per initial prompt
+    // depending on sampling parameters (e.g. beam search or parallel sampling)
+    std::vector<std::vector<int64_t>> m_generation_ids;
+    // scores
+    std::vector<float> m_scores;
+
+    // Status of generation
+    GenerationStatus m_status = GenerationStatus::RUNNING;
+};
+
 struct GenerationResult {
     // request ID - obsolete when handle API is approved as handle will connect results with prompts.
     uint64_t m_request_id;
@@ -60,6 +74,7 @@ class OPENVINO_GENAI_EXPORTS GenerationHandleImpl {
 
     bool can_read();
 
+    GenerationOutputs back();
     // Reads result of a generation for single iteration
     GenerationOutputs read();
     // Reads all generated tokens for all sequences
diff --git a/src/cpp/include/openvino/genai/llm_pipeline.hpp b/src/cpp/include/openvino/genai/llm_pipeline.hpp
index 84dc02bd5..abd4ee5a4 100644
--- a/src/cpp/include/openvino/genai/llm_pipeline.hpp
+++ b/src/cpp/include/openvino/genai/llm_pipeline.hpp
@@ -14,7 +14,7 @@
 namespace ov {
 namespace genai {
 
-// Return flag corresponds whether generation should be stopped: false means continue generation, true means stop.
+// Return flag corresponds to whether generation should be stopped: false means continue generation, true means stop.
 using StreamerVariant = std::variant<std::function<bool(std::string)>, std::shared_ptr<StreamerBase>, std::monostate>;
 using OptionalGenerationConfig = std::optional<GenerationConfig>;
 using EncodedInputs = std::variant<ov::Tensor, TokenizedInputs>;
diff --git a/src/cpp/include/openvino/genai/scheduler_config.hpp b/src/cpp/include/openvino/genai/scheduler_config.hpp
index 787060d07..9d808fd42 100644
--- a/src/cpp/include/openvino/genai/scheduler_config.hpp
+++ b/src/cpp/include/openvino/genai/scheduler_config.hpp
@@ -16,7 +16,7 @@ struct SchedulerConfig {
     std::size_t num_kv_blocks = 0;
 
     // total size of KV cache in GB
-    std::size_t cache_size = 0;
+    std::size_t cache_size = 1;
 
     // block size for KV cache
     std::size_t block_size = 32;
diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp
index 55100f3cb..a66a88cad 100644
--- a/src/cpp/src/continuous_batching_pipeline.cpp
+++ b/src/cpp/src/continuous_batching_pipeline.cpp
@@ -6,16 +6,21 @@
 #include
 
 #include "openvino/genai/continuous_batching_pipeline.hpp"
+#include "openvino/genai/generation_handle.hpp"
 #include "openvino/genai/tokenizer.hpp"
 
 #include "cache_manager.hpp"
 #include "sampler.hpp"
 #include "model_runner.hpp"
 #include "scheduler.hpp"
+#include "text_callback_streamer.hpp"
 #include "timer.hpp"
 #include "debug_utils.hpp"
 
 using namespace ov::genai;
 
+template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
+template<class... Ts> overloaded(Ts...)
-> overloaded; + void apply_paged_attention_transformations(std::shared_ptr model, DeviceConfig& device_config); class ContinuousBatchingPipeline::Impl { @@ -51,6 +56,8 @@ class ContinuousBatchingPipeline::Impl { std::vector m_awaiting_requests; // Mutex protecting access to m_awaiting_requests, so add_request and step methods can be called from different threads std::mutex m_awaiting_requests_mutex; + bool m_is_chat_conversation = false; + ChatHistory m_history; void _free_non_running_requests() { @@ -120,18 +127,9 @@ class ContinuousBatchingPipeline::Impl { return m_tokenizer; } - GenerationHandle add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params) { + GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, ov::genai::GenerationConfig sampling_params) { sampling_params.set_eos_token_id(m_tokenizer.get_eos_token_id()); sampling_params.validate(); - - ov::Tensor input_ids; - { - static ManualTimer timer("tokenize"); - timer.start(); - input_ids = m_tokenizer.encode(prompt).input_ids; - timer.end(); - } - SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, sampling_params, m_scheduler->get_config().block_size); { @@ -141,6 +139,14 @@ class ContinuousBatchingPipeline::Impl { return std::make_unique(sequence_group->get_generation_stream(), sampling_params); } + GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) { + static ManualTimer timer("tokenize"); + timer.start(); + ov::Tensor input_ids = m_tokenizer.encode(prompt).input_ids; + timer.end(); + return add_request(request_id, input_ids, sampling_params); + } + void step() { static ManualTimer step_timer("step()"); step_timer.start(); @@ -238,25 +244,47 @@ class ContinuousBatchingPipeline::Impl { return !m_awaiting_requests.empty() || !m_requests.empty(); } - std::vector generate(const std::vector prompts, std::vector sampling_params) { + std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. 
Use ContinuousBatchingPipeline::add_request"); - OPENVINO_ASSERT(prompts.size() == sampling_params.size()); + OPENVINO_ASSERT(input_ids.size() == sampling_params.size()); + const std::shared_ptr& streamer_ptr = std::visit(overloaded{ + [](std::monostate) -> std::shared_ptr { + return nullptr; + }, + [](const std::shared_ptr& streamer) { + return streamer; + }, + [this](const std::function& streamer) -> std::shared_ptr { + return std::make_unique(m_tokenizer, streamer); + } + }, streamer); std::vector generations; - for (size_t request_id = 0; request_id < prompts.size(); ++request_id) { - generations.push_back(add_request(request_id, prompts[request_id], sampling_params[request_id])); + for (size_t request_id = 0; request_id < input_ids.size(); ++request_id) { + OPENVINO_ASSERT(1 == input_ids[request_id].get_shape().at(0), "Use multiple tensors to pass a batch."); + generations.push_back(add_request(request_id, input_ids[request_id], sampling_params[request_id])); } - std::vector results; + std::vector results; results.reserve(m_awaiting_requests.size()); - while (has_non_finished_requests()) { + bool continue_generation = true; + while (has_non_finished_requests() && continue_generation) { step(); + if (streamer_ptr) { + std::unordered_map token = generations.at(0).get()->back(); + OPENVINO_ASSERT(1 == token.size()); + OPENVINO_ASSERT(1 == token.begin()->second.generated_token_ids.size()); + continue_generation = !streamer_ptr->put(token.begin()->second.generated_token_ids.at(0)); + } + } + if (streamer_ptr) { + streamer_ptr->end(); } for (size_t generation_idx = 0; generation_idx < generations.size(); ++generation_idx) { const auto& generation = generations[generation_idx]; - GenerationResult result; + EncodedGenerationResult result; result.m_request_id = 1; std::vector generation_outputs = generation->read_all(); std::sort(generation_outputs.begin(), generation_outputs.end(), [=] (GenerationOutput& r1, GenerationOutput& r2) { @@ -266,17 +294,69 @@ class ContinuousBatchingPipeline::Impl { auto num_outputs = std::min(sampling_params[generation_idx].num_return_sequences, generation_outputs.size()); for (size_t generation_output_idx = 0; generation_output_idx < num_outputs; ++generation_output_idx) { const auto& generation_output = generation_outputs[generation_output_idx]; - std::string output_text = m_tokenizer.decode(generation_output.generated_token_ids); - result.m_generation_ids.push_back(output_text); + result.m_generation_ids.push_back(std::move(generation_output.generated_token_ids)); result.m_scores.push_back(generation_output.score); } result.m_status = generation->get_status(); - results.push_back(result); + results.push_back(std::move(result)); } - OPENVINO_ASSERT(results.size() == prompts.size()); + OPENVINO_ASSERT(results.size() == input_ids.size()); return results; } + + std::vector generate(const std::vector& prompts, std::vector sampling_params, const StreamerVariant& streamer) { + std::vector input_ids; + static ManualTimer timer("tokenize"); + if (m_is_chat_conversation) { + OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts"); + m_history.push_back({{"role", "user"}, {"content", prompts.at(0)}}); + constexpr bool add_generation_prompt = true; + std::string history = m_tokenizer.apply_chat_template(m_history, add_generation_prompt); + timer.start(); + input_ids.push_back(m_tokenizer.encode(history).input_ids); + timer.end(); + } else { + input_ids.reserve(prompts.size()); + for (const std::string& prompt : prompts) { + timer.start(); + 
input_ids.push_back(m_tokenizer.encode(prompt).input_ids); + timer.end(); + } + } + std::vector encoded = generate(input_ids, sampling_params, streamer); + std::vector decoded; + decoded.reserve(encoded.size()); + for (EncodedGenerationResult& res : encoded) { + std::vector generated; + generated.reserve(res.m_generation_ids.size()); + for (size_t idx = 0; idx < res.m_generation_ids.size(); ++idx) { + generated.push_back(m_tokenizer.decode(res.m_generation_ids.at(idx))); + if (m_is_chat_conversation && 0 == idx) { + m_history.push_back({{"role", "assistant"}, {"content", generated.back()}}); + } + } + decoded.push_back(GenerationResult{ + res.m_request_id, + std::move(generated), + std::move(res.m_scores), + res.m_status + }); + } + return decoded; + } + + void start_chat(const std::string& system_message) { + if (!system_message.empty()) { + m_history.push_back({{"role", "system"}, {"content", system_message}}); + } + m_is_chat_conversation = true; + }; + + void finish_chat() { + m_is_chat_conversation = false; + m_history.clear(); + }; }; ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::string& models_path, @@ -307,10 +387,14 @@ PipelineMetrics ContinuousBatchingPipeline::get_metrics() const{ return m_impl->get_metrics(); } -GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, std::string prompt, ov::genai::GenerationConfig sampling_params) { +GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params) { return m_impl->add_request(request_id, prompt, sampling_params); } +GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params) { + return m_impl->add_request(request_id, input_ids, sampling_params); +} + void ContinuousBatchingPipeline::step() { m_impl->step(); } @@ -319,6 +403,18 @@ bool ContinuousBatchingPipeline::has_non_finished_requests() { return m_impl->has_non_finished_requests(); } -std::vector ContinuousBatchingPipeline::generate(const std::vector& prompts, std::vector sampling_params) { - return m_impl->generate(prompts, sampling_params); -} \ No newline at end of file +std::vector ContinuousBatchingPipeline::generate(const std::vector& input_ids, const std::vector& sampling_params, const StreamerVariant& streamer) { + return m_impl->generate(input_ids, sampling_params, streamer); +} + +std::vector ContinuousBatchingPipeline::generate(const std::vector& prompts, const std::vector& sampling_params, const StreamerVariant& streamer) { + return m_impl->generate(prompts, sampling_params, streamer); +} + +void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { + m_impl->start_chat(system_message); +}; + +void ContinuousBatchingPipeline::finish_chat() { + m_impl->finish_chat(); +}; diff --git a/src/cpp/src/generation_handle.cpp b/src/cpp/src/generation_handle.cpp index a0187025e..26cc12604 100644 --- a/src/cpp/src/generation_handle.cpp +++ b/src/cpp/src/generation_handle.cpp @@ -20,6 +20,10 @@ bool GenerationHandleImpl::can_read() { return m_generation_stream->can_read(); } +std::unordered_map GenerationHandleImpl::back() { + return m_generation_stream->back(); +} + std::unordered_map GenerationHandleImpl::read() { return m_generation_stream->read(); } diff --git a/src/cpp/src/generation_stream.hpp b/src/cpp/src/generation_stream.hpp index 0d51897e8..1ac2eefef 100644 --- a/src/cpp/src/generation_stream.hpp +++ 
b/src/cpp/src/generation_stream.hpp
@@ -31,6 +31,9 @@ class GenerationStream {
     }
 
     // Retriving vector of pairs as we can generate multiple outputs for a single prompt
+    GenerationOutputs back() {
+        return m_output_queue.back();
+    }
     GenerationOutputs read() {
         return m_output_queue.pull();
     }
diff --git a/src/cpp/src/llm_pipeline.cpp b/src/cpp/src/llm_pipeline.cpp
index 507d988a6..1d68d4c74 100644
--- a/src/cpp/src/llm_pipeline.cpp
+++ b/src/cpp/src/llm_pipeline.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include "openvino/genai/continuous_batching_pipeline.hpp"
 #include "openvino/genai/generation_config.hpp"
 #include "openvino/genai/llm_pipeline.hpp"
 #include "llm_pipeline_base.hpp"
@@ -114,6 +115,7 @@ class StatefulLLMPipeline final : public LLMPipelineImplBase {
 
         EncodedInputs encoded_input;
         if (auto input_vector = std::get_if<std::vector<std::string>>(&inputs)) {
+            OPENVINO_ASSERT(!is_chat_conversation, "Can't chat with multiple prompts");
             encoded_input = m_tokenizer.encode(*input_vector);
         } else if (auto input_prompt = std::get_if<std::string>(&inputs)) {
             std::string& prompt = *input_prompt;
@@ -334,7 +336,151 @@ std::pair<std::string, Any> generation_config(const GenerationConfig& config) {
 } // namespace genai
 } // namespace ov
 
-using namespace std;
+namespace {
+using namespace ov::genai;
+
+template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
+template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;
+
+Tokenizer dont_construct() {
+    OPENVINO_THROW("Continuous Batching backend can't be constructed "
+        "from ireq because the model must be transformed");
+}
+
+class ContinuousBatchingAdapter final : public LLMPipelineImplBase {
+public:
+    ContinuousBatchingPipeline m_impl;
+
+    ContinuousBatchingAdapter(
+        const ov::InferRequest& request,
+        const Tokenizer& tokenizer,
+        OptionalGenerationConfig generation_config
+    ): LLMPipelineImplBase{dont_construct()}, m_impl{"", {}} {}
+
+    ContinuousBatchingAdapter(
+        const std::filesystem::path& model_path,
+        const Tokenizer& tokenizer,
+        const std::string& device,
+        const ov::AnyMap& plugin_config
+    ): LLMPipelineImplBase{tokenizer}, m_impl{
+        model_path.string(),
+        tokenizer,
+        SchedulerConfig{},
+        device,
+        plugin_config
+    } {}
+
+    ContinuousBatchingAdapter(
+        const std::filesystem::path& model_path,
+        const std::string& device,
+        const ov::AnyMap& plugin_config
+    ): LLMPipelineImplBase{Tokenizer(model_path.string())}, m_impl{
+        model_path.string(),
+        m_tokenizer,
+        SchedulerConfig{},
+        device,
+        plugin_config
+    } {}
+
+    DecodedResults generate(
+        StringInputs inputs,
+        OptionalGenerationConfig generation_config,
+        StreamerVariant streamer
+    ) override {
+        std::vector<std::string> prompts = std::visit(overloaded{
+            [](const std::string& prompt) {
+                return std::vector<std::string>{prompt};
+            },
+            [](std::vector<std::string>& prompts) {
+                return prompts;
+            }
+        }, inputs);
+        const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config;
+        // -1 == config.eos_token_id and config.validate() are handled in m_impl.
+ std::vector generated = m_impl.generate( + prompts, + std::vector{prompts.size(), config}, + streamer + ); + std::vector plain_replies; + std::vector plain_scores; + for (GenerationResult& res : generated) { + if (GenerationStatus::FINISHED != res.m_status) { + OPENVINO_THROW("Got unfinished GenerationStatus"); + } + std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_replies)); + std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores)); + } + return {std::move(plain_replies), std::move(plain_scores)}; + } + + EncodedResults generate( + const EncodedInputs& inputs, + OptionalGenerationConfig generation_config, + StreamerVariant streamer + ) override { + std::vector input_ids = std::visit(overloaded{ + [](const ov::Tensor& inp) { + size_t batch_size = inp.get_shape().at(0); + if (1 == batch_size) { + return std::vector{inp}; + } + std::vector input_ids; + input_ids.reserve(batch_size); + size_t max_len = inp.get_shape().at(1); + const int64_t* const source = inp.data(); + for (size_t batch_id = 0; batch_id < batch_size; ++batch_id) { + input_ids.emplace_back(ov::element::i64, ov::Shape(1, max_len)); + int64_t* destination = input_ids.back().data(); + std::copy_n(source + batch_id * max_len, max_len, destination); + } + return input_ids; + }, + [](const TokenizedInputs& inp) { + size_t batch_size = inp.input_ids.get_shape().at(0); + std::vector input_ids; + input_ids.reserve(batch_size); + size_t max_len = inp.input_ids.get_shape().at(1); + const int64_t* const source = inp.input_ids.data(); + const int64_t* const attention_mask = inp.attention_mask.data(); + for (size_t batch_id = 0; batch_id < batch_size; ++batch_id) { + input_ids.emplace_back(ov::element::i64, ov::Shape(1, max_len)); + int64_t* destination = input_ids.back().data(); + size_t copy_count = 0; + for (size_t idx = 0; idx < max_len; ++idx) { + if (1 == attention_mask[batch_id * max_len + idx]) { + destination[copy_count++] = source[batch_id * max_len + idx]; + } + } + input_ids.back().set_shape({1, copy_count}); + } + return input_ids; + } + }, inputs); + const GenerationConfig& config = generation_config.has_value() ? *generation_config : m_generation_config; + // -1 == config.eos_token_id and config.validate() are handled in m_impl. 
+        std::vector<EncodedGenerationResult> generated = m_impl.generate(input_ids, std::vector<ov::genai::GenerationConfig>{input_ids.size(), config}, streamer);
+        std::vector<std::vector<int64_t>> plain_tokens;
+        std::vector<float> plain_scores;
+        for (EncodedGenerationResult& res : generated) {
+            if (GenerationStatus::FINISHED != res.m_status) {
+                OPENVINO_THROW("Got unfinished GenerationStatus");
+            }
+            std::move(res.m_generation_ids.begin(), res.m_generation_ids.end(), std::back_inserter(plain_tokens));
+            std::move(res.m_scores.begin(), res.m_scores.end(), std::back_inserter(plain_scores));
+        }
+        return {std::move(plain_tokens), std::move(plain_scores)};
+    }
+
+    void start_chat(const std::string& system_message) override {
+        m_impl.start_chat(system_message);
+    };
+
+    void finish_chat() override {
+        m_impl.finish_chat();
+    };
+};
+}
 
 ov::genai::LLMPipeline::LLMPipeline(
     const ov::InferRequest& request,
@@ -349,25 +495,27 @@ ov::genai::LLMPipeline::LLMPipeline(
     const ov::genai::Tokenizer& tokenizer,
     const std::string& device,
     const ov::AnyMap& plugin_config
-) {
-    if (device == "NPU") {
-        m_pimpl = make_unique<StaticLLMPipeline>(std::filesystem::path(model_path), tokenizer, device, plugin_config);
-    } else {
-        m_pimpl = make_unique<StatefulLLMPipeline>(std::filesystem::path(model_path), tokenizer, device, plugin_config);
+): m_pimpl{[&]() -> std::unique_ptr<LLMPipelineImplBase> {
+    if ("CB" == device) {
+        return std::make_unique<ContinuousBatchingAdapter>(model_path, tokenizer, "CPU", plugin_config);
+    } if ("NPU" == device) {
+        return std::make_unique<StaticLLMPipeline>(model_path, tokenizer, device, plugin_config);
     }
-}
+    return std::make_unique<StatefulLLMPipeline>(model_path, tokenizer, device, plugin_config);
+}()} {}
 
 ov::genai::LLMPipeline::LLMPipeline(
     const std::string& path,
     const std::string& device,
     const ov::AnyMap& config
-) {
-    if (device == "NPU") {
-        m_pimpl = make_unique<StaticLLMPipeline>(std::filesystem::path(path), device, config);
-    } else {
-        m_pimpl = make_unique<StatefulLLMPipeline>(std::filesystem::path(path), device, config);
+): m_pimpl{[&]() -> std::unique_ptr<LLMPipelineImplBase> {
+    if ("CB" == device) {
+        return std::make_unique<ContinuousBatchingAdapter>(path, "CPU", config);
+    } if ("NPU" == device) {
+        return std::make_unique<StaticLLMPipeline>(path, device, config);
     }
-}
+    return std::make_unique<StatefulLLMPipeline>(path, device, config);
+}()} {}
 
 ov::genai::GenerationConfig ov::genai::LLMPipeline::get_generation_config() const {
     return m_pimpl->m_generation_config;
@@ -386,7 +534,7 @@ void ov::genai::LLMPipeline::finish_chat() {
 }
 
 void ov::genai::LLMPipeline::set_generation_config(const GenerationConfig& config) {
-    int64_t default_eos_token_id = m_pimpl->m_generation_config.eos_token_id;;
+    int64_t default_eos_token_id = m_pimpl->m_generation_config.eos_token_id;
     m_pimpl->m_generation_config = config;
     // if eos_token_id was not provided in config forward from default config
     if (config.eos_token_id == -1)
diff --git a/src/cpp/src/synchronized_queue.hpp b/src/cpp/src/synchronized_queue.hpp
index 0c2cd3180..bd025f1b7 100644
--- a/src/cpp/src/synchronized_queue.hpp
+++ b/src/cpp/src/synchronized_queue.hpp
@@ -17,6 +17,12 @@ class SynchronizedQueue
     SynchronizedQueue(const SynchronizedQueue&&) = delete;
     SynchronizedQueue& operator=(const SynchronizedQueue&) = delete;
 
+    T back() {
+        std::unique_lock<std::mutex> lock(m_mutex);
+        m_cv.wait(lock, [this]{return !m_queue.empty();});
+        return m_queue.back();
+    }
+
     T pull() {
         std::unique_lock<std::mutex> lock(m_mutex);
         m_cv.wait(lock, [this]{return !m_queue.empty();});
diff --git a/src/python/py_generate_pipeline.cpp b/src/python/py_generate_pipeline.cpp
index 8a1a226bc..47f38788d 100644
--- a/src/python/py_generate_pipeline.cpp
+++ b/src/python/py_generate_pipeline.cpp
@@ -606,8 +606,22 @@ PYBIND11_MODULE(py_generate_pipeline, m) {
         }),
         py::arg("model_path"), py::arg("tokenizer"),
py::arg("scheduler_config"), py::arg("device") = "CPU", py::arg("plugin_config") = ov::AnyMap({})) .def("get_tokenizer", &ContinuousBatchingPipeline::get_tokenizer) .def("get_config", &ContinuousBatchingPipeline::get_config) - .def("add_request", &ContinuousBatchingPipeline::add_request) + .def("add_request", py::overload_cast(&ContinuousBatchingPipeline::add_request)) + .def("add_request", py::overload_cast(&ContinuousBatchingPipeline::add_request)) .def("step", &ContinuousBatchingPipeline::step) .def("has_non_finished_requests", &ContinuousBatchingPipeline::has_non_finished_requests) - .def("generate", &ContinuousBatchingPipeline::generate); + .def( + "generate", + py::overload_cast&, const std::vector&, const ov::genai::StreamerVariant&>(&ContinuousBatchingPipeline::generate), + py::arg("input_ids"), + py::arg("sampling_params"), + py::arg("streamer") = std::monostate{} + ) + .def( + "generate", + py::overload_cast&, const std::vector&, const ov::genai::StreamerVariant&>(&ContinuousBatchingPipeline::generate), + py::arg("prompts"), + py::arg("sampling_params"), + py::arg("streamer") = std::monostate{} + ); } diff --git a/tests/python_tests/ov_genai_test_utils.py b/tests/python_tests/ov_genai_test_utils.py index ccd5d1397..edfadb098 100644 --- a/tests/python_tests/ov_genai_test_utils.py +++ b/tests/python_tests/ov_genai_test_utils.py @@ -218,3 +218,8 @@ def load_pipe(configs: List[Tuple], temp_path): with (temp_path / config_name).open('w') as f: json.dump(config_json, f) return ov_genai.LLMPipeline(str(temp_path)) + + +@functools.lru_cache(1) +def get_continuous_batching(path): + return ov_genai.LLMPipeline(str(path), ov_genai.Tokenizer(str(path)), 'CB') diff --git a/tests/python_tests/test_chat_generate_api.py b/tests/python_tests/test_chat_generate_api.py index 5a73d481d..bd1d45d18 100644 --- a/tests/python_tests/test_chat_generate_api.py +++ b/tests/python_tests/test_chat_generate_api.py @@ -1,6 +1,7 @@ # Copyright (C) 2023-2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import math import openvino import openvino_tokenizers import openvino_genai as ov_genai @@ -12,7 +13,8 @@ read_model, load_tok, model_tmp_path, - get_chat_templates + get_chat_templates, + get_continuous_batching, ) @@ -167,3 +169,19 @@ def test_apply_chat_template(model_tmp_path, chat_config: Tuple[str, Dict]): print(f'hf reference: {full_history_str_hf}') print(f'ov_genai out: {full_history_str}') assert full_history_str == full_history_str_hf + + +@pytest.mark.parametrize("generation_config", configs[1:]) +@pytest.mark.parametrize("model_descr", get_chat_models_list()) +@pytest.mark.precommit +def test_chat_continuous_batching_vs_stateful(model_descr, generation_config: Dict): + model_id, path, tokenizer, model, stateful = read_model((model_descr[0], model_descr[1] / '_test_chat')) + cb = get_continuous_batching(path) + stateful.start_chat() + cb.start_chat() + for question in quenstions: + generated = cb.generate(question, **generation_config) + reference = stateful.generate(question, **generation_config) + assert generated == reference + # Test that finish_chat() doesn't fail just in case. 
+ cb.finish_chat() diff --git a/tests/python_tests/test_generate_api.py b/tests/python_tests/test_generate_api.py index e2395cf8d..a18bc517d 100644 --- a/tests/python_tests/test_generate_api.py +++ b/tests/python_tests/test_generate_api.py @@ -11,6 +11,7 @@ import sys from pathlib import Path import torch +import math from ov_genai_test_utils import ( get_models_list, read_model, @@ -18,11 +19,11 @@ load_tok, model_tmp_path, STOP_CRITERIA_MAP, + get_continuous_batching, ) def run_hf_ov_genai_comparison_batched(model_descr, generation_config: Dict, prompts: Union[str, List[str]]): - device = 'CPU' model_id, path, tokenizer, model, pipe = model_descr config = generation_config.copy() # to avoid side effects num_beams = config['num_beams'] if 'num_beams' in config else 1 @@ -67,7 +68,6 @@ def run_hf_ov_genai_comparison_batched(model_descr, generation_config: Dict, pro assert hf_output == ov_output def run_hf_ov_genai_comparison(model_descr, generation_config: Dict, prompt: str): - device = 'CPU' model_id, path, tokenizer, model, pipe = model_descr config = generation_config.copy() # to avoid side effects @@ -75,7 +75,7 @@ def run_hf_ov_genai_comparison(model_descr, generation_config: Dict, prompt: str if 'do_sample' not in config: # Some HF models have default do_sample = True, and if we set beam search generation config # it conflicts with `diversity_penalty` and/or `num_beam_groups`. - # Need to set exlicitly to False, but only if test arguments omitted this arg. + # Need to set explicitly to False, but only if test arguments omitted this arg. # Do not apply 'repetition_penalty' if sampling is not used. config['do_sample'] = False config['repetition_penalty'] = None @@ -705,3 +705,39 @@ def test_left_pad(): models[2].pad_token = models[2].eos_token run_hf_ov_genai_comparison_batched(models, config, prompts) + + +@pytest.mark.parametrize("generation_config", test_configs) +@pytest.mark.parametrize("prompt", batched_prompts) +@pytest.mark.parametrize("model_descr", get_models_list()) +@pytest.mark.precommit +def test_continuous_batching_vs_stateful(model_descr, prompt, generation_config): + model_id, path, tokenizer, model, stateful = read_model(( + "TinyLlama/TinyLlama-1.1B-Chat-v1.0", + Path("TinyLlama-1.1B-Chat-v1.0") + )) + config = ov_genai.GenerationConfig() + config.max_new_tokens = 100 + cb = get_continuous_batching(path) + generated = cb.generate(prompt, **generation_config) + reference = stateful.generate(prompt, **generation_config) + assert generated.texts == reference.texts + if 1 != generation_config.get("num_return_sequences", 1): + # Stateful puts zeroes to generated.scores. Don't compare them. 
+ for gen, ref in zip(generated.scores, reference.scores): + assert math.isclose(gen, ref, abs_tol=0.0003) + +@pytest.mark.parametrize("prompt", prompts) +@pytest.mark.parametrize("model_descr", get_models_list()) +@pytest.mark.precommit +def test_cb_streamer_vs_return_vs_stateful(model_descr, prompt): + model_id, path, tokenizer, model, stateful = read_model(( + "TinyLlama/TinyLlama-1.1B-Chat-v1.0", + Path("TinyLlama-1.1B-Chat-v1.0") + )) + cb = get_continuous_batching(path) + streamed = [] + generated = cb.generate(prompt, max_new_tokens=20, streamer=lambda subword: streamed.append(subword)) + reference = stateful.generate(prompt, max_new_tokens=20) + assert generated == "".join(streamed) + assert "".join(streamed) == reference diff --git a/thirdparty/openvino_tokenizers b/thirdparty/openvino_tokenizers index 04795c1b7..880d569cd 160000 --- a/thirdparty/openvino_tokenizers +++ b/thirdparty/openvino_tokenizers @@ -1 +1 @@ -Subproject commit 04795c1b78c61e3294d1744c78a8ebb5e129256c +Subproject commit 880d569cd2f5d52165b940542e2f9190172ed2cb
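
For reference, below is a minimal usage sketch of the API this diff adds: constructing `LLMPipeline` with the `'CB'` device string routes generation through the new `ContinuousBatchingAdapter` (running on CPU), `generate` accepts a callback streamer, and `start_chat()`/`finish_chat()` keep the conversation history in the KV cache. It mirrors the new Python tests; the model directory name is a placeholder and is assumed to hold an already converted model and tokenizer.

```python
# Minimal sketch of the continuous batching backend (assumes a converted model
# in ./TinyLlama-1.1B-Chat-v1.0, the directory used by the tests).
import openvino_genai as ov_genai

path = "TinyLlama-1.1B-Chat-v1.0"  # placeholder model directory
pipe = ov_genai.LLMPipeline(path, ov_genai.Tokenizer(path), "CB")

# Streaming: the callback receives decoded subwords as they are produced.
streamed = []
text = pipe.generate("What is OpenVINO?", max_new_tokens=20,
                     streamer=lambda subword: streamed.append(subword))
assert text == "".join(streamed)

# Chat: history is kept in the KV cache between generate() calls.
pipe.start_chat()
print(pipe.generate("Why is the Sun yellow?", max_new_tokens=20))
pipe.finish_chat()
```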