encoderd: enable (#24492)
* enable encoderd * correct enable line * fix loggerd tests * fix power draw and cpu tests * correct cpu for encoderd * fix a bug, video_writer is shared * fix issue with not recording dcam * add recording state * wooo tests pass. encode id keeps counting * core 3 * loggerd then encoderd * stop loggerd first * core 3 always online * see the camera when we see encoder packet * encoderd on small core uses 37% * remove encoder logic from loggerd * delete unit test that doesn't really make sense anymore Co-authored-by: Comma Device <device@comma.ai>pull/24532/head
parent
a693b3a26b
commit
94b9972eb7
21 changed files with 160 additions and 434 deletions
@ -1,18 +0,0 @@ |
||||
#include "selfdrive/loggerd/loggerd.h" |
||||
|
||||
#include <sys/resource.h> |
||||
|
||||
int main(int argc, char** argv) { |
||||
if (Hardware::TICI()) { |
||||
int ret; |
||||
ret = util::set_core_affinity({0, 1, 2, 3}); |
||||
assert(ret == 0); |
||||
// TODO: why does this impact camerad timings?
|
||||
//ret = util::set_realtime_priority(1);
|
||||
//assert(ret == 0);
|
||||
} |
||||
|
||||
loggerd_thread(); |
||||
|
||||
return 0; |
||||
} |
@ -1,82 +0,0 @@ |
||||
#include "selfdrive/loggerd/loggerd.h" |
||||
#include "selfdrive/loggerd/remote_encoder.h" |
||||
|
||||
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re) { |
||||
const LogCameraInfo &cam_info = (name == "driverEncodeData") ? cameras_logged[1] : |
||||
((name == "wideRoadEncodeData") ? cameras_logged[2] : |
||||
((name == "qRoadEncodeData") ? qcam_info : cameras_logged[0])); |
||||
if (!cam_info.record) return 0; // TODO: handle this by not subscribing
|
||||
|
||||
// rotation happened, process the queue (happens before the current message)
|
||||
int bytes_count = 0; |
||||
if (re.logger_segment != s->rotate_segment) { |
||||
re.logger_segment = s->rotate_segment; |
||||
for (auto &qmsg: re.q) { |
||||
bytes_count += handle_encoder_msg(s, qmsg, name, re); |
||||
} |
||||
re.q.clear(); |
||||
} |
||||
|
||||
// extract the message
|
||||
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize())); |
||||
auto event = cmsg.getRoot<cereal::Event>(); |
||||
auto edata = (name == "driverEncodeData") ? event.getDriverEncodeData() : |
||||
((name == "wideRoadEncodeData") ? event.getWideRoadEncodeData() : |
||||
((name == "qRoadEncodeData") ? event.getQRoadEncodeData() : event.getRoadEncodeData())); |
||||
auto idx = edata.getIdx(); |
||||
auto flags = idx.getFlags(); |
||||
|
||||
if (!re.writer) { |
||||
// only create on iframe
|
||||
if (flags & V4L2_BUF_FLAG_KEYFRAME) { |
||||
if (re.dropped_frames) { |
||||
// this should only happen for the first segment, maybe
|
||||
LOGD("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); |
||||
re.dropped_frames = 0; |
||||
} |
||||
re.writer.reset(new VideoWriter(s->segment_path, |
||||
cam_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, |
||||
cam_info.frame_width, cam_info.frame_height, cam_info.fps, idx.getType())); |
||||
// write the header
|
||||
auto header = edata.getHeader(); |
||||
re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); |
||||
re.segment = idx.getSegmentNum(); |
||||
} else { |
||||
++re.dropped_frames; |
||||
return bytes_count; |
||||
} |
||||
} |
||||
|
||||
if (re.segment != idx.getSegmentNum()) { |
||||
if (re.writer) { |
||||
// encoder is on the next segment, this segment is over so we close the videowriter
|
||||
re.writer.reset(); |
||||
++s->ready_to_rotate; |
||||
LOGD("rotate %d -> %d ready %d/%d", re.segment, idx.getSegmentNum(), s->ready_to_rotate.load(), s->max_waiting); |
||||
} |
||||
// queue up all the new segment messages, they go in after the rotate
|
||||
re.q.push_back(msg); |
||||
} else { |
||||
auto data = edata.getData(); |
||||
re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); |
||||
|
||||
// put it in log stream as the idx packet
|
||||
MessageBuilder bmsg; |
||||
auto evt = bmsg.initEvent(event.getValid()); |
||||
evt.setLogMonoTime(event.getLogMonoTime()); |
||||
if (name == "driverEncodeData") { evt.setDriverEncodeIdx(idx); } |
||||
if (name == "wideRoadEncodeData") { evt.setWideRoadEncodeIdx(idx); } |
||||
if (name == "qRoadEncodeData") { evt.setQRoadEncodeIdx(idx); } |
||||
if (name == "roadEncodeData") { evt.setRoadEncodeIdx(idx); } |
||||
auto new_msg = bmsg.toBytes(); |
||||
logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
|
||||
bytes_count += new_msg.size(); |
||||
|
||||
// this frees the message
|
||||
delete msg; |
||||
} |
||||
|
||||
return bytes_count; |
||||
} |
||||
|
||||
|
@ -1,11 +0,0 @@ |
||||
#include "selfdrive/loggerd/encoder/video_writer.h" |
||||
|
||||
struct RemoteEncoder { |
||||
std::unique_ptr<VideoWriter> writer; |
||||
int segment = -1; |
||||
std::vector<Message *> q; |
||||
int logger_segment = -1; |
||||
int dropped_frames = 0; |
||||
}; |
||||
|
||||
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re); |
@ -1,93 +0,0 @@ |
||||
#include <future> |
||||
|
||||
#include "catch2/catch.hpp" |
||||
#include "selfdrive/loggerd/loggerd.h" |
||||
|
||||
int random_int(int min, int max) { |
||||
std::random_device dev; |
||||
std::mt19937 rng(dev()); |
||||
std::uniform_int_distribution<std::mt19937::result_type> dist(min, max); |
||||
return dist(rng); |
||||
} |
||||
|
||||
int get_synced_frame_id(LoggerdState *s, CameraType cam_type, int start_frame_id) { |
||||
int frame_id = start_frame_id; |
||||
while (!sync_encoders(s, cam_type, frame_id)) { |
||||
++frame_id; |
||||
usleep(0); |
||||
} |
||||
return frame_id; |
||||
}; |
||||
|
||||
TEST_CASE("sync_encoders") { |
||||
const int max_waiting = GENERATE(1, 2, 3); |
||||
|
||||
for (int test_cnt = 0; test_cnt < 10; ++test_cnt) { |
||||
LoggerdState s{.max_waiting = max_waiting}; |
||||
std::vector<int> start_frames; |
||||
std::vector<std::future<int>> futures; |
||||
|
||||
for (int i = 0; i < max_waiting; ++i) { |
||||
int start_frame_id = random_int(0, 20); |
||||
start_frames.push_back(start_frame_id); |
||||
futures.emplace_back(std::async(std::launch::async, get_synced_frame_id, &s, (CameraType)i, start_frame_id)); |
||||
} |
||||
|
||||
// get results
|
||||
int synced_frame_id = 0; |
||||
for (int i = 0; i < max_waiting; ++i) { |
||||
if (i == 0) { |
||||
synced_frame_id = futures[i].get(); |
||||
// require synced_frame_id equal start_frame_id if max_waiting == 1
|
||||
if (max_waiting == 1) { |
||||
REQUIRE(synced_frame_id == start_frames[0]); |
||||
} |
||||
} else { |
||||
REQUIRE(futures[i].get() == synced_frame_id); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
const int MAX_SEGMENT_CNT = 100; |
||||
|
||||
std::pair<int, uint32_t> encoder_thread(LoggerdState *s) { |
||||
int cur_seg = 0; |
||||
uint32_t frame_id = s->start_frame_id; |
||||
|
||||
while (cur_seg < MAX_SEGMENT_CNT) { |
||||
++frame_id; |
||||
if (trigger_rotate_if_needed(s, cur_seg, frame_id)) { |
||||
cur_seg = s->rotate_segment; |
||||
} |
||||
util::sleep_for(0); |
||||
} |
||||
|
||||
return {cur_seg, frame_id}; |
||||
} |
||||
|
||||
TEST_CASE("trigger_rotate") { |
||||
const int encoders = GENERATE(1, 2, 3); |
||||
const int start_frame_id = random_int(0, 20); |
||||
|
||||
LoggerdState s{ |
||||
.max_waiting = encoders, |
||||
.start_frame_id = start_frame_id, |
||||
}; |
||||
|
||||
std::vector<std::future<std::pair<int, uint32_t>>> futures; |
||||
for (int i = 0; i < encoders; ++i) { |
||||
futures.emplace_back(std::async(std::launch::async, encoder_thread, &s)); |
||||
} |
||||
|
||||
while (s.rotate_segment < MAX_SEGMENT_CNT) { |
||||
rotate_if_needed(&s); |
||||
util::sleep_for(10); |
||||
} |
||||
|
||||
for (auto &f : futures) { |
||||
auto [encoder_seg, frame_id] = f.get(); |
||||
REQUIRE(encoder_seg == MAX_SEGMENT_CNT); |
||||
REQUIRE(frame_id == start_frame_id + encoder_seg * (SEGMENT_LENGTH * MAIN_FPS)); |
||||
} |
||||
} |
Loading…
Reference in new issue