i#5694 core-sharded: Add get_shard_index() + get_tid() (#6568)
Adds 2 new memtrace_stream_t interfaces to simplify generalizing tools
to handle either thread or core sharded operation:

+ get_shard_index() returns a 0-based shard ordinal regardless of
whether core-sharded or thread-sharded.
+ get_tid() returns the thread id of the current input. This is a
convenience method for use in parallel_shard_init_stream() prior to
access to any memref_t records.

For online analysis where there's a single input, the scheduler
remembers and returns the last memref.data.tid for get_tid() and uses
the dynamic tid discovery order for get_shard_index().
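
The dynamic tid discovery order described above can be sketched roughly as follows. This is a minimal standalone sketch, not the scheduler's code: the class name tid_shard_mapper_t and its method shard_index_for() are hypothetical and exist only for illustration.

```cpp
#include <cstdint>
#include <unordered_map>

// Hypothetical helper (not part of this commit): assigns each thread id a
// 0-based shard ordinal in the order the tids are first observed.
class tid_shard_mapper_t {
public:
    // Returns the ordinal already assigned to tid, or assigns the next one.
    int
    shard_index_for(int64_t tid)
    {
        auto exists = tid2shard_.find(tid);
        if (exists != tid2shard_.end())
            return exists->second;
        // The map size before insertion is the next unused 0-based ordinal.
        int index = static_cast<int>(tid2shard_.size());
        tid2shard_[tid] = index;
        return index;
    }

private:
    std::unordered_map<int64_t, int> tid2shard_;
};
```

Under this scheme the first tid seen maps to 0, the second distinct tid to 1, and a repeated tid keeps its earlier ordinal, so the ordering depends on the interleaving of the trace.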

Changes an existing interface:

+ Guarantees that the shard_index passed to parallel_shard_init_stream()
is a 0-based ordinal.

Implements the 2 new interfaces in the scheduler and adds two new
interfaces there:

+ get_output_stream_ordinal() to get the underlying output when using
single_lockstep_output.
+ get_output_cpuid(ord) taking in an ordinal so the analyzer or other
user can get the cpuids statically when using single_lockstep_output.
Analysis tools must dynamically discover the cpuids (stopped short of
making this a memtrace_stream_t interface, as analysis tools in general
must dynamically discover most things already).

Removes dr$sim's manual mapping of cpuid to core index in favor of using
the new get_shard_index().

Updates all the analysis tools to use the new interfaces, either
generalizing their code to handle both thread and core shards
(reuse_time, reuse_distance, basic_counts, histogram, opcode_mix,
syscall_mix, record_filter) or explicitly returning an error for
core-sharded modes (func_view, invariant_checker). (schedule_stats and
record_filter needed no changes.)
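
A minimal sketch of what generalizing a tool can look like, assuming the tool keys its per-shard state by get_shard_index() rather than by tid or cpuid. The types mock_stream_t and shard_counter_t are hypothetical stand-ins for illustration; only the accessor names get_shard_index() and get_tid() come from this commit.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical stand-in for a stream exposing the two accessors added by this
// commit; a real tool would be handed a memtrace_stream_t instead.
struct mock_stream_t {
    int shard_index = 0;
    int64_t tid = 0;
    int
    get_shard_index() const
    {
        return shard_index;
    }
    int64_t
    get_tid() const
    {
        return tid;
    }
};

// Hypothetical tool state: one record counter per shard, keyed by the 0-based
// shard ordinal so the same code serves thread shards and core shards.
class shard_counter_t {
public:
    void
    process_record(const mock_stream_t &stream)
    {
        ++counts_[stream.get_shard_index()];
    }
    // Returns 0 for a shard that has not produced any record.
    uint64_t
    count_for(int shard_index) const
    {
        auto it = counts_.find(shard_index);
        return it == counts_.end() ? 0 : it->second;
    }
    std::size_t
    shard_count() const
    {
        return counts_.size();
    }

private:
    std::map<int, uint64_t> counts_;
};
```

With core shards the tid behind a shard can change as inputs are interleaved onto it, so in this sketch get_tid() would only be informational while the shard index remains the stable key.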

Updates several unit tests to handle these changes:
+ Expands the default_memtrace_stream_t to be suitable as a mock stream
for unit tests with the new interfaces.
+ Skips invariant stream checks for the mock stream by checking its
input interface, since the stream itself is no longer null.
+ Fixes drcachesim unit tests which were not initializing tid.

Adds some sanity tests on the new interfaces.

Adds a new end-to-end test running the newly-updated tools as
-core_sharded. Limits the reuse_time histogram printing output to avoid
hanging CMake's regex matcher in this test.

Issue: #5694
derekbruening authored Jan 25, 2024
1 parent e982665 commit 910f82d
Showing 30 changed files with 560 additions and 214 deletions.
5 changes: 3 additions & 2 deletions clients/drcachesim/analysis_tool.h
@@ -1,5 +1,5 @@
/* **********************************************************
- * Copyright (c) 2016-2023 Google, Inc. All rights reserved.
+ * Copyright (c) 2016-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -387,7 +387,8 @@ template <typename RecordType> class analysis_tool_tmpl_t {
/**
* Invoked once for each trace shard prior to calling parallel_shard_memref() for
* that shard, this allows a tool to create data local to a shard. The \p
- * shard_index is a unique identifier allowing shard data to be stored into a global
+ * shard_index is the 0-based ordinal of the shard, serving as a unique identifier
+ * allowing shard data to be stored into a global
* table if desired (typically for aggregation use in print_results()). The \p
* worker_data is the return value of parallel_worker_init() for the worker thread
* who will exclusively operate on this shard. The \p shard_stream allows tools to
7 changes: 4 additions & 3 deletions clients/drcachesim/analyzer.cpp
@@ -1,5 +1,5 @@
/* **********************************************************
- * Copyright (c) 2016-2023 Google, Inc. All rights reserved.
+ * Copyright (c) 2016-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -257,8 +257,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
return false;
}
std::vector<typename sched_type_t::input_reader_t> readers;
- // With no modifiers or only_threads the tid doesn't matter.
- readers.emplace_back(std::move(reader), std::move(reader_end), /*tid=*/1);
+ // Use a sentinel for the tid so the scheduler will use the memref record tid.
+ readers.emplace_back(std::move(reader), std::move(reader_end),
+ /*tid=*/INVALID_THREAD_ID);
std::vector<typename sched_type_t::range_t> regions;
if (skip_instrs_ > 0)
regions.emplace_back(skip_instrs_ + 1, 0);
78 changes: 75 additions & 3 deletions clients/drcachesim/common/memtrace_stream.h
@@ -1,5 +1,5 @@
/* **********************************************************
- * Copyright (c) 2022-2023 Google, Inc. All rights reserved.
+ * Copyright (c) 2022-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -47,6 +47,7 @@

#include <cstdint>
#include <string>
#include <unordered_map>

/**
* @file drmemtrace/memtrace_stream.h
@@ -155,10 +156,25 @@ class memtrace_stream_t {
return false;
}

/**
* Returns the 0-based ordinal for the current shard. For parallel analysis,
* this equals the \p shard_index passed to parallel_shard_init_stream().
* This is more useful for serial modes where there is no other convenience mechanism
* to determine such an index; it allows a tool to compute per-shard results even in
* serial mode. The shard orderings in serial mode may not always match the ordering
* in parallel mode. If not implemented, -1 is returned.
*/
virtual int
get_shard_index() const
{
return -1;
}

/**
* Returns a unique identifier for the current "output cpu". Generally this only
* applies when using #SHARD_BY_CORE. For dynamic schedules, the identifier is
- * typically an output cpu ordinal. For replaying an as-traced schedule, the
+ * typically an output cpu ordinal equal to get_shard_index(). For replaying an
+ * as-traced schedule, the
* identifier is typically the original input cpu which is now mapped directly
* to this output. If not implemented for the current mode, -1 is returned.
*/
@@ -192,6 +208,17 @@ class memtrace_stream_t {
return -1;
}

/**
* Returns the thread identifier for the current input trace.
* This is a convenience method for use in parallel_shard_init_stream()
* prior to access to any #memref_t records.
*/
virtual int64_t
get_tid() const
{
return -1;
}

/**
* Returns the stream interface for the current input trace. This differs from
* "this" for #SHARD_BY_CORE where multiple inputs are interleaved on one
@@ -284,8 +311,53 @@ class default_memtrace_stream_t : public memtrace_stream_t {
return 0;
}

void
set_output_cpuid(int64_t cpuid)
{
cpuid_ = cpuid;
}
int64_t
get_output_cpuid() const override
{
return cpuid_;
}
void
set_shard_index(int index)
{
shard_ = index;
}
int
get_shard_index() const override
{
return shard_;
}
// Also sets the shard index to the dynamic-discovery-order tid ordinal.
void
set_tid(int64_t tid)
{
tid_ = tid;
auto exists = tid2shard_.find(tid);
if (exists == tid2shard_.end()) {
int index = static_cast<int>(tid2shard_.size());
tid2shard_[tid] = index;
set_shard_index(index);
} else {
set_shard_index(exists->second);
}
}
int64_t
get_tid() const override
{
return tid_;
}

private:
- uint64_t *record_ordinal_;
+ uint64_t *record_ordinal_ = nullptr;
+ int64_t cpuid_ = 0;
+ int shard_ = 0;
+ int64_t tid_ = 0;
+ // To let a test set just the tid and get a shard index for free.
+ std::unordered_map<int64_t, int> tid2shard_;
};

} // namespace drmemtrace
40 changes: 39 additions & 1 deletion clients/drcachesim/scheduler/scheduler.cpp
@@ -1373,6 +1373,43 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_input_ordinal(output_ordinal_t out
return outputs_[output].cur_input;
}

template <typename RecordType, typename ReaderType>
int64_t
scheduler_tmpl_t<RecordType, ReaderType>::get_tid(output_ordinal_t output)
{
int index = outputs_[output].cur_input;
if (index < 0)
return -1;
if (inputs_[index].is_combined_stream())
return inputs_[index].last_record_tid;
return inputs_[index].tid;
}

template <typename RecordType, typename ReaderType>
int
scheduler_tmpl_t<RecordType, ReaderType>::get_shard_index(output_ordinal_t output)
{
if (output < 0 || output >= static_cast<output_ordinal_t>(outputs_.size()))
return -1;
if (TESTANY(sched_type_t::SCHEDULER_USE_INPUT_ORDINALS |
sched_type_t::SCHEDULER_USE_SINGLE_INPUT_ORDINALS,
options_.flags)) {
if (inputs_.size() == 1 && inputs_[0].is_combined_stream()) {
int index;
memref_tid_t tid = get_tid(output);
auto exists = tid2shard_.find(tid);
if (exists == tid2shard_.end()) {
index = static_cast<int>(tid2shard_.size());
tid2shard_[tid] = index;
} else
index = exists->second;
return index;
}
return get_input_ordinal(output);
}
return output;
}

template <typename RecordType, typename ReaderType>
int
scheduler_tmpl_t<RecordType, ReaderType>::get_workload_ordinal(output_ordinal_t output)
@@ -1398,7 +1435,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::is_record_synthetic(output_ordinal_t o

template <typename RecordType, typename ReaderType>
int64_t
- scheduler_tmpl_t<RecordType, ReaderType>::get_output_cpuid(output_ordinal_t output)
+ scheduler_tmpl_t<RecordType, ReaderType>::get_output_cpuid(output_ordinal_t output) const
{
if (options_.replay_as_traced_istream != nullptr)
return outputs_[output].as_traced_cpuid;
@@ -2575,6 +2612,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
VDO(this, 4, print_record(record););

outputs_[output].last_record = record;
record_type_has_tid(record, input->last_record_tid);
return sched_type_t::STATUS_OK;
}

83 changes: 78 additions & 5 deletions clients/drcachesim/scheduler/scheduler.h
@@ -1,5 +1,5 @@
/* **********************************************************
- * Copyright (c) 2023 Google, Inc. All rights reserved.
+ * Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
@@ -342,6 +342,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* must be specified.
* The original as-traced cpuid that is mapped to each output stream can be
* obtained by calling the get_output_cpuid() function on each stream.
*
* An alternative use of this mapping is with a single output to interleave
* inputs in a strict timestamp order, as with make_scheduler_serial_options(),
* without specifying a schedule file and without recreating core mappings:
* only timestamps are honored.
*/
MAP_TO_RECORDED_OUTPUT,
/**
@@ -435,14 +440,17 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* Causes the get_record_ordinal() and get_instruction_ordinal() results
* for an output stream to equal those values for the current input stream
* for that output, rather than accumulating across inputs.
* This also changes the behavior of get_shard_index() as documented under that
* function.
*/
SCHEDULER_USE_INPUT_ORDINALS = 0x4,
// This was added for the analyzer view tool on a single trace specified via
// a directory where the analyzer isn't listing the dir so it doesn't know
// whether to request SCHEDULER_USE_INPUT_ORDINALS.
/**
* If there is just one input and just one output stream, this sets
- * #SCHEDULER_USE_INPUT_ORDINALS; otherwise, it has no effect.
+ * #SCHEDULER_USE_INPUT_ORDINALS. In all cases, this changes the behavior
+ * of get_shard_index() as documented under that function.
*/
SCHEDULER_USE_SINGLE_INPUT_ORDINALS = 0x8,
// TODO i#5843: Add more speculation flags for other strategies.
@@ -919,6 +927,16 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
return static_cast<int64_t>(get_input_stream_ordinal());
}

/**
* Returns the thread identifier for the current input stream feeding this
* output stream.
*/
int64_t
get_tid() const override
{
return scheduler_->get_tid(ordinal_);
}

/**
* Returns the #dynamorio::drmemtrace::memtrace_stream_t interface for the
* current input stream feeding this output stream.
@@ -929,6 +947,34 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
return scheduler_->get_input_stream_interface(get_input_stream_ordinal());
}

/**
* Returns the ordinal for the current output stream. If
* #dynamorio::drmemtrace::scheduler_tmpl_t::scheduler_options_t::
* single_lockstep_output
* is set to true, this returns the ordinal of the currently active "inner"
* output stream. Otherwise, this returns the constant ordinal for this output
* stream as there is no concept of inner or outer streams.
*/
output_ordinal_t
get_output_stream_ordinal() const
{
return ordinal_;
}

/**
* For #SCHEDULER_USE_INPUT_ORDINALS or
* #SCHEDULER_USE_SINGLE_INPUT_ORDINALS, returns the input stream ordinal, except
* for the case of a single combined-stream input with the passed-in thread id
* set to INVALID_THREAD_ID (the serial analysis mode for analyzer tools) in
* which case the last trace record's tid is returned; otherwise returns the
* output stream ordinal.
*/
int
get_shard_index() const override
{
return scheduler_->get_shard_index(ordinal_);
}

/**
* Returns whether the current record is from a part of the trace corresponding
* to kernel execution.
@@ -1016,6 +1062,15 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
return inputs_[input].reader->get_stream_name();
}

/**
* Returns the get_output_cpuid() value for the given output.
* This interface is exported so that a user can get the cpuids at initialization
* time when using single_lockstep_output where there is just one output stream
* even with multiple output cpus.
*/
int64_t
get_output_cpuid(output_ordinal_t output) const;

/** Returns a string further describing an error code. */
std::string
get_error_string() const
@@ -1041,6 +1096,12 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
: lock(new std::mutex)
{
}
bool
is_combined_stream()
{
// If the tid is invalid, this is a combined stream (online analysis mode).
return tid == INVALID_THREAD_ID;
}
int index = -1; // Position in inputs_ vector.
std::unique_ptr<ReaderType> reader;
std::unique_ptr<ReaderType> reader_end;
@@ -1053,7 +1114,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// A tid can be duplicated across workloads so we need the pair of
// workload index + tid to identify the original input.
int workload = -1;
// If left invalid, this is a combined stream (online analysis mode).
memref_tid_t tid = INVALID_THREAD_ID;
memref_tid_t last_record_tid = INVALID_THREAD_ID;
// If non-empty these records should be returned before incrementing the reader.
// This is used for read-ahead and inserting synthetic records.
// We use a deque so we can iterate over it.
@@ -1380,6 +1443,16 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
input_ordinal_t
get_input_ordinal(output_ordinal_t output);

// Returns the thread identifier for the current input stream scheduled on
// the 'output_ordinal'-th output stream.
int64_t
get_tid(output_ordinal_t output);

// Returns the shard index for the current input stream scheduled on
// the 'output_ordinal'-th output stream.
int
get_shard_index(output_ordinal_t output);

// Returns the workload ordinal value for the current input stream scheduled on
// the 'output_ordinal'-th output stream.
int
@@ -1390,9 +1463,6 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool
is_record_synthetic(output_ordinal_t output);

- int64_t
- get_output_cpuid(output_ordinal_t output);

// Returns the direct handle to the current input stream interface for the
// 'output_ordinal'-th output stream.
memtrace_stream_t *
@@ -1528,6 +1598,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
switch_sequence_;
// For single_lockstep_output.
std::unique_ptr<stream_t> global_stream_;
// For online where we currently have to map dynamically observed thread ids
// to the 0-based shard index.
std::unordered_map<memref_tid_t, int> tid2shard_;
};

/** See #dynamorio::drmemtrace::scheduler_tmpl_t. */