Skip to content

Commit

Permalink
i#5843 scheduler: Add time-based scheduling quanta
Browse files Browse the repository at this point in the history
Removes the original (but never implemented) report_time() heartbeat
design in favor of the simulator passing the current time to a new
version of next_record().

Implements QUANTUM_TIME by recording the start time of each input when
it is first scheduled and comparing to the new time in next_record().
Switches are only done at instruction boundaries for simplicity of
interactions with record-replay and skipping.

Adds 2 unit tests.

Adds time support with wall-clock time to the scheduler_launcher.
This was tested manually on some sample traces.  For threadsig traces,
with DEPENDENCY_TIMESTAMPS, the quanta doesn't make a huge differences
as the timestamp ordering imposes significant constraints.  I added an
option to ignore the timestamps ("-no_honor_stamps") and there we
really see the effects of the smaller quanta with more context switches.

===========================================================================
With timestamp deps and a 2ms quantum (compare to 2ms w/o deps below):
$ clients/bin64/scheduler_launcher -trace_dir drmemtrace.*.dir/trace -num_cores 4 -sched_quantum 2000 -sched_time -verbose 1 -honor_stamps
Core #0: 15 12 1 15 1 15 7 12 15 7 6 9 5
Core #1: 13 10 11 15 12 10 15 12 10 15 10 11 10 15 10 8 2
Core #2: 16 11 15 10 11 15 11 15 11 15 4 7 12 4 0 14
Core #3: 3 1 15 12 10 15 12 10 1 12 15 7 15 4 15

===========================================================================
Without, but a long quantum of 20ms:
$ clients/bin64/scheduler_launcher -trace_dir drmemtrace.*.dir/trace -num_cores 4 -sched_quantum 20000 -sched_time -verbose 1 -no_honor_stamps
Core #0: 0 5 8 12 16 4 11 15
Core #1: 1 4 9 14 0 7 9 0
Core #2: 2 6 10 13 1 6 13 1
Core #3: 3 7 11 15 2 3 8 10 14 16

===========================================================================
Without, but a smaller quantum of 2ms:
$ clients/bin64/scheduler_launcher -trace_dir drmemtrace.*.dir/trace -num_cores 4 -sched_quantum 2000 -sched_time -verbose 1 -no_honor_stamps
Core #0: 0 5 9 13 1 7 9 13 0 4 8 11 15 3 7 9 13 3 5 10 13 1 7 6 12 14 7 8 11 0 7 8 10 16 3 4 9 15 14 2 6 11 0 1 5 10 16 7 8 12 13 3 8 6 15 0 9 11 13
Core #1: 1 4 8 12 16 2 6 11 15 1 5 10 14 2 8 11 16 1 7 9 15 0 4 9 15 0 2 6 12 16 3 5 12 13 1 5 10 16 7 8 12 13 3 4 9 15 0 1 5 10 16 7 2 9 13 1 15
Core #2: 2 7 10 14 0 4 8 12 16 2 6 12 16 1 5 10 15 0 4 6 12 14 2 8 11 16 3 5 10 13 1 4 9 15 14 2 6 11 0 1 5 10 16 7 8 12 13 3 4 9 11 14 4 10 11 14 4 16 0
Core #3: 3 6 11 15 3 5 10 14 3 7 9 13 0 4 6 12 14 2 8 11 16 3 5 10 13 1 4 9 15 14 2 6 11 0 7 8 12 13 3 4 9 15 14 2 6 11 14 2 6 15 0 1 5 12 16 2 12 1

===========================================================================
Without, but a tiny quantum of 200us:
$ clients/bin64/scheduler_launcher -trace_dir drmemtrace.*.dir/trace -num_cores 4 -sched_quantum 200 -sched_time -verbose 1 -no_honor_stamps
Core #0: 0 4 7 11 15 2 6 10 14 1 7 9 12 16 5 10 12 4 8 3 11 12 1 8 11 16 7 8 15 12 0 6 12 7 13 10 2 8 15 16 2 3 15 6 11 7 13 6 10 1 8 5 10 1 3 6 14 11 7 15 2 4 12 13 5 9 10 15 6 9 10 7 6 2 1 11 3 14 16 12 13 5 1 11 6 8 2 10 0 7 5 10 0 3 14 1 15 6 7 5 4 0 12 14 9 10 16 14 8 10 11 13 7 9 0 13 3 9 1 13 3 5 2 16 14 4 15 0 6 4 15 0 13 12 8 10 1 3 4 15 2 14 3 5 11 16 13 6 15 10 2 12 6 4 9 12 6 15 10 1 7 8 11 2 14 13 5 10 1 12 8 5 0 14 8 3 4 16 6 15 11 2 1 8 3 4 0 14 7 4 2 6 14 7 11 10 1 9 13 2 14 12 3 5 10 0 9 13 11 8 12 7 16 10 0 3 5 10 0 3 13 11 15 12 13 11 8 3 13 15 9 12 7 2 8 0 4 6 15 9 3 6 15 14 12 5 15 8 0 3 6 2 0 1 6 13 0 3 6 2 11 9 4 2 0 10 4 2 0 10 4 13 0 10 4 13 0 1 15 2 12 1 15 2 0 11 15 6 13 1 15 4 16 14 11 4 16 14 10 4 16 14 11 5 2 13 9 3 4 6 1 11 7 2 16 15 12 4 5 1 12 10 6 13 9 4 2 5 15 3 10 5 15 12 11 16 1 14 7 2 13 9 4 10 6 8 14 3 2 15 7 4 0 6 8 4 0 6 7 12 3 16 6 1 4 16 15 9 12 3 5 13 8 11 0 6 1 4 16 2 7 14 10 5 13 12 3 0 9 8 11 10 6 1 14 16 2 7 11 0 9 1 4 3 5 13 12 3 5 13 4 11 10 6 8 15 11 5 8 4 11 5 13 15 3 6 8 15 11 16 2 7 12 3 5 8 4 14 16 2 15 12 0 5 13 4 14 3 2 4 14 3 8 10 1 16 6 13 4 7 5 13 10 16 11 9 2 12 3 5 6 10 1 7 0 15 12 14 8 2 10 3 11 9 15 16 7 13 2 12 3 13 2 0 16 14 5 6 16 14 5 15 4 1 11 9 6 10 14 2 0 16 14 9 6 12 14 8 4 10 9 0 13 1 12 8 15
Core #1: 1 5 ... <ommitted rest for space but all are as long as Core #0>
===========================================================================

Issue: #5843
  • Loading branch information
derekbruening committed Aug 11, 2023
1 parent 5bf1c14 commit d49883f
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 58 deletions.
69 changes: 50 additions & 19 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,18 @@ scheduler_tmpl_t<trace_entry_t, record_reader_t>::print_record(
template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::stream_t::next_record(RecordType &record)
{
return next_record(record, 0);
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::stream_t::next_record(RecordType &record,
uint64_t cur_time)
{
input_info_t *input = nullptr;
sched_type_t::stream_status_t res = scheduler_->next_record(ordinal_, record, input);
sched_type_t::stream_status_t res =
scheduler_->next_record(ordinal_, record, input, cur_time);
if (res != sched_type_t::STATUS_OK)
return res;

Expand Down Expand Up @@ -421,13 +430,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::stream_t::next_record(RecordType &reco
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::stream_t::report_time(uint64_t cur_time)
{
return sched_type_t::STATUS_NOT_IMPLEMENTED;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::stream_t::start_speculation(
Expand Down Expand Up @@ -644,9 +646,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
set_cur_input(0, static_cast<input_ordinal_t>(min_input));
}
} else {
// TODO i#5843: Implement time-based quanta.
if (options_.quantum_unit != QUANTUM_INSTRUCTIONS)
return STATUS_ERROR_NOT_IMPLEMENTED;
// Assign initial inputs.
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
sched_type_t::scheduler_status_t res = get_initial_timestamps();
Expand Down Expand Up @@ -1422,10 +1421,12 @@ template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::add_to_ready_queue(input_info_t *input)
{
VPRINT(this, 4,
"add_to_ready_queue: input %d priority %d timestamp delta %" PRIu64 "\n",
input->index, input->priority,
input->reader->get_last_timestamp() - input->base_timestamp);
VPRINT(
this, 4,
"add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta %" PRIu64
"\n",
ready_priority_.size(), input->index, input->priority,
input->reader->get_last_timestamp() - input->base_timestamp);
input->queue_counter = ++ready_counter_;
ready_priority_.push(input);
}
Expand All @@ -1452,9 +1453,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
ready_priority_.push(save);
if (res != nullptr) {
VPRINT(this, 4,
"pop_from_ready_queue[%d]: input %d priority %d timestamp delta %" PRIu64
"\n",
for_output, res->index, res->priority,
"pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp "
"delta %" PRIu64 "\n",
for_output, ready_priority_.size(), res->index, res->priority,
res->reader->get_last_timestamp() - res->base_timestamp);
}
return res;
Expand Down Expand Up @@ -1491,6 +1492,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
return STATUS_OK;
std::lock_guard<std::mutex> lock(*inputs_[input].lock);
inputs_[input].instrs_in_quantum = 0;
inputs_[input].start_time_in_quantum = outputs_[output].cur_time;
if (options_.schedule_record_ostream != nullptr) {
uint64_t instr_ord = inputs_[input].reader->get_instruction_ordinal();
if (!inputs_[input].recorded_in_schedule && instr_ord == 1) {
Expand Down Expand Up @@ -1715,8 +1717,10 @@ template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
RecordType &record,
input_info_t *&input)
input_info_t *&input,
uint64_t cur_time)
{
outputs_[output].cur_time = cur_time;
if (!outputs_[output].active)
return sched_type_t::STATUS_WAIT;
if (outputs_[output].waiting) {
Expand All @@ -1732,6 +1736,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
}
input = &inputs_[outputs_[output].cur_input];
auto lock = std::unique_lock<std::mutex>(*input->lock);
// Since we do not ask for a start time, we have to check for the first record from
// each input and set the time here.
if (input->start_time_in_quantum == 0)
input->start_time_in_quantum = cur_time;
if (!outputs_[output].speculation_stack.empty()) {
outputs_[output].prev_speculate_pc = outputs_[output].speculate_pc;
error_string_ = outputs_[output].speculator.next_record(
Expand Down Expand Up @@ -1833,6 +1841,29 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// over timestamp ordering.
need_new_input = true;
}
} else if (options_.quantum_unit == QUANTUM_TIME) {
// The above logic is for non-instrs, except the blocking syscall
// next instr which is already switching: so an else{} works here.
if (cur_time == 0 || cur_time < input->start_time_in_quantum) {
VPRINT(this, 1,
"next_record[%d]: invalid time %" PRIu64 " vs start %" PRIu64
"\n",
output, cur_time, input->start_time_in_quantum);
return sched_type_t::STATUS_INVALID;
}
if (cur_time - input->start_time_in_quantum >=
options_.quantum_duration &&
// We only switch on instruction boundaries. We could possibly switch
// in between (e.g., scatter/gather long sequence of reads/writes) by
// setting input->switching_pre_instruction.
record_type_is_instr(record)) {
VPRINT(this, 4,
"next_record[%d]: hit end of time quantum after %" PRIu64
" (%" PRIu64 " - %" PRIu64 ")\n",
output, cur_time - input->start_time_in_quantum, cur_time,
input->start_time_in_quantum);
need_new_input = true;
}
}
}
if (options_.deps == DEPENDENCY_TIMESTAMPS &&
Expand Down
21 changes: 13 additions & 8 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
/** Uses the instruction count as the quantum. */
QUANTUM_INSTRUCTIONS,
/**
* Uses the user's notion of time as the quantum.
* This must be supplied by the user by calling
* dynamorio::drmemtrace::scheduler_tmpl_t::stream_t::report_time()
* periodically.
* Uses the user's notion of time as the quantum. This must be supplied by the
* user by calling the next_record() variant that takes in the current time.
*/
QUANTUM_TIME,
};
Expand Down Expand Up @@ -536,11 +534,13 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
next_record(RecordType &record);

/**
* Reports the current time to the scheduler. This is unitless: it just needs to
* be called regularly and consistently. This is used for #QUANTUM_TIME.
* Advances to the next record in the stream. Returns a status code on whether
* and how to continue. Supplies the current time for #QUANTUM_TIME. The time is
* unitless but needs to be a globally consistent increasing value across all
* output streams. A 0 value for "cur_time" is not allowed.
*/
virtual stream_status_t
report_time(uint64_t cur_time);
next_record(RecordType &record, uint64_t cur_time);

/**
* Begins a diversion from the regular inputs to a side stream of records
Expand Down Expand Up @@ -886,6 +886,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool processing_blocking_syscall = false;
// Used to switch before we've read the next instruction.
bool switching_pre_instruction = false;
// Used for time-based quanta.
uint64_t start_time_in_quantum = 0;
};

// Format for recording a schedule to disk. A separate sequence of these records
Expand Down Expand Up @@ -972,6 +974,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
int record_index = 0;
bool waiting = false;
bool active = true;
// Used for time-based quanta.
uint64_t cur_time = 0;
};

// Called just once at initialization time to set the initial input-to-output
Expand Down Expand Up @@ -1007,7 +1011,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {

// Advances the 'output_ordinal'-th output stream.
stream_status_t
next_record(output_ordinal_t output, RecordType &record, input_info_t *&input);
next_record(output_ordinal_t output, RecordType &record, input_info_t *&input,
uint64_t cur_time = 0);

// Skips ahead to the next region of interest if necessary.
// The caller must hold the input.lock.
Expand Down
57 changes: 49 additions & 8 deletions clients/drcachesim/tests/scheduler_launcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
# define _UNICODE
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
#else
# include <sys/time.h>
#endif

#include "droption.h"
Expand Down Expand Up @@ -87,8 +89,17 @@ droption_t<int> op_num_cores(DROPTION_SCOPE_ALL, "num_cores", 4, 0, 8192,
"Number of cores", "Number of cores");

droption_t<int64_t> op_sched_quantum(DROPTION_SCOPE_ALL, "sched_quantum", 1 * 1000 * 1000,
"Scheduling quantum in instructions",
"Scheduling quantum in instructions");
"Scheduling quantum",
"Scheduling quantum: in instructions by default; in "
"miroseconds if -sched_time is set.");

droption_t<bool> op_sched_time(DROPTION_SCOPE_ALL, "sched_time", false,
"Whether to use time for the scheduling quantum",
"Whether to use time for the scheduling quantum");

droption_t<bool> op_honor_stamps(DROPTION_SCOPE_ALL, "honor_stamps", true,
"Whether to honor recorded timestamps for ordering",
"Whether to honor recorded timestamps for ordering");

#ifdef HAS_ZIP
droption_t<std::string> op_record_file(DROPTION_SCOPE_FRONTEND, "record_file", "",
Expand All @@ -104,15 +115,38 @@ droption_t<std::string>
"Path with stored as-traced schedule for replay.");
#endif

uint64_t
get_current_microseconds()
{
#ifdef UNIX
struct timeval time;
if (gettimeofday(&time, nullptr) != 0)
return 0;
return time.tv_sec * 1000000 + time.tv_usec;
#else
SYSTEMTIME sys_time;
GetSystemTime(&sys_time);
FILETIME file_time;
if (!SystemTimeToFileTime(&sys_time, &file_time))
return 0;
return = file_time.dwLowDateTime +
(static_cast<uint64_t>(file_time.dwHighDateTime) << 32);
#endif
}

void
simulate_core(int ordinal, scheduler_t::stream_t *stream, const scheduler_t &scheduler,
std::vector<scheduler_t::input_ordinal_t> &thread_sequence)
{
memref_t record;
uint64_t micros = op_sched_time.get_value() ? get_current_microseconds() : 0;
// Thread ids can be duplicated, so use the input ordinals to distinguish.
scheduler_t::input_ordinal_t prev_input = scheduler_t::INVALID_INPUT_ORDINAL;
for (scheduler_t::stream_status_t status = stream->next_record(record);
status != scheduler_t::STATUS_EOF; status = stream->next_record(record)) {
for (scheduler_t::stream_status_t status = stream->next_record(record, micros);
status != scheduler_t::STATUS_EOF;
status = stream->next_record(record, micros)) {
if (op_sched_time.get_value())
micros = get_current_microseconds();
if (status == scheduler_t::STATUS_WAIT) {
std::this_thread::yield();
continue;
Expand Down Expand Up @@ -162,9 +196,12 @@ simulate_core(int ordinal, scheduler_t::stream_t *stream, const scheduler_t &sch
.get_input_stream_interface(stream->get_input_stream_ordinal())
->get_instruction_ordinal()
<< " instrs, time " << std::setw(16)
<< scheduler
.get_input_stream_interface(stream->get_input_stream_ordinal())
->get_last_timestamp()
<< (op_sched_time.get_value()
? micros
: scheduler
.get_input_stream_interface(
stream->get_input_stream_ordinal())
->get_last_timestamp())
<< " == thread " << record.instr.tid << "\n";
std::cerr << line.str();
}
Expand Down Expand Up @@ -198,9 +235,13 @@ _tmain(int argc, const TCHAR *targv[])
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(op_trace_dir.get_value());
scheduler_t::scheduler_options_t sched_ops(
scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::MAP_TO_ANY_OUTPUT,
op_honor_stamps.get_value() ? scheduler_t::DEPENDENCY_TIMESTAMPS
: scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS, op_verbose.get_value());
sched_ops.quantum_duration = op_sched_quantum.get_value();
if (op_sched_time.get_value())
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
#ifdef HAS_ZIP
std::unique_ptr<zipfile_ostream_t> record_zip;
std::unique_ptr<zipfile_istream_t> replay_zip;
Expand Down
Loading

0 comments on commit d49883f

Please sign in to comment.