i#3230 split traces: refactor for buffer-based use (#3300)
Refactors raw2trace_t for trace-buffer-driven use rather than file-driven use:
+ Separates out a new function process_next_thread_buffer() from
  process_thread_file().
+ check_thread_file() now puts the header back so it can be processed
  uniformly in process_next_thread_buffer() (this seek happens just
  once per traced thread, so there are none of the performance
  concerns from PR #2749).
+ The thread footer is now written out in on_thread_end(), leaving
  process_thread_file() to perform only file-specific actions.
+ Removes {un,}read_from_thread_file(), merging them into
  {unread,get}_next_entry().
+ Marks key functions as virtual and protected for subclass support.

Issue: #3230
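
For illustration only, not part of this commit: a minimal, self-contained C++ sketch of the peek-and-put-back pattern that the refactor consolidates into get_next_entry()/unread_last_entry(), driven here by an in-memory buffer instead of an istream. The entry_t and buffer_reader_t names and the standalone class shape are assumptions for this sketch; the real logic lives in raw2trace_t's protected members and operates on offline_entry_t records.

#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for offline_entry_t, which in the real code is a union of record types.
struct entry_t {
    uint64_t value;
};

// Mirrors the buffering scheme in the diff below: a small pre-read vector holds
// put-back entries so one entry can be peeked (e.g., the thread header) and then
// re-processed uniformly, without any seek on the underlying source.
class buffer_reader_t {
public:
    buffer_reader_t(const entry_t *buf, size_t count)
        : buf_(buf)
        , count_(count)
        , pos_(0)
    {
    }
    // Returns nullptr at the end of the buffer, like get_next_entry().
    const entry_t *
    get_next_entry()
    {
        if (!pre_read_.empty()) {
            last_entry_ = pre_read_.front();
            pre_read_.erase(pre_read_.begin());
        } else {
            if (pos_ >= count_)
                return nullptr;
            last_entry_ = buf_[pos_++];
        }
        return &last_entry_;
    }
    // Puts the most recently returned entry back, like unread_last_entry().
    void
    unread_last_entry()
    {
        pre_read_.push_back(last_entry_);
    }
    bool
    at_eof() const
    {
        return pre_read_.empty() && pos_ >= count_;
    }

private:
    const entry_t *buf_;
    size_t count_;
    size_t pos_;
    entry_t last_entry_ = {};
    std::vector<entry_t> pre_read_;
};

int
main()
{
    entry_t entries[] = { { 1 }, { 2 }, { 3 } };
    buffer_reader_t reader(entries, 3);
    // Peek at the first (header-like) entry, then put it back so the main loop
    // can process every entry, header included, through one code path.
    const entry_t *peek = reader.get_next_entry();
    if (peek != nullptr)
        reader.unread_last_entry();
    int processed = 0;
    for (const entry_t *e = reader.get_next_entry(); e != nullptr;
         e = reader.get_next_entry())
        ++processed; // A real subclass would hand each entry to the converter here.
    return (processed == 3 && reader.at_eof()) ? 0 : 1;
}

With check_thread_file() now seeking back over the version entry (see the check_thread_file() hunk below), a buffer-driven caller can hand whole buffers to process_next_thread_buffer() and let it discover the header itself.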
derekbruening authored Dec 7, 2018
1 parent 6c9491d commit be62e6a
Showing 2 changed files with 195 additions and 163 deletions.
250 changes: 129 additions & 121 deletions clients/drcachesim/tracer/raw2trace.cpp
@@ -406,98 +406,30 @@ module_mapper_t::find_mapped_trace_address(app_pc trace_address)
return nullptr;
}

/***************************************************************************
* Disassembly to fill in instr and memref entries
*/

// We do our own buffering to avoid performance problems for some istreams where
// seekg is slow. We expect just 1 entry peeked and put back the vast majority of the
// time, but we use a vector for generality. We expect our overall performance to
// be i/o bound (or ISA decode bound) and aren't worried about some extra copies
// from the vector.
bool
raw2trace_t::read_from_thread_file(void *tls, offline_entry_t *dest, size_t count,
OUT size_t *num_read)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
size_t from_buf = 0;
if (!tdata->pre_read.empty()) {
from_buf = (std::min)(tdata->pre_read.size(), count);
memcpy(dest, &tdata->pre_read[0], from_buf * sizeof(*dest));
tdata->pre_read.erase(tdata->pre_read.begin(),
tdata->pre_read.begin() + from_buf);
dest += from_buf;
count -= from_buf;
}
if (count > 0) {
if (!tdata->thread_file->read((char *)dest, count * sizeof(*dest))) {
if (num_read != nullptr)
*num_read = from_buf + (size_t)tdata->thread_file->gcount();
return false;
}
}
if (num_read != nullptr)
*num_read = from_buf + count;
return true;
}

void
raw2trace_t::unread_from_thread_file(void *tls, offline_entry_t *dest, size_t count)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
// We expect 1 the vast majority of the time, 2 occasionally.
for (size_t i = 0; i < count; ++i)
tdata->pre_read.push_back(*(dest + i));
}

bool
raw2trace_t::thread_file_at_eof(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
return tdata->pre_read.empty() && tdata->thread_file->eof();
}

std::string
raw2trace_t::append_delayed_branch(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
if (tdata->delayed_branch.empty())
return "";
VPRINT(4, "Appending delayed branch for thread %d\n", tdata->index);
if (!tdata->out_file->write(&tdata->delayed_branch[0], tdata->delayed_branch.size()))
return "Failed to write to output file";
tdata->delayed_branch.clear();
return "";
}

/***************************************************************************
* Top-level
*/

std::string
raw2trace_t::process_thread_file(raw2trace_thread_data_t *tdata)
raw2trace_t::process_header(raw2trace_thread_data_t *tdata)
{
offline_entry_t in_entry;
bool last_bb_handled = true;

trace_entry_t entry;
entry.type = TRACE_TYPE_HEADER;
entry.size = 0;
entry.addr = TRACE_ENTRY_VERSION;
if (!tdata->out_file->write((char *)&entry, sizeof(entry))) {
tdata->error = "Failed to write header to output file";
return tdata->error;
}
if (!tdata->out_file->write((char *)&entry, sizeof(entry)))
return "Failed to write header to output file";

// First read the tid and pid entries which precede any timestamps.
trace_header_t header = { static_cast<process_id_t>(INVALID_PROCESS_ID),
INVALID_THREAD_ID, 0 };
tdata->error = read_header(tdata, &header);
if (!tdata->error.empty())
return tdata->error;
std::string error = read_header(tdata, &header);
if (!error.empty())
return error;
VPRINT(2, "File %u is thread %u\n", tdata->index, (uint)header.tid);
VPRINT(2, "File %u is process %u\n", tdata->index, (uint)header.pid);
thread_id_t tid = header.tid;
tdata->tid = tid;
process_id_t pid = header.pid;
DR_ASSERT(tid != INVALID_THREAD_ID);
DR_ASSERT(pid != (process_id_t)INVALID_PROCESS_ID);
@@ -510,37 +442,45 @@ raw2trace_t::process_thread_file(raw2trace_thread_data_t *tdata)
buf += trace_metadata_writer_t::write_timestamp(buf, (uintptr_t)header.timestamp);
// We have to write this now before we append any bb entries.
CHECK((uint)(buf - buf_base) < WRITE_BUFFER_SIZE, "Too many entries");
if (!tdata->out_file->write((char *)buf_base, buf - buf_base)) {
tdata->error = "Failed to write to output file";
return tdata->error;
}
if (!tdata->out_file->write((char *)buf_base, buf - buf_base))
return "Failed to write to output file";
return "";
}

std::string
raw2trace_t::process_next_thread_buffer(raw2trace_thread_data_t *tdata,
OUT bool *end_of_record)
{
// We now convert each offline entry into a trace_entry_t.
// We fill in instr entries and memref type and size.
bool end_of_file = false;
while (!end_of_file) {
VPRINT(4, "About to read thread %d at pos %d\n", (uint)tid,
(int)tdata->thread_file->tellg());
if (!read_from_thread_file(tdata, &in_entry, 1)) {
if (thread_file_at_eof(tdata)) {
// Rather than a fatal error we try to continue to provide partial
// results in case the disk was full or there was some other issue.
WARN("Input file for thread %d is truncated", (uint)tid);
in_entry.extended.type = OFFLINE_TYPE_EXTENDED;
in_entry.extended.ext = OFFLINE_EXT_TYPE_FOOTER;
} else {
std::stringstream ss;
ss << "Failed to read from file for thread " << (uint)tid;
tdata->error = ss.str();
const offline_entry_t *in_entry = get_next_entry(tdata);
if (!tdata->saw_header) {
// We look for the initial header here rather than the top of
// process_thread_file() to support use cases where buffers are passed from
// another source.
tdata->saw_header =
trace_metadata_reader_t::is_thread_start(in_entry, &tdata->error);
if (!tdata->error.empty())
return tdata->error;
if (tdata->saw_header) {
tdata->error = process_header(tdata);
if (!tdata->error.empty())
return tdata->error;
}
}
if (in_entry.timestamp.type == OFFLINE_TYPE_TIMESTAMP) {
VPRINT(2, "Thread %u timestamp 0x" ZHEX64_FORMAT_STRING "\n", (uint)tid,
(uint64)in_entry.timestamp.usec);
buf = buf_base +
trace_metadata_writer_t::write_timestamp(
buf_base, (uintptr_t)in_entry.timestamp.usec);
in_entry = get_next_entry(tdata);
}
byte *buf_base = reinterpret_cast<byte *>(get_write_buffer(tdata));
bool last_bb_handled = true;
for (; in_entry != nullptr; in_entry = get_next_entry(tdata)) {
// Make a copy to avoid clobbering the entry we pass to process_offline_entry()
// when it calls get_next_entry() on its own.
offline_entry_t entry = *in_entry;
if (entry.timestamp.type == OFFLINE_TYPE_TIMESTAMP) {
VPRINT(2, "Thread %u timestamp 0x" ZHEX64_FORMAT_STRING "\n",
(uint)tdata->tid, (uint64)entry.timestamp.usec);
byte *buf = buf_base +
trace_metadata_writer_t::write_timestamp(buf_base,
(uintptr_t)entry.timestamp.usec);
CHECK((uint)(buf - buf_base) < WRITE_BUFFER_SIZE, "Too many entries");
if (!tdata->out_file->write((char *)buf_base, buf - buf_base)) {
tdata->error = "Failed to write to output file";
@@ -550,25 +490,52 @@ raw2trace_t::process_thread_file(raw2trace_thread_data_t *tdata)
}
// Append any delayed branch, but not until we output all markers to
// ensure we group them all with the timestamp for this thread segment.
if (in_entry.extended.type != OFFLINE_TYPE_EXTENDED ||
in_entry.extended.ext != OFFLINE_EXT_TYPE_MARKER) {
if (entry.extended.type != OFFLINE_TYPE_EXTENDED ||
entry.extended.ext != OFFLINE_EXT_TYPE_MARKER) {
tdata->error = append_delayed_branch(tdata);
if (!tdata->error.empty())
return tdata->error;
}
tdata->error =
process_offline_entry(tdata, &in_entry, tid, &end_of_file, &last_bb_handled);
tdata->error = process_offline_entry(tdata, &entry, tdata->tid, end_of_record,
&last_bb_handled);
if (!tdata->error.empty())
return tdata->error;
}
tdata->error = "";
return "";
}

entry.type = TRACE_TYPE_FOOTER;
entry.size = 0;
entry.addr = 0;
if (!tdata->out_file->write((char *)&entry, sizeof(entry))) {
tdata->error = "Failed to write footer to output file";
return tdata->error;
std::string
raw2trace_t::process_thread_file(raw2trace_thread_data_t *tdata)
{
bool end_of_file = false;
while (!end_of_file) {
VPRINT(4, "About to read thread #%d==%d at pos %d\n", tdata->index,
(uint)tdata->tid, (int)tdata->thread_file->tellg());
tdata->error = process_next_thread_buffer(tdata, &end_of_file);
if (!tdata->error.empty()) {
if (thread_file_at_eof(tdata)) {
// Rather than a fatal error we try to continue to provide partial
// results in case the disk was full or there was some other issue.
WARN("Input file for thread %d is truncated", (uint)tdata->tid);
offline_entry_t entry;
entry.extended.type = OFFLINE_TYPE_EXTENDED;
entry.extended.ext = OFFLINE_EXT_TYPE_FOOTER;
bool last_bb_handled = true;
tdata->error = process_offline_entry(tdata, &entry, tdata->tid,
&end_of_file, &last_bb_handled);
CHECK(end_of_file, "Synthetic footer failed");
if (!tdata->error.empty())
return tdata->error;
} else {
std::stringstream ss;
ss << "Failed to process file for thread " << (uint)tdata->tid;
tdata->error = ss.str();
return tdata->error;
}
}
}
// The footer is written out by on_thread_end().
tdata->error = "";
return "";
}
@@ -581,6 +548,8 @@ raw2trace_t::check_thread_file(std::istream *f)
if (!f->read((char *)&ver_entry, sizeof(ver_entry))) {
return "Unable to read thread log file";
}
// Put it back.
f->seekg(-(std::streamoff)sizeof(ver_entry), f->cur);
return trace_metadata_reader_t::check_entry_thread_start(&ver_entry);
}

@@ -741,17 +710,48 @@ instr_summary_t::construct(void *dcontext, INOUT app_pc *pc, app_pc orig_pc,
const offline_entry_t *
raw2trace_t::get_next_entry(void *tls)
{
// We do our own buffering to avoid performance problems for some istreams where
// seekg is slow. We expect just 1 entry peeked and put back the vast majority of the
// time, but we use a vector for generality. We expect our overall performance to
// be i/o bound (or ISA decode bound) and aren't worried about some extra copies
// from the vector.
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
if (!read_from_thread_file(tdata, &tdata->last_entry, 1))
return nullptr;
if (!tdata->pre_read.empty()) {
tdata->last_entry = tdata->pre_read[0];
tdata->pre_read.erase(tdata->pre_read.begin(), tdata->pre_read.begin() + 1);
} else {
if (!tdata->thread_file->read((char *)&tdata->last_entry,
sizeof(tdata->last_entry)))
return nullptr;
}
return &tdata->last_entry;
}

void
raw2trace_t::unread_last_entry(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
unread_from_thread_file(tdata, &tdata->last_entry, 1);
tdata->pre_read.push_back(tdata->last_entry);
}

bool
raw2trace_t::thread_file_at_eof(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
return tdata->pre_read.empty() && tdata->thread_file->eof();
}

std::string
raw2trace_t::append_delayed_branch(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
if (tdata->delayed_branch.empty())
return "";
VPRINT(4, "Appending delayed branch for thread %d\n", tdata->index);
if (!tdata->out_file->write(&tdata->delayed_branch[0], tdata->delayed_branch.size()))
return "Failed to write to output file";
tdata->delayed_branch.clear();
return "";
}

trace_entry_t *
Expand Down Expand Up @@ -782,14 +782,26 @@ raw2trace_t::write_delayed_branches(void *tls, const trace_entry_t *start,
return "";
}

std::string
raw2trace_t::write_footer(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
trace_entry_t entry;
entry.type = TRACE_TYPE_FOOTER;
entry.size = 0;
entry.addr = 0;
if (!tdata->out_file->write((char *)&entry, sizeof(entry)))
return "Failed to write footer to output file";
return "";
}

std::string
raw2trace_t::on_thread_end(void *tls)
{
auto tdata = reinterpret_cast<raw2trace_thread_data_t *>(tls);
offline_entry_t entry;
if (read_from_thread_file(tdata, &entry, 1) || !thread_file_at_eof(tdata))
if (get_next_entry(tdata) != nullptr || !thread_file_at_eof(tdata))
return "Footer is not the final entry";
return "";
return write_footer(tdata);
}

void
@@ -840,12 +852,8 @@ raw2trace_t::raw2trace_t(const char *module_map_in,
thread_data.resize(thread_files_in.size());
for (size_t i = 0; i < thread_data.size(); ++i) {
thread_data[i].index = static_cast<int>(i);
thread_data[i].worker = 0;
thread_data[i].thread_file = thread_files_in[i];
thread_data[i].out_file = out_files_in[i];
thread_data[i].prev_instr_was_rep_string = false;
thread_data[i].last_decode_pc = nullptr;
thread_data[i].last_summary = nullptr;
}
// Since we know the traced-thread count up front, we use a simple round-robin
// static work assigment. This won't be as load balanced as a dynamic work