Merge master, without py tests
iefode committed May 28, 2024
2 parents f992591 + 2c2799f commit dbae0bf
Showing 23 changed files with 609 additions and 235 deletions.
@@ -6,6 +6,12 @@

#include "continuous_batching_pipeline.hpp"

void print_generation_result(const GenerationResult& generation_result) {
for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) {
std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl;
}
}

int main(int argc, char* argv[]) try {
// Command line options

@@ -71,7 +77,6 @@ int main(int argc, char* argv[]) try {
.dynamic_split_fuse = dynamic_split_fuse,
// vLLM specific params
.max_num_seqs = 2,
- .max_paddings = 8,
};

ContinuousBatchingPipeline pipe(models_path, scheduler_config);
@@ -81,8 +86,27 @@ int main(int argc, char* argv[]) try {
const GenerationResult & generation_result = generation_results[request_id];

std::cout << "Question: " << prompts[request_id] << std::endl;
- for (size_t output_id = 0; output_id < generation_result.m_generation_ids.size(); ++output_id) {
- std::cout << "Answer " << output_id << " (" << generation_result.m_scores[output_id] << ") : " << generation_result.m_generation_ids[output_id] << std::endl;
switch (generation_result.m_status)
{
case GenerationResultStatus::FINISHED:
print_generation_result(generation_result);
break;
case GenerationResultStatus::IGNORED:
std::cout << "Request was ignored due to lack of memory." <<std::endl;
if (generation_result.m_generation_ids.size() > 0) {
std::cout << "Partial result:" << std::endl;
print_generation_result(generation_result);
}
break;
case GenerationResultStatus::ABORTED:
std::cout << "Request was aborted." <<std::endl;
if (generation_result.m_generation_ids.size() > 0) {
std::cout << "Partial result:" << std::endl;
print_generation_result(generation_result);
}
break;
default:
break;
}
std::cout << std::endl;
}
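
The switch above covers all three values of GenerationResultStatus: finished requests are printed in full, while ignored and aborted ones only print whatever partial output was produced. A caller that needs this distinction elsewhere could factor it into small helpers; the sketch below is not part of the commit and relies only on the GenerationResult fields visible in this diff.

bool is_complete(const GenerationResult& generation_result) {
    return generation_result.m_status == GenerationResultStatus::FINISHED;
}

bool has_partial_output(const GenerationResult& generation_result) {
    // IGNORED / ABORTED results may still carry some generated sequences
    return generation_result.m_status != GenerationResultStatus::FINISHED &&
           !generation_result.m_generation_ids.empty();
}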
@@ -181,15 +181,13 @@ int main(int argc, char* argv[]) try {
.block_size = 32,
.dynamic_split_fuse = dynamic_split_fuse,
.max_num_seqs = 256, // not used if dynamic_split_fuse=True
- .max_paddings = 256, // not used if dynamic_split_fuse=True
};

std::cout << "Benchmarking parameters: " << std::endl;
std::cout << "\tMax number of batched tokens: " << scheduler_config.max_num_batched_tokens << std::endl;
std::cout << "\tScheduling type: " << (scheduler_config.dynamic_split_fuse ? "dynamic split-fuse" : "vLLM") << std::endl;
if (!scheduler_config.dynamic_split_fuse) {
std::cout << "\tMax number of batched sequences: " << scheduler_config.max_num_seqs << std::endl;
std::cout << "\tMax number of padding tokens within prompt batch: " << scheduler_config.max_paddings << std::endl;
}
std::cout << "Dataset parameters: " << std::endl;
std::cout << "\tNum prompts: " << num_prompts << std::endl;
@@ -67,7 +67,7 @@ FetchContent_MakeAvailable(googletest)


set(TEST_TARGET_NAME "tests_continuous_batching")
- add_executable(${TEST_TARGET_NAME} "src/tests/scheduler.cpp" "src/tests/block_manager.cpp" "src/tests/logit_filtering.cpp")
add_executable(${TEST_TARGET_NAME} "src/tests/scheduler.cpp" "src/tests/block_manager.cpp" "src/tests/logit_filtering.cpp" "src/tests/cache_manager.cpp")
target_link_libraries(${TEST_TARGET_NAME} PUBLIC ${TARGET_NAME} openvino::runtime gtest_main)
target_include_directories(${TEST_TARGET_NAME} PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/src/"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")
@@ -9,6 +9,12 @@
#include "tokenizer.hpp"
#include "generation_config.hpp"

enum class GenerationResultStatus {
FINISHED = 0,
IGNORED = 1,
ABORTED = 2 // Currently not used, TODO: implement abort functionality
};
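
// Illustration only (not part of this commit): a helper like the one below can make
// logs more readable; it depends on nothing but the enum declared above.
inline const char* to_string(GenerationResultStatus status) {
    switch (status) {
    case GenerationResultStatus::FINISHED: return "FINISHED";
    case GenerationResultStatus::IGNORED:  return "IGNORED";
    case GenerationResultStatus::ABORTED:  return "ABORTED";
    }
    return "UNKNOWN";
}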

struct GenerationResult {
// request ID
uint64_t m_request_id;
@@ -18,6 +24,9 @@ struct GenerationResult {
std::vector<std::string> m_generation_ids;
// scores
std::vector<float> m_scores;

// Status of generation
GenerationResultStatus m_status;
};

class ContinuousBatchingPipeline {
@@ -11,9 +11,11 @@ struct SchedulerConfig {
// TODO: benchmark this value and understand a required value to ensure inference is not memory bound
std::size_t max_num_batched_tokens = 16;

- // TODO: specify size in GBs instead of number of KV blocks
// total number of KV blocks available to scheduler logic
- std::size_t num_kv_blocks = 500;
std::size_t num_kv_blocks = 0;

// total size of KV cache in GB
std::size_t cache_size = 0;

// block size for KV cache
std::size_t block_size = 32;
@@ -27,8 +29,4 @@

// max number of scheduled sequences (you can think of it as "max batch size")
std::size_t max_num_seqs = 256;
- // max number of padding tokens applied when we schedule a prompt phase
- // e.g. if total number of padded tokens within a batch a greater than this value, then
- // new sequnce is not added to batch
- std::size_t max_paddings = 256;
};
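
After this change a SchedulerConfig can be sized either with an explicit num_kv_blocks or with a total cache_size in GB (one of them must be non-zero, as the DeviceConfig changes below enforce), and max_paddings is gone. A configuration using the new field could look like the sketch below; the numbers are placeholders rather than recommendations, and the designated-initializer style matches the samples above.

SchedulerConfig scheduler_config {
    .max_num_batched_tokens = 256,
    // num_kv_blocks is left at 0 so the block count is derived from cache_size
    .cache_size = 8,          // total KV cache budget, in GB
    .block_size = 32,
    .dynamic_split_fuse = true,
    .max_num_seqs = 256,
};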
@@ -50,6 +50,15 @@ GenerationResult from_sequence_group(std::shared_ptr<Tokenizer> tokenizer, Seque
}
}

if (sequence_group->has_finished()) {
result.m_status = GenerationResultStatus::FINISHED;
}
else if (sequence_group->out_of_memory()) {
result.m_status = GenerationResultStatus::IGNORED;
}
else {
result.m_status = GenerationResultStatus::ABORTED;
}
return result;
}

@@ -83,9 +92,9 @@ class ContinuousBatchingPipeline::Impl {
// current requests to process
std::vector<SequenceGroup::Ptr> m_requests;

- void _free_finished_requests() {
void _free_non_running_requests() {
auto new_end = std::remove_if(m_requests.begin(), m_requests.end(), [] (SequenceGroup::CPtr seq_group) -> bool {
- return seq_group->has_finished();
return seq_group->has_finished() || seq_group->out_of_memory();
});
m_requests.erase(new_end, m_requests.end());
}
@@ -111,9 +120,15 @@ class ContinuousBatchingPipeline::Impl {
infer_request.set_input_tensor(2 + decoder_layer_id * 2 + 1, m_cache_manager->get_value_cache(decoder_layer_id));
}

- m_scheduler = std::make_shared<Scheduler>(scheduler_config);
SchedulerConfig updated_config = scheduler_config;
// update KV number in scheduler config
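// (when cache_size in GB is used instead of num_kv_blocks, the actual block count is
// derived inside DeviceConfig, see DeviceConfig::set_model_params, so the scheduler
// has to pick up the derived value here)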
if (scheduler_config.num_kv_blocks != device_config.get_num_kv_blocks()) {
updated_config.num_kv_blocks = device_config.get_num_kv_blocks();
}

m_scheduler = std::make_shared<Scheduler>(updated_config);
// and finally create model runner
- m_model_runner = std::make_shared<ModelRunner>(infer_request, scheduler_config);
m_model_runner = std::make_shared<ModelRunner>(infer_request, updated_config);
m_sampler = std::make_shared<Sampler>();
m_sampler->set_seed(m_generation_config.rng_seed);

@@ -163,6 +178,21 @@ class ContinuousBatchingPipeline::Impl {
timer.end();
}

// if no tokens were scheduled, we are out of memory
if (scheduler_output.m_total_num_scheduled_tokens == 0) {

// return partial results
std::vector<GenerationResult> partial_results;

for (size_t i = 0; i < m_requests.size(); ++i) {
SequenceGroup::CPtr sequence_group = m_requests[i];
partial_results.push_back(from_sequence_group(m_tokenizer, sequence_group));
}

_free_non_running_requests();
return partial_results;
}

ov::Tensor logits;
{
static ManualTimer timer("forward");
@@ -224,7 +254,7 @@ class ContinuousBatchingPipeline::Impl {
}
}

- _free_finished_requests();
_free_non_running_requests();

timer.end();
}
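
The early-return block added above is where IGNORED results come from: if the scheduler cannot schedule a single token in a step, the pipeline converts the current requests into (possibly partial) GenerationResults and then frees every request that has finished or run out of memory. One way to exercise that path is to run with a deliberately undersized cache; the values below are illustrative only and reuse just the SchedulerConfig fields shown in this diff.

SchedulerConfig tiny_cache_config {
    .max_num_batched_tokens = 16,
    .num_kv_blocks = 1,   // a single 32-token block for the whole pipeline
    .block_size = 32,
};
// With a cache this small, long prompts are expected to come back with
// m_status == GenerationResultStatus::IGNORED and at most partial output.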
@@ -13,15 +13,16 @@ class DeviceConfig {
ov::element::Type m_kv_cache_type;
ov::Shape m_key_cache_shape, m_value_cache_shape;
ov::Shape::value_type m_num_kv_heads, m_head_size, m_num_decoder_layers;
- size_t m_num_kv_blocks, m_block_size;
size_t m_num_kv_blocks = 0;
size_t m_block_size = 0;
size_t m_cache_size = 0;
std::string m_device;

public:
DeviceConfig(ov::Core& core, const SchedulerConfig& scheduling_config, const std::string& device) {
m_device = device;

// keep information about blocks
- m_num_kv_blocks = scheduling_config.num_kv_blocks;
m_block_size = scheduling_config.block_size;

if (m_device == "CPU") {
@@ -32,13 +33,28 @@
} else {
OPENVINO_THROW(m_device, " is not supported by OpenVINO Continuous Batching");
}

OPENVINO_ASSERT(scheduling_config.num_kv_blocks > 0 || scheduling_config.cache_size > 0, "num_kv_blocks or cache_size should be more than zero.");
if (scheduling_config.num_kv_blocks > 0) {
m_num_kv_blocks = scheduling_config.num_kv_blocks;
}
else {
m_cache_size = scheduling_config.cache_size;

}
}

void set_model_params(size_t num_kv_heads, size_t head_size, size_t num_decoder_layers) {
m_num_kv_heads = num_kv_heads;
m_head_size = head_size;
m_num_decoder_layers = num_decoder_layers;

if (m_num_kv_blocks == 0) {
OPENVINO_ASSERT(m_cache_size > 0, "num_kv_blocks or cache_size should be more than zero.");
size_t size_in_bytes = m_cache_size * 1024 * 1024 * 1024;
m_num_kv_blocks = size_in_bytes / (m_num_decoder_layers * 2 * m_num_kv_heads * m_block_size * m_head_size * m_kv_cache_type.size());
}

m_key_cache_shape = m_value_cache_shape = ov::Shape{m_num_kv_blocks,
m_num_kv_heads,
m_block_size,
@@ -66,4 +82,8 @@
OPENVINO_ASSERT(!m_value_cache_shape.empty());
return m_value_cache_shape;
}

size_t get_num_kv_blocks() const {
return m_num_kv_blocks;
}
};
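
set_model_params now converts a cache_size given in GB into a number of KV blocks by dividing the byte budget by the per-block footprint num_decoder_layers * 2 * num_kv_heads * block_size * head_size * sizeof(element). A worked example with assumed model parameters (32 decoder layers, 32 KV heads, head size 128, block size 32, 2-byte KV cache elements; none of these values come from this commit):

size_t cache_size_gb = 8;
size_t bytes_per_block = 32ull /*layers*/ * 2 /*K and V*/ * 32 /*kv heads*/
                       * 32 /*block_size*/ * 128 /*head_size*/ * 2 /*bytes per element*/;
// bytes_per_block == 16'777'216 bytes (16 MiB), so an 8 GB budget gives
size_t num_kv_blocks = cache_size_gb * 1024ull * 1024 * 1024 / bytes_per_block;  // == 512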
@@ -56,8 +56,8 @@ GenerationConfig GenerationConfig::beam_search() {
GenerationConfig GenerationConfig::multinomial() {
GenerationConfig multinomial;
multinomial.do_sample = true;
- multinomial.temperature = 0.8f;
- multinomial.top_p = 0.8;
multinomial.temperature = 0.95f;
multinomial.top_p = 0.95;
multinomial.top_k = 20;
multinomial.num_return_sequences = 3;
return multinomial;
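
The multinomial preset now samples with temperature 0.95 and top_p 0.95, keeping top_k = 20 and three returned sequences. As a reminder of what the top_p threshold means — this is a generic illustration, not the sampler used by this pipeline — nucleus sampling keeps only the smallest set of most-probable tokens whose cumulative probability reaches top_p:

#include <algorithm>
#include <numeric>
#include <vector>

// Generic top-p (nucleus) filtering sketch: returns indices of the kept tokens.
std::vector<size_t> top_p_filter(const std::vector<float>& probs, float top_p) {
    std::vector<size_t> order(probs.size());
    std::iota(order.begin(), order.end(), 0);
    std::sort(order.begin(), order.end(),
              [&](size_t a, size_t b) { return probs[a] > probs[b]; });
    std::vector<size_t> kept;
    float cumulative = 0.0f;
    for (size_t idx : order) {
        kept.push_back(idx);
        cumulative += probs[idx];
        if (cumulative >= top_p)
            break;  // smallest prefix of the sorted tokens whose mass reaches top_p
    }
    return kept;
}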
@@ -12,15 +12,14 @@ void apply_paged_attention_transformations(std::shared_ptr<ov::Model> model, Dev
const ov::op::util::VariableVector& variables = model->get_variables();
OPENVINO_ASSERT(!variables.empty(), "Model is supposed to be stateful");

- // number of variables is 2 (K and V) multiplied by number of decoder layers
- size_t num_layers = variables.size() >> 1;

- ov::pass::Manager manager;
- manager.register_pass<ov::pass::SDPAToPagedAttention>();
- manager.run_passes(model);
ov::pass::SDPAToPagedAttention().run_on_model(model);

const ov::ParameterVector& parameters = model->get_parameters();

size_t num_layers = std::count_if(parameters.begin(), parameters.end(), [](std::shared_ptr<ov::op::v0::Parameter> parameter) {
return parameter->get_friendly_name().find("key_cache.") == 0;
});
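// Each decoder layer exposes one key_cache.<i> / value_cache.<i> input pair after
// SDPAToPagedAttention, so counting the "key_cache." parameters gives the number of
// decoder layers.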

// extract num_kv_heads and head_size
size_t kv_caches_inputs_offset = 2;
ov::PartialShape k_shape = parameters[kv_caches_inputs_offset]->get_partial_shape();