diff --git a/Jenkinsfile b/Jenkinsfile index e8f073ba99..7a84e1ef8d 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -138,6 +138,7 @@ pipeline { ["test sounds", "nosetests -s selfdrive/test/test_sounds.py"], ["test boardd loopback", "nosetests -s selfdrive/boardd/tests/test_boardd_loopback.py"], ["test loggerd", "CI=1 python selfdrive/loggerd/tests/test_loggerd.py"], + ["test encoder", "CI=1 python selfdrive/loggerd/tests/test_encoder.py"], //["test camerad", "CI=1 python selfdrive/camerad/test/test_camerad.py"], // wait for shelf refactor //["test updater", "python installer/updater/test_updater.py"], ]) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index e946b780a4..a66a06831a 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -600,7 +600,7 @@ int main(int argc, char** argv) { double last_rotate_tms = millis_since_boot(); double last_camera_seen_tms = millis_since_boot(); while (!do_exit) { - for (auto sock : poller->poll(100 * 1000)) { + for (auto sock : poller->poll(1000)) { Message * last_msg = nullptr; while (!do_exit) { Message * msg = sock->receive(true); diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py new file mode 100755 index 0000000000..3f5d26c1ab --- /dev/null +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +import math +import os +import random +import shutil +import subprocess +import time +import unittest +from parameterized import parameterized +from pathlib import Path +from tqdm import trange + +from common.params import Params +from common.timeout import Timeout +from selfdrive.hardware import EON, TICI +from selfdrive.test.helpers import with_processes +from selfdrive.loggerd.config import ROOT, CAMERA_FPS + + +# baseline file sizes for a 2s segment, in bytes +FULL_SIZE = 1253786 +if EON: + CAMERAS = { + "fcamera": FULL_SIZE, + "dcamera": 770920, + "qcamera": 38533, + } +elif TICI: + CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]} +else: + CAMERAS = {} + +ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]] + +FRAME_TOLERANCE = 0 +FILE_SIZE_TOLERANCE = 0.5 + +class TestEncoder(unittest.TestCase): + + # TODO: all of loggerd should work on PC + @classmethod + def setUpClass(cls): + if not (EON or TICI): + raise unittest.SkipTest + + def setUp(self): + self._clear_logs() + + self.segment_length = 2 + os.environ["LOGGERD_TEST"] = "1" + os.environ["LOGGERD_SEGMENT_LENGTH"] = str(self.segment_length) + + def tearDown(self): + self._clear_logs() + + def _clear_logs(self): + if os.path.exists(ROOT): + shutil.rmtree(ROOT) + + def _get_latest_segment_path(self): + last_route = sorted(Path(ROOT).iterdir(), key=os.path.getmtime)[-1] + return os.path.join(ROOT, last_route) + + # TODO: this should run faster than real time + @parameterized.expand(ALL_CAMERA_COMBINATIONS) + @with_processes(['camerad', 'sensord', 'loggerd'], init_time=5) + def test_log_rotation(self, cameras): + print("checking targets:", cameras) + Params().put("RecordFront", "1" if 'dcamera' in cameras else "0") + + num_segments = random.randint(80, 150) + if "CI" in os.environ: + num_segments = random.randint(15, 20) # ffprobe is slow on comma two + + # wait for loggerd to make the dir for first segment + time.sleep(10) + route_prefix_path = None + with Timeout(30): + while route_prefix_path is None: + try: + route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0] + except Exception: + time.sleep(2) + continue + + for i in trange(num_segments): + # poll for next segment + if i < num_segments - 1: + with Timeout(self.segment_length*3, error_msg=f"timed out waiting for segment {i}"): + while True: + seg_num = int(self._get_latest_segment_path().rsplit("--", 1)[1]) + if seg_num > i: + break + time.sleep(0.1) + else: + time.sleep(self.segment_length) + + # check each camera file size + for camera, size in cameras.items(): + ext = "ts" if camera=='qcamera' else "hevc" + file_path = f"{route_prefix_path}--{i}/{camera}.{ext}" + + # check file size + self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}") + file_size = os.path.getsize(file_path) + self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), + f"{camera} failed size check: expected {size}, got {file_size}") + + if camera == 'qcamera': + continue + + # check frame count + cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \ + -of default=nokey=1:noprint_wrappers=1 {file_path}" + expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS + frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE + frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip()) + + self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, + f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") + shutil.rmtree(f"{route_prefix_path}--{i}") + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/loggerd/tests/test_loggerd.py b/selfdrive/loggerd/tests/test_loggerd.py index 4b7e23f7ba..96719dce6e 100755 --- a/selfdrive/loggerd/tests/test_loggerd.py +++ b/selfdrive/loggerd/tests/test_loggerd.py @@ -1,124 +1,211 @@ #!/usr/bin/env python3 -import math import os import random -import shutil +import string import subprocess import time import unittest -from parameterized import parameterized +from collections import defaultdict from pathlib import Path -from tqdm import trange -from common.params import Params +from cereal import log +import cereal.messaging as messaging +from cereal.services import service_list +from common.basedir import BASEDIR from common.timeout import Timeout -from selfdrive.hardware import EON, TICI -from selfdrive.test.helpers import with_processes -from selfdrive.loggerd.config import ROOT, CAMERA_FPS - - -# baseline file sizes for a 2s segment, in bytes -FULL_SIZE = 1253786 -if EON: - CAMERAS = { - "fcamera": FULL_SIZE, - "dcamera": 770920, - "qcamera": 38533, - } -elif TICI: - CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]} -else: - CAMERAS = {} +from common.params import Params +import selfdrive.manager as manager +from selfdrive.loggerd.config import ROOT +from selfdrive.version import version as VERSION +from tools.lib.logreader import LogReader -ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]] +SentinelType = log.Sentinel.SentinelType -FRAME_TOLERANCE = 0 -FILE_SIZE_TOLERANCE = 0.5 +CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in service_list + and service_list[f].should_log and "encode" not in f.lower()] class TestLoggerd(unittest.TestCase): - # TODO: all of loggerd should work on PC - @classmethod - def setUpClass(cls): - if not (EON or TICI): - raise unittest.SkipTest - - def setUp(self): - self._clear_logs() - - self.segment_length = 2 - os.environ["LOGGERD_TEST"] = "1" - os.environ["LOGGERD_SEGMENT_LENGTH"] = str(self.segment_length) - - def tearDown(self): - self._clear_logs() - - def _clear_logs(self): - if os.path.exists(ROOT): - shutil.rmtree(ROOT) - - def _get_latest_segment_path(self): - last_route = sorted(Path(ROOT).iterdir(), key=os.path.getmtime)[-1] - return os.path.join(ROOT, last_route) - - # TODO: this should run faster than real time - @parameterized.expand(ALL_CAMERA_COMBINATIONS) - @with_processes(['camerad', 'sensord', 'loggerd'], init_time=5) - def test_log_rotation(self, cameras): - print("checking targets:", cameras) - Params().put("RecordFront", "1" if 'dcamera' in cameras else "0") - - num_segments = random.randint(80, 150) - if "CI" in os.environ: - num_segments = random.randint(15, 20) # ffprobe is slow on comma two - - # wait for loggerd to make the dir for first segment - time.sleep(10) - route_prefix_path = None - with Timeout(30): - while route_prefix_path is None: + def _get_latest_log_dir(self): + log_dirs = sorted(Path(ROOT).iterdir(), key=lambda f: f.stat().st_mtime) + return log_dirs[-1] + + def _get_log_dir(self, x): + for p in x.split(' '): + path = Path(p.strip()) + if path.is_dir(): + return path + return None + + def _gen_bootlog(self): + with Timeout(5): + out = subprocess.check_output(["./loggerd", "--bootlog"], cwd=os.path.join(BASEDIR, "selfdrive/loggerd"), encoding='utf-8') + + # check existence + d = self._get_log_dir(out) + path = Path(os.path.join(d, "bootlog.bz2")) + assert path.is_file(), "failed to create bootlog file" + return path + + def _check_init_data(self, msgs): + msg = msgs[0] + assert msg.which() == 'initData' + + def _check_sentinel(self, msgs, route): + start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment + assert msgs[1].sentinel.type == start_type + + end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment + assert msgs[-1].sentinel.type == end_type + + def test_init_data_values(self): + os.environ["CLEAN"] = random.choice(["0", "1"]) + os.environ["DONGLE_ID"] = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100))) + + fake_params = [ + ("GitCommit", "gitCommit", "commit"), + ("GitBranch", "gitBranch", "branch"), + ("GitRemote", "gitRemote", "remote"), + ] + params = Params() + for k, _, v in fake_params: + params.put(k, v) + + lr = list(LogReader(str(self._gen_bootlog()))) + initData = lr[0].initData + + assert initData.dirty != bool(os.environ["CLEAN"]) + assert initData.dongleId == os.environ["DONGLE_ID"] + assert initData.version == VERSION + + if os.path.isfile("/proc/cmdline"): + with open("/proc/cmdline") as f: + assert list(initData.kernelArgs) == f.read().strip().split(" ") + + with open("/proc/version") as f: + assert initData.kernelVersion == f.read() + + for _, k, v in fake_params: + assert getattr(initData, k) == v + + def test_bootlog(self): + # generate bootlog with fake launch log + launch_log = ''.join([str(random.choice(string.printable)) for _ in range(100)]) + with open("/tmp/launch_log", "w") as f: + f.write(launch_log) + + bootlog_path = self._gen_bootlog() + lr = list(LogReader(str(bootlog_path))) + + # check length + assert len(lr) == 4 # boot + initData + 2x sentinel + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + # check msgs + bootlog_msgs = [m for m in lr if m.which() == 'boot'] + assert len(bootlog_msgs) == 1 + + # sanity check values + boot = bootlog_msgs.pop().boot + assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s + assert boot.launchLog == launch_log + + for field, path in [("lastKmsg", "console-ramoops"), ("lastPmsg", "pmsg-ramoops-0")]: + path = Path(os.path.join("/sys/fs/pstore/", path)) + val = b"" + if path.is_file(): + val = open(path, "rb").read() + assert getattr(boot, field) == val + + def test_qlog(self): + qlog_services = [s for s in CEREAL_SERVICES if service_list[s].decimation is not None] + no_qlog_services = [s for s in CEREAL_SERVICES if service_list[s].decimation is None] + + services = random.sample(qlog_services, random.randint(2, 10)) + \ + random.sample(no_qlog_services, random.randint(2, 10)) + + pm = messaging.PubMaster(services) + + # sleep enough for the first poll to time out + # TOOD: fix loggerd bug dropping the msgs from the first poll + manager.start_managed_process("loggerd") + time.sleep(2) + + sent_msgs = defaultdict(list) + for _ in range(random.randint(2, 10) * 100): + for s in services: try: - route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0] + m = messaging.new_message(s) except Exception: - time.sleep(2) - continue - - for i in trange(num_segments): - # poll for next segment - if i < num_segments - 1: - with Timeout(self.segment_length*3, error_msg=f"timed out waiting for segment {i}"): - while True: - seg_num = int(self._get_latest_segment_path().rsplit("--", 1)[1]) - if seg_num > i: - break - time.sleep(0.1) + m = messaging.new_message(s, random.randint(2, 10)) + pm.send(s, m) + sent_msgs[s].append(m) + time.sleep(0.01) + + time.sleep(1) + manager.kill_managed_process("loggerd") + + qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.bz2") + lr = list(LogReader(qlog_path)) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + recv_msgs = defaultdict(list) + for m in lr: + recv_msgs[m.which()].append(m) + + for s, msgs in sent_msgs.items(): + recv_cnt = len(recv_msgs[s]) + + if s in no_qlog_services: + # check services with no specific decimation aren't in qlog + assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog" else: - time.sleep(self.segment_length) - - # check each camera file size - for camera, size in cameras.items(): - ext = "ts" if camera=='qcamera' else "hevc" - file_path = f"{route_prefix_path}--{i}/{camera}.{ext}" - - # check file size - self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}") - file_size = os.path.getsize(file_path) - self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), - f"{camera} failed size check: expected {size}, got {file_size}") - - if camera == 'qcamera': - continue - - # check frame count - cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \ - -of default=nokey=1:noprint_wrappers=1 {file_path}" - expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS - frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE - frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip()) - - self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, - f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") - shutil.rmtree(f"{route_prefix_path}--{i}") + # check logged message count matches decimation + expected_cnt = len(msgs) // service_list[s].decimation + assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}" + + def test_rlog(self): + services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) + pm = messaging.PubMaster(services) + + # sleep enough for the first poll to time out + # TOOD: fix loggerd bug dropping the msgs from the first poll + manager.start_managed_process("loggerd") + time.sleep(2) + + sent_msgs = defaultdict(list) + for _ in range(random.randint(2, 10) * 100): + for s in services: + try: + m = messaging.new_message(s) + except Exception: + m = messaging.new_message(s, random.randint(2, 10)) + pm.send(s, m) + sent_msgs[s].append(m) + time.sleep(0.01) + + time.sleep(1) + manager.kill_managed_process("loggerd") + + lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.bz2"))) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + # check all messages were logged and in order + lr = lr[2:-1] # slice off initData and both sentinels + for m in lr: + sent = sent_msgs[m.which()].pop(0) + sent.clear_write_flag() + assert sent.to_bytes() == m.as_builder().to_bytes() + if __name__ == "__main__": unittest.main() diff --git a/selfdrive/logmessaged.py b/selfdrive/logmessaged.py index 91dcbb7e04..953f3d477f 100755 --- a/selfdrive/logmessaged.py +++ b/selfdrive/logmessaged.py @@ -19,8 +19,6 @@ def main(): dat = b''.join(sock.recv_multipart()) dat = dat.decode('utf8') - # print "RECV", repr(dat) - levelnum = ord(dat[0]) dat = dat[1:]