Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
abcdabcd987 committed Dec 19, 2023
1 parent 20e008f commit ab7f0c9
Show file tree
Hide file tree
Showing 11 changed files with 852 additions and 14 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ target_link_libraries(run_nexus PUBLIC bench_dispatcher_obj)

###### tools/run_shepherd ######
add_executable(run_shepherd
tools/shepherd/common.cpp
tools/shepherd/fake_shepherd_backend.cpp
tools/shepherd/fake_shepherd_frontend.cpp
tools/shepherd/flex_scheduler.cpp
Expand Down
26 changes: 26 additions & 0 deletions tools/shepherd/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include "shepherd/common.h"

#include <gflags/gflags.h>

using nexus::shepherd::ShepherdConfig;

// Command-line overrides for ShepherdConfig. The latency flags are given in
// microseconds, while ShepherdConfig stores nanosecond durations, so the
// defaults are converted down from ShepherdConfig::Default().
DEFINE_uint32(shepherd_dctrl,
              std::chrono::duration_cast<std::chrono::microseconds>(
                  ShepherdConfig::Default().ctrl_latency)
                  .count(),
              "Shepherd: control plane latency in microseconds.");
DEFINE_uint32(shepherd_ddata,
              std::chrono::duration_cast<std::chrono::microseconds>(
                  ShepherdConfig::Default().data_latency)
                  .count(),
              "Shepherd: data plane latency in microseconds.");
// Defaults to the (float) lambda stored in ShepherdConfig::Default().
DEFINE_double(shepherd_preempt_lambda,
              ShepherdConfig::Default().preempt_lambda,
              "Shepherd: lambda for preemptive scheduling.");

namespace nexus::shepherd {

// Builds a ShepherdConfig from the --shepherd_* command-line flags.
// The two latency flags are interpreted as microseconds; the lambda flag is a
// double that is narrowed to the float field.
ShepherdConfig ShepherdConfig::FromFlags() {
  using std::chrono::microseconds;

  ShepherdConfig from_flags;
  from_flags.preempt_lambda =
      static_cast<float>(FLAGS_shepherd_preempt_lambda);
  from_flags.data_latency = microseconds(FLAGS_shepherd_ddata);
  from_flags.ctrl_latency = microseconds(FLAGS_shepherd_dctrl);
  return from_flags;
}

} // namespace nexus::shepherd
22 changes: 22 additions & 0 deletions tools/shepherd/common.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <chrono>
#include <vector>

#include "nexus/common/time_util.h"
Expand All @@ -16,10 +17,31 @@ struct BatchPlan {
std::vector<int> query_ids;
TimePoint exec_at;
TimePoint finish_at;

long exec_time_ns() const { return exec_at.time_since_epoch().count(); }
long expected_finish_time_ns() const {
return finish_at.time_since_epoch().count();
}
};

// Flag passed together with a BatchPlan to BackendStub::RunBatch, telling the
// backend whether the new plan preempts (kYes) or does not preempt (kNo) a
// plan it already holds.
enum class Preemption { kNo, kYes };

// Tunable parameters of the Shepherd scheduler.
//
// Every field carries a default member initializer, so a default-constructed
// ShepherdConfig is fully initialized and equal to Default().
struct ShepherdConfig {
  // Control plane latency, stored in nanoseconds.
  std::chrono::duration<long, std::nano> ctrl_latency =
      std::chrono::microseconds(100);
  // Data plane latency, stored in nanoseconds.
  std::chrono::duration<long, std::nano> data_latency =
      std::chrono::microseconds(50);
  // Lambda for preemptive scheduling.
  float preempt_lambda = 3.03f;

  // Returns the built-in defaults (100us control, 50us data, lambda 3.03).
  static ShepherdConfig Default() { return ShepherdConfig{}; }

  // Returns a config populated from command-line flags (defined out of line).
  static ShepherdConfig FromFlags();
};

class BackendStub {
public:
virtual void RunBatch(BatchPlan plan, Preemption preempt) = 0;
Expand Down
24 changes: 24 additions & 0 deletions tools/shepherd/fake_accessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once
#include <memory>
#include <unordered_map>

namespace nexus::shepherd {

class FakeShepherdFrontend;

// Registry that maps a model id to the fake frontend serving that model.
class FakeObjectAccessor {
 public:
  // Registers `frontend` under `model_id`. If the id is already registered,
  // the existing entry is kept and `frontend` is ignored.
  void AddFrontend(int model_id,
                   std::shared_ptr<FakeShepherdFrontend> frontend) {
    frontends_.try_emplace(model_id, frontend);
  }

  // Returns the frontend registered under `model_id`.
  // Throws std::out_of_range (from unordered_map::at) for an unknown id.
  std::shared_ptr<FakeShepherdFrontend> GetFrontend(int model_id) {
    const auto& registered = frontends_.at(model_id);
    return registered;
  }

 private:
  std::unordered_map<int, std::shared_ptr<FakeShepherdFrontend>> frontends_;
};

} // namespace nexus::shepherd
126 changes: 125 additions & 1 deletion tools/shepherd/fake_shepherd_backend.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,127 @@
#include "shepherd/fake_shepherd_backend.h"

namespace nexus::shepherd {} // namespace nexus::shepherd
#include <glog/logging.h>

#include "shepherd/common.h"
#include "shepherd/fake_shepherd_frontend.h"

namespace nexus::shepherd {

namespace {

// Comparator for the std::*_heap algorithms. Because those build a max-heap
// with respect to the comparator, ordering by "later exec_at is smaller"
// places the plan with the earliest exec_at at the front of the heap.
bool HeapOrderBatchPlanByExecTimeASC(const BatchPlan& lhs,
                                     const BatchPlan& rhs) {
  return rhs.exec_at < lhs.exec_at;
}

// Returns true when the [exec_at, finish_at) intervals of the two plans
// overlap. Plans that merely touch (one finishes exactly when the other
// starts) do not intersect.
bool BatchPlanIntersects(const BatchPlan& a, const BatchPlan& b) {
  const bool a_ends_after_b_starts = a.finish_at > b.exec_at;
  const bool b_ends_after_a_starts = b.finish_at > a.exec_at;
  return a_ends_after_b_starts && b_ends_after_a_starts;
}

} // namespace

// Constructs a fake backend for GPU `gpu_id`.
//
// `io_context` drives the completion timer and `accessor` is used to look up
// the frontend to notify when a batch finishes; both must be non-null
// (CHECK_NOTNULL) and are stored by reference. When `save_archive` is true,
// every finished BatchPlan is kept in the archive.
//
// The initializers are listed in member declaration order (save_archive_ is
// declared before timer_), which is the order in which they actually run.
FakeShepherdBackend::FakeShepherdBackend(boost::asio::io_context* io_context,
                                         FakeObjectAccessor* accessor,
                                         int gpu_id, bool save_archive)
    : io_context_(*CHECK_NOTNULL(io_context)),
      accessor_(*CHECK_NOTNULL(accessor)),
      gpu_id_(gpu_id),
      save_archive_(save_archive),
      timer_(io_context_) {}

// Stops the backend: cancels the pending timer wait and then flushes every
// plan that is still queued via DrainBatchPlans().
void FakeShepherdBackend::Stop() {
// Cancel before draining; a cancelled wait reaches OnTimer with an error code,
// and OnTimer returns immediately in that case.
timer_.cancel();
DrainBatchPlans();
}

// Schedules `request` on this fake backend.
//
// CHECK-fails if the plan finishes before it starts, if its start time is
// already in the past, or if it overlaps any plan that is still queued.
//
// With Preemption::kYes the plan at the front of the heap (earliest exec_at)
// is removed first. It must exist and must not have finished yet; it is
// dropped without being reported to the frontend or archived.
//
// Finally the completion timer is (re)armed for the new heap front.
void FakeShepherdBackend::RunBatch(BatchPlan request, Preemption preempt) {
  const TimePoint now = Clock::now();
  const auto now_ns = now.time_since_epoch().count();

  CHECK_LE(request.exec_time_ns(), request.expected_finish_time_ns())
      << "Incorrect finish time.";
  CHECK_LE(now_ns, request.exec_time_ns()) << "BatchPlan too late.";

  std::lock_guard lock(mutex_);
  if (preempt == Preemption::kYes) {
    CHECK(!batchplans_.empty()) << "Cannot preempt. No current plan.";
    // Read the front plan in place: copying it would duplicate its query_ids
    // vector while holding the lock, only to throw the copy away.
    CHECK_LE(now_ns, batchplans_.front().expected_finish_time_ns())
        << "Cannot preempt. Current plan is not running.";
    std::pop_heap(batchplans_.begin(), batchplans_.end(),
                  HeapOrderBatchPlanByExecTimeASC);
    batchplans_.pop_back();
  }

  // The fake GPU runs one batch at a time, so queued plans may never overlap.
  // Times in the failure message are relative to the existing plan's start.
  for (const auto& plan : batchplans_) {
    CHECK(!BatchPlanIntersects(plan, request))
        << "Batchplan intersects.\n"
        << "existing plan: exec_time=base"
        << " finish_time=base+"
        << (plan.expected_finish_time_ns() - plan.exec_time_ns()) << "\n"
        << "new plan: exec_time=base+"
        << (request.exec_time_ns() - plan.exec_time_ns())
        << " finish_time=base+"
        << (request.expected_finish_time_ns() - plan.exec_time_ns());
  }

  batchplans_.emplace_back(std::move(request));
  std::push_heap(batchplans_.begin(), batchplans_.end(),
                 HeapOrderBatchPlanByExecTimeASC);
  SetupTimer();
}

// Arms the timer to fire when the plan at the front of the heap finishes.
// Does nothing when no plan is queued or when the timer already expires at
// that instant. Both callers in this file hold mutex_ while calling.
void FakeShepherdBackend::SetupTimer() {
  if (batchplans_.empty()) {
    return;
  }
  const auto deadline = batchplans_.front().finish_at;
  if (timer_.expiry() == deadline) {
    return;
  }
  timer_.expires_at(deadline);
  timer_.async_wait([this](boost::system::error_code ec) { OnTimer(ec); });
}

void FakeShepherdBackend::DrainBatchPlans() {
for (auto& plan : batchplans_) {
OnBatchFinish(plan);
SaveBatchPlan(std::move(plan));
}
batchplans_.clear();
}

// Reports a finished plan to the frontend registered for the plan's model.
void FakeShepherdBackend::OnBatchFinish(const BatchPlan& plan) {
  accessor_.GetFrontend(plan.model_id)->GotBatchReply(plan);
}

// Timer completion handler. Returns immediately if the wait ended with an
// error (for example because it was cancelled or re-armed). Otherwise it
// removes, from the front of the heap, every plan whose expected finish time
// has passed, re-arms the timer for the next plan, and then, outside the
// lock, reports and archives the finished plans.
void FakeShepherdBackend::OnTimer(boost::system::error_code ec) {
  if (ec) return;
  const TimePoint now = Clock::now();
  const auto now_ns = now.time_since_epoch().count();

  std::vector<BatchPlan> finished_plans;
  std::unique_lock lock(mutex_);
  while (!batchplans_.empty() &&
         batchplans_.front().expected_finish_time_ns() <= now_ns) {
    // pop_heap first so the heap algorithm only ever compares intact plans;
    // it leaves the former front at the back, where it is safe to move from.
    std::pop_heap(batchplans_.begin(), batchplans_.end(),
                  HeapOrderBatchPlanByExecTimeASC);
    finished_plans.emplace_back(std::move(batchplans_.back()));
    batchplans_.pop_back();
  }
  SetupTimer();
  lock.unlock();

  // Deliver results without holding mutex_.
  for (auto& plan : finished_plans) {
    OnBatchFinish(plan);
    SaveBatchPlan(std::move(plan));
  }
}

// Appends `plan` to the archive when archiving was requested at construction;
// otherwise the plan is simply discarded.
void FakeShepherdBackend::SaveBatchPlan(BatchPlan plan) {
  if (!save_archive_) {
    return;
  }
  batchplan_archive_.push_back(std::move(plan));
}

} // namespace nexus::shepherd
32 changes: 32 additions & 0 deletions tools/shepherd/fake_shepherd_backend.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,43 @@
#pragma once
#include <boost/asio/io_context.hpp>
#include <boost/asio/system_timer.hpp>
#include <boost/system/error_code.hpp>
#include <deque>
#include <mutex>
#include <vector>

#include "shepherd/common.h"
#include "shepherd/fake_accessor.h"

namespace nexus::shepherd {

// Fake GPU backend: accepts BatchPlans through RunBatch, keeps them in a heap
// ordered by exec_at, and uses an asio system_timer to detect when a plan's
// finish time has passed, at which point the owning frontend is notified.
class FakeShepherdBackend : public BackendStub {
public:
// `io_context` and `accessor` are stored by reference.
// `save_archive` controls whether finished plans are kept in the archive.
FakeShepherdBackend(boost::asio::io_context* io_context,
FakeObjectAccessor* accessor, int gpu_id,
bool save_archive);
int gpu_id() const { return gpu_id_; }
// Plans recorded by SaveBatchPlan, in the order they were saved.
const std::deque<BatchPlan>& batchplan_archive() const {
return batchplan_archive_;
}

// Queues `plan`; with Preemption::kYes the current front plan is removed first.
void RunBatch(BatchPlan plan, Preemption preempt) override;
// Reports and archives every queued plan, then empties the queue.
void DrainBatchPlans();
// Cancels the timer and drains the queue.
void Stop();

private:
// Notifies the frontend registered for plan.model_id.
void OnBatchFinish(const BatchPlan& plan);
// Timer completion handler.
void OnTimer(boost::system::error_code ec);
// Arms timer_ for the finish time of the front plan, if any.
void SetupTimer();
// Stores `plan` in the archive when save_archive_ is set.
void SaveBatchPlan(BatchPlan plan);

// NOTE: members are initialized in the order declared here, whatever order a
// constructor's initializer list uses; timer_ is constructed from io_context_.
boost::asio::io_context& io_context_;
FakeObjectAccessor& accessor_;
int gpu_id_;
bool save_archive_;
boost::asio::system_timer timer_;
// Locked by RunBatch and OnTimer while they modify batchplans_.
std::mutex mutex_;
// Heap (std::push_heap/pop_heap) with the earliest exec_at at the front.
std::vector<BatchPlan> batchplans_;
std::deque<BatchPlan> batchplan_archive_;
};

} // namespace nexus::shepherd
41 changes: 40 additions & 1 deletion tools/shepherd/fake_shepherd_frontend.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,42 @@
#include "shepherd/fake_shepherd_frontend.h"

namespace nexus::shepherd {} // namespace nexus::shepherd
namespace nexus::shepherd {

// Constructs a fake frontend for `model_id` with an SLO of `slo_ms`
// milliseconds, and allocates `reserved_size` query slots indexed by query id.
//
// std::make_unique<T[]>(n) value-initializes the array, so every slot starts
// zeroed (status == kPending, frontend_recv_ns == 0) instead of holding
// indeterminate values until the query is first written.
FakeShepherdFrontend::FakeShepherdFrontend(int model_id, int slo_ms,
                                           size_t workload_idx,
                                           size_t reserved_size)
    : model_id_(model_id),
      slo_ms_(slo_ms),
      workload_idx_(workload_idx),
      reserved_size_(reserved_size) {
  queries_ = std::make_unique<QueryContext[]>(reserved_size_);
}

// Marks query `query_id` as dropped and counts it as a bad query.
// `query_id` indexes the query array directly; no bounds check is performed.
void FakeShepherdFrontend::MarkQueryDropped(int query_id) {
  queries_[query_id].status = QueryStatus::kDropped;
  cnt_bad_ += 1;
}

void FakeShepherdFrontend::ReceivedQuery(int query_id,
int64_t frontend_recv_ns) {
auto& qctx = queries_[query_id];
qctx.status = QueryStatus::kPending;
qctx.frontend_recv_ns = frontend_recv_ns;
++cnt_total_;
}

void FakeShepherdFrontend::GotBatchReply(const BatchPlan& plan) {
for (auto query_id : plan.query_ids) {
auto& qctx = queries_[query_id];
auto deadline_ns = qctx.frontend_recv_ns + slo_ms_ * 1000 * 1000;
if (plan.finish_at.time_since_epoch().count() < deadline_ns) {
qctx.status = QueryStatus::kSuccess;
} else {
qctx.status = QueryStatus::kTimeout;
++cnt_bad_;
}
}
}

} // namespace nexus::shepherd
34 changes: 34 additions & 0 deletions tools/shepherd/fake_shepherd_frontend.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>

#include "shepherd/common.h"

namespace nexus::shepherd {

// Fake frontend for a single model: tracks the state of each query by id and
// keeps counters of total and bad (dropped or timed-out) queries.
class FakeShepherdFrontend : public FrontendStub {
public:
// Lifecycle state of a query as recorded by this frontend.
enum class QueryStatus {
kPending,
kDropped,
kTimeout,
kSuccess,
};

// Per-query record. Neither field has a default member initializer, so the
// initial contents depend on how the array of records is allocated.
struct QueryContext {
QueryStatus status;
// Receive timestamp in nanoseconds, as supplied to ReceivedQuery.
int64_t frontend_recv_ns;
};

// `slo_ms` is the latency objective in milliseconds; `reserved_size` is the
// number of QueryContext slots allocated.
FakeShepherdFrontend(int model_id, int slo_ms, size_t workload_idx,
size_t reserved_size);
// Raw view of the query array; it holds reserved_size() elements.
const QueryContext* queries() const { return queries_.get(); }
size_t reserved_size() const { return reserved_size_; }
// Number of queries marked dropped or timed out.
size_t cnt_bad() const { return cnt_bad_; }
// Number of ReceivedQuery calls.
size_t cnt_total() const { return cnt_total_; }

void MarkQueryDropped(int query_id) override;
void ReceivedQuery(int query_id, int64_t frontend_recv_ns);
// Classifies every query of a finished batch as success or timeout.
void GotBatchReply(const BatchPlan& plan);

private:
int model_id_;
int slo_ms_;
size_t workload_idx_;
size_t reserved_size_;
// Indexed directly by query id.
std::unique_ptr<QueryContext[]> queries_;
size_t cnt_bad_ = 0;
size_t cnt_total_ = 0;
};

} // namespace nexus::shepherd
Loading

0 comments on commit ab7f0c9

Please sign in to comment.