Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loggerd: remove recursive call to handle_encoder_msg #28873

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 62 additions & 56 deletions system/loggerd/loggerd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ void rotate_if_needed(LoggerdState *s) {
}

struct RemoteEncoder {
int write_encode_data(LoggerdState *s, const cereal::Event::Reader event, std::string &name, const EncoderInfo &encoder_info);

std::unique_ptr<VideoWriter> writer;
int encoderd_segment_offset;
int current_segment = -1;
Expand All @@ -58,15 +60,64 @@ struct RemoteEncoder {
bool seen_first_packet = false;
};

int RemoteEncoder::write_encode_data(LoggerdState *s, const cereal::Event::Reader event, std::string &name, const EncoderInfo &encoder_info) {
auto edata = (event.*(encoder_info.get_encode_data_func))();
const auto idx = edata.getIdx();
const bool is_key_frame = idx.getFlags() & V4L2_BUF_FLAG_KEYFRAME;

// if we aren't recording yet, try to start, since we are in the correct segment
if (!recording) {
if (is_key_frame) {
// only create on iframe
if (dropped_frames) {
// this should only happen for the first segment, maybe
LOGW("%s: dropped %d non iframe packets before init", name.c_str(), dropped_frames);
dropped_frames = 0;
}
// if we aren't actually recording, don't create the writer
if (encoder_info.record) {
writer.reset(new VideoWriter(s->segment_path,
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType()));
// write the header
auto header = edata.getHeader();
writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof() / 1000, true, false);
}
recording = true;
} else {
// this is a sad case when we aren't recording, but don't have an iframe
// nothing we can do but drop the frame
++dropped_frames;
return 0;
}
}

// we have to be recording if we are here
assert(recording);

// if we are actually writing the video file, do so
if (writer) {
auto data = edata.getData();
writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof() / 1000, false, is_key_frame);
}

// put it in log stream as the idx packet
MessageBuilder msg;
auto evt = msg.initEvent(event.getValid());
evt.setLogMonoTime(event.getLogMonoTime());
(evt.*(encoder_info.set_encode_idx_func))(idx);
auto bytes = msg.toBytes();
logger_log(&s->logger, (uint8_t *)bytes.begin(), bytes.size(), true); // always in qlog?
return bytes.size();
}

int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int bytes_count = 0;

// extract the message
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
auto flags = idx.getFlags();
auto idx = (event.*(encoder_info.get_encode_data_func))().getIdx();

// encoderd can have started long before loggerd
if (!re.seen_first_packet) {
Expand All @@ -78,7 +129,6 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct

if (offset_segment_num == s->rotate_segment) {
// loggerd is now on the segment that matches this packet

// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if (re.current_segment != s->rotate_segment) {
if (re.recording) {
Expand All @@ -87,62 +137,18 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
}
re.current_segment = s->rotate_segment;
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto &qmsg: re.q) {
bytes_count += handle_encoder_msg(s, qmsg, name, re, encoder_info);
}
re.q.clear();
}
}

// if we aren't recording yet, try to start, since we are in the correct segment
if (!re.recording) {
if (flags & V4L2_BUF_FLAG_KEYFRAME) {
// only create on iframe
if (re.dropped_frames) {
// this should only happen for the first segment, maybe
LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames);
re.dropped_frames = 0;
}
// if we aren't actually recording, don't create the writer
if (encoder_info.record) {
re.writer.reset(new VideoWriter(s->segment_path,
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType()));
// write the header
auto header = edata.getHeader();
re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false);
}
re.recording = true;
} else {
// this is a sad case when we aren't recording, but don't have an iframe
// nothing we can do but drop the frame
delete msg;
++re.dropped_frames;
return bytes_count;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto &qmsg : re.q) {
capnp::FlatArrayMessageReader msg_reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += re.write_encode_data(s, msg_reader.getRoot<cereal::Event>(), name, encoder_info);
delete qmsg;
}
re.q.clear();
}

// we have to be recording if we are here
assert(re.recording);

// if we are actually writing the video file, do so
if (re.writer) {
auto data = edata.getData();
re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME);
}

// put it in log stream as the idx packet
MessageBuilder bmsg;
auto evt = bmsg.initEvent(event.getValid());
evt.setLogMonoTime(event.getLogMonoTime());
(evt.*(encoder_info.set_encode_idx_func))(idx);
auto new_msg = bmsg.toBytes();
logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
bytes_count += new_msg.size();

// free the message, we used it
bytes_count = re.write_encode_data(s, event, name, encoder_info);
delete msg;
} else if (offset_segment_num > s->rotate_segment) {
// encoderd packet has a newer segment, this means encoderd has rolled over
Expand Down