Ensure that first segment starts with all cameras on the same frameId (#22096)

* test that segment starts with the same frame id

* improve test

* add sync logic

* atomic max

* review comments
old-commit-hash: a93271a518
commatwo_master
Willem Melching 4 years ago committed by GitHub
parent 45ea7cc5e4
commit 60e7e09127
  1. 6
      selfdrive/common/util.h
  2. 42
      selfdrive/loggerd/loggerd.cc
  3. 19
      selfdrive/loggerd/tests/test_encoder.py

@ -141,3 +141,9 @@ public:
private: private:
float x_, k_; float x_, k_;
}; };
template<typename T>
void update_max_atomic(std::atomic<T>& max, T const& value) {
T prev = max;
while(prev < value && !max.compare_exchange_weak(prev, value)) {}
}

@ -46,7 +46,8 @@ const int DCAM_BITRATE = Hardware::TICI() ? MAIN_BITRATE : 2500000;
#define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead #define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead
const int SEGMENT_LENGTH = getenv("LOGGERD_TEST") ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; const bool LOGGERD_TEST = getenv("LOGGERD_TEST");
const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60;
ExitHandler do_exit; ExitHandler do_exit;
@ -112,6 +113,12 @@ struct LoggerdState {
std::atomic<int> waiting_rotate; std::atomic<int> waiting_rotate;
int max_waiting = 0; int max_waiting = 0;
double last_rotate_tms = 0.; double last_rotate_tms = 0.;
// Sync logic for startup
std::atomic<bool> encoders_synced;
std::atomic<int> encoders_ready;
std::atomic<uint32_t> start_frame_id;
std::atomic<uint32_t> latest_frame_id;
}; };
LoggerdState s; LoggerdState s;
@ -124,9 +131,11 @@ void encoder_thread(const LogCameraInfo &cam_info) {
std::vector<Encoder *> encoders; std::vector<Encoder *> encoders;
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false);
bool ready = false;
while (!do_exit) { while (!do_exit) {
if (!vipc_client.connect(false)) { if (!vipc_client.connect(false)) {
util::sleep_for(100); util::sleep_for(5);
continue; continue;
} }
@ -151,6 +160,23 @@ void encoder_thread(const LogCameraInfo &cam_info) {
if (buf == nullptr) continue; if (buf == nullptr) continue;
if (cam_info.trigger_rotate) { if (cam_info.trigger_rotate) {
if (!ready) {
LOGE("%s encoder ready", cam_info.filename);
++s.encoders_ready;
ready = true;
}
if (!s.encoders_synced) {
update_max_atomic(s.latest_frame_id, extra.frame_id);
continue;
} else {
// Wait for all encoders to reach the same frame id
if (extra.frame_id < s.start_frame_id) {
LOGE("%s waiting for frame %d, cur %d", cam_info.filename, s.start_frame_id.load(), extra.frame_id);
continue;
}
}
s.last_camera_seen_tms = millis_since_boot(); s.last_camera_seen_tms = millis_since_boot();
} }
@ -262,7 +288,8 @@ void rotate_if_needed() {
double tms = millis_since_boot(); double tms = millis_since_boot();
if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 &&
(tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE) { (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE &&
!LOGGERD_TEST) {
LOGW("no camera packet seen. auto rotating"); LOGW("no camera packet seen. auto rotating");
logger_rotate(); logger_rotate();
} }
@ -312,6 +339,15 @@ int main(int argc, char** argv) {
uint64_t msg_count = 0, bytes_count = 0; uint64_t msg_count = 0, bytes_count = 0;
double start_ts = millis_since_boot(); double start_ts = millis_since_boot();
while (!do_exit) { while (!do_exit) {
// Check if all encoders are ready and start encoding at the same time
if (!s.encoders_synced && (s.encoders_ready == s.max_waiting)) {
// Small margin in case one of the encoders already dropped the next frame
s.start_frame_id = s.latest_frame_id + 2;
s.encoders_synced = true;
LOGE("starting encoders at frame id %d", s.start_frame_id.load());
}
// poll for new messages on all sockets // poll for new messages on all sockets
for (auto sock : poller->poll(1000)) { for (auto sock : poller->poll(1000)) {
// drain socket // drain socket

@ -15,7 +15,6 @@ from common.params import Params
from common.timeout import Timeout from common.timeout import Timeout
from selfdrive.hardware import EON, TICI from selfdrive.hardware import EON, TICI
from selfdrive.loggerd.config import ROOT from selfdrive.loggerd.config import ROOT
from selfdrive.test.helpers import with_processes
from selfdrive.manager.process_config import managed_processes from selfdrive.manager.process_config import managed_processes
from tools.lib.logreader import LogReader from tools.lib.logreader import LogReader
@ -66,9 +65,14 @@ class TestEncoder(unittest.TestCase):
# TODO: this should run faster than real time # TODO: this should run faster than real time
@parameterized.expand([(True, ), (False, )]) @parameterized.expand([(True, ), (False, )])
@with_processes(['camerad', 'sensord', 'loggerd'], init_time=3, ignore_stopped=['loggerd'])
def test_log_rotation(self, record_front): def test_log_rotation(self, record_front):
Params().put("RecordFront", str(int(record_front))) Params().put_bool("RecordFront", record_front)
managed_processes['sensord'].start()
managed_processes['loggerd'].start()
time.sleep(1.0)
managed_processes['camerad'].start()
num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15))) num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15)))
@ -84,6 +88,7 @@ class TestEncoder(unittest.TestCase):
def check_seg(i): def check_seg(i):
# check each camera file size # check each camera file size
counts = [] counts = []
first_frames = []
for camera, fps, size, encode_idx_name in CAMERAS: for camera, fps, size, encode_idx_name in CAMERAS:
if not record_front and "dcamera" in camera: if not record_front and "dcamera" in camera:
continue continue
@ -119,6 +124,7 @@ class TestEncoder(unittest.TestCase):
segment_idxs = [getattr(m, encode_idx_name).segmentId for m in LogReader(rlog_path) if m.which() == encode_idx_name] segment_idxs = [getattr(m, encode_idx_name).segmentId for m in LogReader(rlog_path) if m.which() == encode_idx_name]
encode_idxs = [getattr(m, encode_idx_name).encodeId for m in LogReader(rlog_path) if m.which() == encode_idx_name] encode_idxs = [getattr(m, encode_idx_name).encodeId for m in LogReader(rlog_path) if m.which() == encode_idx_name]
frame_idxs = [getattr(m, encode_idx_name).frameId for m in LogReader(rlog_path) if m.which() == encode_idx_name]
# Check frame count # Check frame count
self.assertEqual(frame_count, len(segment_idxs)) self.assertEqual(frame_count, len(segment_idxs))
@ -130,8 +136,11 @@ class TestEncoder(unittest.TestCase):
if not eon_dcam: if not eon_dcam:
self.assertEqual(expected_frames * i, encode_idxs[0]) self.assertEqual(expected_frames * i, encode_idxs[0])
first_frames.append(frame_idxs[0])
self.assertEqual(len(set(encode_idxs)), len(encode_idxs)) self.assertEqual(len(set(encode_idxs)), len(encode_idxs))
self.assertEqual(1, len(set(first_frames)))
if TICI: if TICI:
expected_frames = fps * SEGMENT_LENGTH expected_frames = fps * SEGMENT_LENGTH
self.assertEqual(min(counts), expected_frames) self.assertEqual(min(counts), expected_frames)
@ -139,11 +148,13 @@ class TestEncoder(unittest.TestCase):
for i in trange(num_segments + 1): for i in trange(num_segments + 1):
# poll for next segment # poll for next segment
with Timeout(int(SEGMENT_LENGTH*2), error_msg=f"timed out waiting for segment {i}"): with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir(): while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir():
time.sleep(0.1) time.sleep(0.1)
managed_processes['loggerd'].stop() managed_processes['loggerd'].stop()
managed_processes['camerad'].stop()
managed_processes['sensord'].stop()
for i in trange(num_segments): for i in trange(num_segments):
check_seg(i) check_seg(i)

Loading…
Cancel
Save