Skip to content

Commit

Permalink
Merge pull request #7 from ReCodEx/job-metadata
Browse files Browse the repository at this point in the history
Support sending metadata with evaluation requests
  • Loading branch information
janbuchar authored Dec 1, 2019
2 parents b0c367a + 4bd5a8a commit 0edcf2b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 26 deletions.
18 changes: 15 additions & 3 deletions src/handlers/broker_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,15 @@ void broker_handler::process_client_eval(
std::string job_id = message.at(1);
request::headers_t headers;

request::metadata_t metadata;
const static std::string metadata_key_prefix = "meta.";

// Load headers terminated by an empty frame
auto it = std::begin(message) + 2;

while (true) {
// End of headers
if (it->size() == 0) {
if (it->empty()) {
++it;
break;
}
Expand All @@ -116,7 +119,16 @@ void broker_handler::process_client_eval(
std::size_t pos = it->find('=');
std::size_t value_size = it->size() - (pos + 1);

headers.emplace(it->substr(0, pos), it->substr(pos + 1, value_size));
const auto &key = it->substr(0, pos);
const auto &value = it->substr(pos + 1, value_size);

// Headers that start with the metadata prefix get stored in the separate metadata field
if (key.substr(0, metadata_key_prefix.size()) == metadata_key_prefix) {
metadata.emplace(key.substr(metadata_key_prefix.size()), value);
} else {
headers.emplace(key, value);
}

++it;
}

Expand All @@ -138,7 +150,7 @@ void broker_handler::process_client_eval(
job_request_data request_data(job_id, additional_data);
logger_->debug(" - incoming job {}", job_id);

auto eval_request = std::make_shared<request>(headers, request_data);
auto eval_request = std::make_shared<request>(headers, metadata, request_data);
enqueue_result result = queue_->enqueue_request(eval_request);

if (result.enqueued) {
Expand Down
9 changes: 8 additions & 1 deletion src/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ struct request {
/** Data structure type for holding all headers. */
using headers_t = std::multimap<std::string, std::string>;

/** Data structure for holding job metadata */
using metadata_t = std::map<std::string, std::string>;

/** Headers that specify requirements on the machine that processes the request. */
const headers_t headers;

/** Optional job metadata that can be used for job scheduling and routing */
const metadata_t metadata;

/** The data of the request. */
const job_request_data data;

Expand All @@ -93,7 +99,8 @@ struct request {
* @param headers Request headers that specify requirements on workers.
* @param data Body of the request.
*/
request(const headers_t &headers, const job_request_data &data) : headers(headers), data(data)
request(const headers_t &headers, const metadata_t &metadata, const job_request_data &data)
: headers(headers), metadata(metadata), data(data)
{
}

Expand Down
81 changes: 70 additions & 11 deletions tests/broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,58 @@ TEST(broker, queuing)
messages.clear();
}

class spying_queue_manager : public multi_queue_manager
{
public:
std::vector<request_ptr> received_requests;

enqueue_result enqueue_request(request_ptr request) override
{
received_requests.push_back(request);
return multi_queue_manager::enqueue_request(request);
}
};

TEST(broker, request_metadata)
{
auto config = std::make_shared<NiceMock<mock_broker_config>>();
auto workers = std::make_shared<worker_registry>();
auto queue = std::make_shared<spying_queue_manager>();

// There is already a worker in the registry
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
workers->add_worker(worker_1);
queue->add_worker(worker_1);

// Dummy response callback
std::vector<message_container> messages;
handler_interface::response_cb respond = [&messages](const message_container &msg) { messages.push_back(msg); };

// The test code
broker_handler handler(config, workers, queue, nullptr);

std::string client_id = "client_foo";

// A client requests an evaluation
handler.on_request(
message_container(broker_connect::KEY_CLIENTS,
client_id,
{"eval", "job1", "env=c", "meta.user_id=user_asdf", "meta.exercise_id=exercise_asdf", "", "1", "2"}),
respond);

// The job should be assigned to our worker immediately, metadata should not be passed
ASSERT_THAT(messages,
UnorderedElementsAre(
message_container(broker_connect::KEY_WORKERS, worker_1->identity, {"eval", "job1", "1", "2"}),
message_container(broker_connect::KEY_CLIENTS, client_id, {"ack"}),
message_container(broker_connect::KEY_CLIENTS, client_id, {"accept"})));

ASSERT_THAT(queue->received_requests[0]->metadata,
UnorderedElementsAre(Pair("user_id", "user_asdf"), Pair("exercise_id", "exercise_asdf")));

messages.clear();
}

TEST(broker, freeze)
{
auto config = std::make_shared<NiceMock<mock_broker_config>>();
Expand Down Expand Up @@ -306,7 +358,8 @@ TEST(broker, worker_expiration)
// There is already a worker in the registry and it has a job
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
worker_1->liveness = 1;
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {}));
workers->add_worker(worker_1);
queue->add_worker(worker_1, request_1);

Expand Down Expand Up @@ -372,7 +425,8 @@ TEST(broker, worker_state_message)

// There is already a worker in the registry and it has a job
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {}));
worker_1->liveness = 1;
workers->add_worker(worker_1);
queue->add_worker(worker_1, request_1);
Expand Down Expand Up @@ -401,7 +455,8 @@ TEST(broker, worker_job_failed)

// There is already a worker in the registry and it has a job
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {}));
worker_1->liveness = 1;
workers->add_worker(worker_1);
queue->add_worker(worker_1, request_1);
Expand Down Expand Up @@ -436,8 +491,10 @@ TEST(broker, worker_job_failed_queueing)

// There is already a worker in the registry and it has two jobs
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id_1", {}));
auto request_2 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id_2", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id_1", {}));
auto request_2 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id_2", {}));
worker_1->liveness = 1;
workers->add_worker(worker_1);
queue->add_worker(worker_1);
Expand Down Expand Up @@ -484,7 +541,8 @@ TEST(broker, worker_job_done)

// There is already a worker in the registry and it has a job
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {}));
worker_1->liveness = 1;
workers->add_worker(worker_1);
queue->add_worker(worker_1, request_1);
Expand Down Expand Up @@ -550,7 +608,8 @@ TEST(broker, worker_job_internal_failure)

// There is already a worker in the registry and it has a job
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 = std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {}));
worker_1->liveness = 1;
workers->add_worker(worker_1);
queue->add_worker(worker_1, request_1);
Expand Down Expand Up @@ -632,8 +691,8 @@ TEST(broker, worker_expiration_reassign_job)

// There are two workers in the registry, one of them has a job and will die
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 =
std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {"whatever"}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {"whatever"}));
auto worker_2 = std::make_shared<worker>("identity_2", "group_1", worker_headers_t{{"env", "c"}});
worker_1->liveness = 1;
worker_2->liveness = 100;
Expand Down Expand Up @@ -720,8 +779,8 @@ TEST(broker, worker_expiration_cancel_job)

// There are two workers in the registry, one of them has a job that expired too many times
auto worker_1 = std::make_shared<worker>("identity_1", "group_1", worker_headers_t{{"env", "c"}});
auto request_1 =
std::make_shared<request>(request::headers_t{{"env", "c"}}, job_request_data("job_id", {"whatever"}));
auto request_1 = std::make_shared<request>(
request::headers_t{{"env", "c"}}, request::metadata_t{{}}, job_request_data("job_id", {"whatever"}));
request_1->failure_count = 1;
auto worker_2 = std::make_shared<worker>("identity_2", "group_1", worker_headers_t{{"env", "c"}});
worker_1->liveness = 1;
Expand Down
22 changes: 11 additions & 11 deletions tests/multi_queue_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TEST(multi_queue_manager, add_worker)
multi_queue_manager manager;

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
auto request_1 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.add_worker(worker_1, request_1);
ASSERT_EQ(request_1, manager.get_current_request(worker_1));
Expand All @@ -30,8 +30,8 @@ TEST(multi_queue_manager, basic_queueing)
multi_queue_manager manager;

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
auto request_1 = std::make_shared<request>(headers, data);
auto request_2 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_2 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.add_worker(worker_1);

Expand Down Expand Up @@ -63,9 +63,9 @@ TEST(multi_queue_manager, terminate_basic)

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
manager.add_worker(worker_1);
auto request_1 = std::make_shared<request>(headers, data);
auto request_2 = std::make_shared<request>(headers, data);
auto request_3 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_2 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_3 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.enqueue_request(request_1);
manager.enqueue_request(request_2);
Expand All @@ -86,8 +86,8 @@ TEST(multi_queue_manager, terminate_no_current)

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
manager.add_worker(worker_1);
auto request_1 = std::make_shared<request>(headers, data);
auto request_2 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_2 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.enqueue_request(request_1);
manager.enqueue_request(request_2);
Expand All @@ -105,7 +105,7 @@ TEST(multi_queue_manager, terminate_empty)

auto worker_1 = std::make_shared<worker>("identity1", "group_1", headers);
manager.add_worker(worker_1);
auto request_1 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

manager.enqueue_request(request_1);
manager.worker_finished(worker_1);
Expand All @@ -126,8 +126,8 @@ TEST(multi_queue_manager, load_balancing)

request::headers_t headers = {{"env", "c"}};
job_request_data data("", {});
auto request_1 = std::make_shared<request>(headers, data);
auto request_2 = std::make_shared<request>(headers, data);
auto request_1 = std::make_shared<request>(headers, request::metadata_t{{}}, data);
auto request_2 = std::make_shared<request>(headers, request::metadata_t{{}}, data);

enqueue_result result_1 = manager.enqueue_request(request_1);
enqueue_result result_2 = manager.enqueue_request(request_2);
Expand Down

0 comments on commit 0edcf2b

Please sign in to comment.