Skip to content

Commit

Permalink
remove recursive call to handle_encoder_msg
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jul 11, 2023
1 parent 69e7f38 commit 2933538
Showing 1 changed file with 62 additions and 56 deletions.
118 changes: 62 additions & 56 deletions system/loggerd/loggerd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ void rotate_if_needed(LoggerdState *s) {
}

struct RemoteEncoder {
int write_encode_data(LoggerdState *s, const cereal::Event::Reader event, std::string &name, const EncoderInfo &encoder_info);

std::unique_ptr<VideoWriter> writer;
int encoderd_segment_offset;
int current_segment = -1;
Expand All @@ -58,15 +60,64 @@ struct RemoteEncoder {
bool seen_first_packet = false;
};

int RemoteEncoder::write_encode_data(LoggerdState *s, const cereal::Event::Reader event, std::string &name, const EncoderInfo &encoder_info) {
auto edata = (event.*(encoder_info.get_encode_data_func))();
const auto idx = edata.getIdx();
const bool is_key_frame = idx.getFlags() & V4L2_BUF_FLAG_KEYFRAME;

// if we aren't recording yet, try to start, since we are in the correct segment
if (!recording) {
if (is_key_frame) {
// only create on iframe
if (dropped_frames) {
// this should only happen for the first segment, maybe
LOGW("%s: dropped %d non iframe packets before init", name.c_str(), dropped_frames);
dropped_frames = 0;
}
// if we aren't actually recording, don't create the writer
if (encoder_info.record) {
writer.reset(new VideoWriter(s->segment_path,
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType()));
// write the header
auto header = edata.getHeader();
writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof() / 1000, true, false);
}
recording = true;
} else {
// this is a sad case when we aren't recording, but don't have an iframe
// nothing we can do but drop the frame
++dropped_frames;
return 0;
}
}

// we have to be recording if we are here
assert(recording);

// if we are actually writing the video file, do so
if (writer) {
auto data = edata.getData();
writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof() / 1000, false, is_key_frame);
}

// put it in log stream as the idx packet
MessageBuilder msg;
auto evt = msg.initEvent(event.getValid());
evt.setLogMonoTime(event.getLogMonoTime());
(evt.*(encoder_info.set_encode_idx_func))(idx);
auto bytes = msg.toBytes();
logger_log(&s->logger, (uint8_t *)bytes.begin(), bytes.size(), true); // always in qlog?
return bytes.size();
}

int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int bytes_count = 0;

// extract the message
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
auto flags = idx.getFlags();
auto idx = (event.*(encoder_info.get_encode_data_func))().getIdx();

// encoderd can have started long before loggerd
if (!re.seen_first_packet) {
Expand All @@ -78,7 +129,6 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct

if (offset_segment_num == s->rotate_segment) {
// loggerd is now on the segment that matches this packet

// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if (re.current_segment != s->rotate_segment) {
if (re.recording) {
Expand All @@ -87,62 +137,18 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
}
re.current_segment = s->rotate_segment;
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto &qmsg: re.q) {
bytes_count += handle_encoder_msg(s, qmsg, name, re, encoder_info);
}
re.q.clear();
}
}

// if we aren't recording yet, try to start, since we are in the correct segment
if (!re.recording) {
if (flags & V4L2_BUF_FLAG_KEYFRAME) {
// only create on iframe
if (re.dropped_frames) {
// this should only happen for the first segment, maybe
LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames);
re.dropped_frames = 0;
}
// if we aren't actually recording, don't create the writer
if (encoder_info.record) {
re.writer.reset(new VideoWriter(s->segment_path,
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType()));
// write the header
auto header = edata.getHeader();
re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false);
}
re.recording = true;
} else {
// this is a sad case when we aren't recording, but don't have an iframe
// nothing we can do but drop the frame
delete msg;
++re.dropped_frames;
return bytes_count;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto &qmsg : re.q) {
capnp::FlatArrayMessageReader msg_reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += re.write_encode_data(s, msg_reader.getRoot<cereal::Event>(), name, encoder_info);
delete qmsg;
}
re.q.clear();
}

// we have to be recording if we are here
assert(re.recording);

// if we are actually writing the video file, do so
if (re.writer) {
auto data = edata.getData();
re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME);
}

// put it in log stream as the idx packet
MessageBuilder bmsg;
auto evt = bmsg.initEvent(event.getValid());
evt.setLogMonoTime(event.getLogMonoTime());
(evt.*(encoder_info.set_encode_idx_func))(idx);
auto new_msg = bmsg.toBytes();
logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
bytes_count += new_msg.size();

// free the message, we used it
bytes_count = re.write_encode_data(s, event, name, encoder_info);
delete msg;
} else if (offset_segment_num > s->rotate_segment) {
// encoderd packet has a newer segment, this means encoderd has rolled over
Expand Down

0 comments on commit 2933538

Please sign in to comment.