Ensure controlsd can engage again in process replay (#23556)

* hacks to make process replay engage again

* dont change that

* enable engagement check again

* chmod +x

* first working regen

* proper logMonoTime

* fix video framerate consistency

* mpc is valid again

* proper alive checks

* revert loggerd change

* ensure engaged in regen, fix managerState, peripheralState

* ubloxRaw is unused

* add submaster config for gm

* regen all services we can run without HW

* fix loggerd

* loggerd: matroska without the extension

* update ref
old-commit-hash: d3d10e014a
commatwo_master
Willem Melching 3 years ago committed by GitHub
parent 3aaf423dde
commit 962201fff1
  1. 9
      selfdrive/controls/controlsd.py
  2. 2
      selfdrive/loggerd/raw_logger.cc
  3. 17
      selfdrive/test/process_replay/process_replay.py
  4. 2
      selfdrive/test/process_replay/ref_commit
  5. 163
      selfdrive/test/process_replay/regen.py
  6. 3
      selfdrive/test/process_replay/regen_all.py
  7. 28
      selfdrive/test/process_replay/test_processes.py

@ -263,6 +263,7 @@ class Controls:
safety_mismatch = pandaState.safetyModel != self.CP.safetyConfigs[i].safetyModel or pandaState.safetyParam != self.CP.safetyConfigs[i].safetyParam
else:
safety_mismatch = pandaState.safetyModel not in IGNORED_SAFETY_MODES
if safety_mismatch or self.mismatch_counter >= 200:
self.events.add(EventName.controlsMismatch)
@ -279,7 +280,7 @@ class Controls:
if not self.logged_comm_issue:
invalid = [s for s, valid in self.sm.valid.items() if not valid]
not_alive = [s for s, alive in self.sm.alive.items() if not alive]
cloudlog.event("commIssue", invalid=invalid, not_alive=not_alive)
cloudlog.event("commIssue", invalid=invalid, not_alive=not_alive, can_error=self.can_rcv_error, error=True)
self.logged_comm_issue = True
else:
self.logged_comm_issue = False
@ -365,6 +366,10 @@ class Controls:
if not self.read_only:
self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan'])
self.initialized = True
if REPLAY and self.sm['pandaStates'][0].controlsAllowed:
self.state = State.enabled
Params().put_bool("ControlsReady", True)
# Check for CAN timeout
@ -593,7 +598,7 @@ class Controls:
ldw_allowed = self.is_ldw_enabled and CS.vEgo > LDW_MIN_SPEED and not recent_blinker \
and not self.active and self.sm['liveCalibration'].calStatus == Calibration.CALIBRATED
model_v2 = self.sm['modelV2']
model_v2 = self.sm['modelV2']
desire_prediction = model_v2.meta.desirePrediction
if len(desire_prediction) and ldw_allowed:
right_lane_visible = self.sm['lateralPlan'].rProb > 0.5

@ -76,7 +76,7 @@ void RawLogger::encoder_open(const char* path) {
close(lock_fd);
format_ctx = NULL;
avformat_alloc_output_context2(&format_ctx, NULL, NULL, vid_path.c_str());
avformat_alloc_output_context2(&format_ctx, NULL, "matroska", vid_path.c_str());
assert(format_ctx);
stream = avformat_new_stream(format_ctx, codec);

@ -25,7 +25,7 @@ NUMPY_TOLERANCE = 1e-7
CI = "CI" in os.environ
TIMEOUT = 15
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance', 'fake_pubsubmaster'])
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance', 'fake_pubsubmaster', 'submaster_config'], defaults=({},))
def wait_for_event(evt):
@ -88,8 +88,8 @@ class DumbSocket:
class FakeSubMaster(messaging.SubMaster):
def __init__(self, services):
super().__init__(services, addr=None)
def __init__(self, services, ignore_alive=None, ignore_avg_freq=None):
super().__init__(services, ignore_alive=ignore_alive, ignore_avg_freq=ignore_avg_freq, addr=None)
self.sock = {s: DumbSocket(s) for s in services}
self.update_called = threading.Event()
self.update_ready = threading.Event()
@ -241,13 +241,14 @@ CONFIGS = [
pub_sub={
"can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
"deviceState": [], "pandaStates": [], "peripheralState": [], "liveCalibration": [], "driverMonitoringState": [], "longitudinalPlan": [], "lateralPlan": [], "liveLocationKalman": [], "liveParameters": [], "radarState": [],
"modelV2": [], "driverCameraState": [], "roadCameraState": [], "ubloxRaw": [], "managerState": [],
"modelV2": [], "driverCameraState": [], "roadCameraState": [], "managerState": [],
},
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
init_callback=fingerprint,
should_recv_callback=controlsd_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
submaster_config={'ignore_avg_freq': ['radarState', 'longitudinalPlan']}
),
ProcessConfig(
proc_name="radard",
@ -348,14 +349,14 @@ def setup_env():
params.put_bool("Passive", False)
params.put_bool("CommunityFeaturesToggle", True)
os.environ['NO_RADAR_SLEEP'] = "1"
os.environ["SIMULATION"] = "1"
os.environ["NO_RADAR_SLEEP"] = "1"
os.environ["REPLAY"] = "1"
def python_replay_process(cfg, lr, fingerprint=None):
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can']
fsm = FakeSubMaster(pub_sockets)
fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config)
fpm = FakePubMaster(sub_sockets)
args = (fsm, fpm)
if 'can' in list(cfg.pub_sub.keys()):
@ -426,7 +427,7 @@ def python_replay_process(cfg, lr, fingerprint=None):
msg_queue.append(msg.as_builder())
if should_recv:
fsm.update_msgs(0, msg_queue)
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue)
msg_queue = []
recv_cnt = len(recv_socks)

@ -1 +1 @@
d898cccaa7d772c92c8128092d42898607b08a40
4a64d5122c408e2ba759d531df2d6ab7b4d7508a

@ -11,8 +11,8 @@ import cereal.messaging as messaging
from cereal.services import service_list
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
from common.params import Params
from common.realtime import Ratekeeper, DT_MDL, DT_DMON
from common.transformations.camera import eon_f_frame_size, eon_d_frame_size
from common.realtime import Ratekeeper, DT_MDL, DT_DMON, sec_since_boot
from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, tici_f_frame_size, tici_d_frame_size
from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.manager.process import ensure_running
from selfdrive.manager.process_config import managed_processes
@ -26,32 +26,107 @@ process_replay_dir = os.path.dirname(os.path.abspath(__file__))
FAKEDATA = os.path.join(process_replay_dir, "fakedata/")
def replay_panda_states(s, msgs):
pm = messaging.PubMaster([s, 'peripheralState'])
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
smsgs = [m for m in msgs if m.which() in ['pandaStates', 'pandaStateDEPRECATED']]
# Migrate safety param base on carState
cp = [m for m in msgs if m.which() == 'carParams'][0].carParams
if len(cp.safetyConfigs):
safety_param = cp.safetyConfigs[0].safetyParam
else:
safety_param = cp.safetyParamDEPRECATED
while True:
for m in smsgs:
if m.which() == 'pandaStateDEPRECATED':
new_m = messaging.new_message('pandaStates', 1)
new_m.pandaStates[0] = m.pandaStateDEPRECATED
new_m.pandaStates[0].safetyParam = safety_param
pm.send(s, new_m)
else:
new_m = m.as_builder()
new_m.logMonoTime = int(sec_since_boot() * 1e9)
pm.send(s, new_m)
new_m = messaging.new_message('peripheralState')
pm.send('peripheralState', new_m)
rk.keep_time()
def replay_manager_state(s, msgs):
pm = messaging.PubMaster([s, ])
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
while True:
new_m = messaging.new_message('managerState')
new_m.managerState.processes = [{'name': name, 'running': True} for name in managed_processes]
pm.send(s, new_m)
rk.keep_time()
def replay_device_state(s, msgs):
pm = messaging.PubMaster([s, ])
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
smsgs = [m for m in msgs if m.which() == s]
while True:
for m in smsgs:
new_m = m.as_builder()
new_m.logMonoTime = int(sec_since_boot() * 1e9)
new_m.deviceState.freeSpacePercent = 50
new_m.deviceState.memoryUsagePercent = 50
pm.send(s, new_m)
rk.keep_time()
def replay_sensor_events(s, msgs):
pm = messaging.PubMaster([s, ])
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
smsgs = [m for m in msgs if m.which() == s]
while True:
for m in smsgs:
new_m = m.as_builder()
new_m.logMonoTime = int(sec_since_boot() * 1e9)
for evt in new_m.sensorEvents:
evt.timestamp = new_m.logMonoTime
pm.send(s, new_m)
rk.keep_time()
def replay_service(s, msgs):
pm = messaging.PubMaster([s, ])
rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None)
smsgs = [m for m in msgs if m.which() == s]
while True:
for m in smsgs:
# TODO: use logMonoTime
pm.send(s, m.as_builder())
new_m = m.as_builder()
new_m.logMonoTime = int(sec_since_boot() * 1e9)
pm.send(s, new_m)
rk.keep_time()
vs = None
def replay_cameras(lr, frs):
cameras = [
eon_cameras = [
("roadCameraState", DT_MDL, eon_f_frame_size, VisionStreamType.VISION_STREAM_ROAD),
("driverCameraState", DT_DMON, eon_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER),
]
tici_cameras = [
("roadCameraState", DT_MDL, tici_f_frame_size, VisionStreamType.VISION_STREAM_ROAD),
("driverCameraState", DT_MDL, tici_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER),
]
def replay_camera(s, stream, dt, vipc_server, fr, size):
def replay_camera(s, stream, dt, vipc_server, frames, size):
pm = messaging.PubMaster([s, ])
rk = Ratekeeper(1 / dt, print_delay_threshold=None)
img = b"\x00" * int(size[0]*size[1]*3/2)
while True:
if fr is not None:
img = fr.get(rk.frame % fr.frame_count, pix_fmt='yuv420p')[0]
img = img.flatten().tobytes()
if frames is not None:
img = frames[rk.frame % len(frames)]
rk.keep_time()
@ -62,24 +137,34 @@ def replay_cameras(lr, frs):
vipc_server.send(stream, img, msg.frameId, msg.timestampSof, msg.timestampEof)
init_data = [m for m in lr if m.which() == 'initData'][0]
cameras = tici_cameras if (init_data.initData.deviceType == 'tici') else eon_cameras
# init vipc server and cameras
p = []
global vs
vs = VisionIpcServer("camerad")
for (s, dt, size, stream) in cameras:
fr = frs.get(s, None)
frames = None
if fr is not None:
print(f"Decomressing frames {s}")
frames = []
for i in tqdm(range(fr.frame_count)):
img = fr.get(i, pix_fmt='yuv420p')[0]
frames.append(img.flatten().tobytes())
vs.create_buffers(stream, 40, False, size[0], size[1])
p.append(multiprocessing.Process(target=replay_camera,
args=(s, stream, dt, vs, fr, size)))
args=(s, stream, dt, vs, frames, size)))
# hack to make UI work
vs.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, eon_f_frame_size[0], eon_f_frame_size[1])
vs.start_listener()
return p
return vs, p
def regen_segment(lr, frs=None, outdir=FAKEDATA):
lr = list(lr)
if frs is None:
frs = dict()
@ -91,51 +176,48 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA):
params.put_bool("OpenpilotEnabledToggle", True)
params.put_bool("CommunityFeaturesToggle", True)
params.put_bool("CommunityFeaturesToggle", True)
cal = messaging.new_message('liveCalibration')
cal.liveCalibration.validBlocks = 20
cal.liveCalibration.rpyCalib = [0.0, 0.0, 0.0]
params.put("CalibrationParams", cal.to_bytes())
os.environ["LOG_ROOT"] = outdir
os.environ["SIMULATION"] = "1"
os.environ["REPLAY"] = "1"
os.environ['SKIP_FW_QUERY'] = ""
os.environ['FINGERPRINT'] = ""
# TODO: remove after getting new route for mazda
migration = {
"Mazda CX-9 2021": "MAZDA CX-9 2021",
}
for msg in lr:
if msg.which() == 'carParams':
car_fingerprint = msg.carParams.carFingerprint
car_fingerprint = migration.get(msg.carParams.carFingerprint, msg.carParams.carFingerprint)
if len(msg.carParams.carFw) and (car_fingerprint in FW_VERSIONS):
params.put("CarParamsCache", msg.carParams.as_builder().to_bytes())
else:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = car_fingerprint
elif msg.which() == 'liveCalibration':
params.put("CalibrationParams", msg.as_builder().to_bytes())
#TODO: init car, make sure starts engaged when segment is engaged
vs, cam_procs = replay_cameras(lr, frs)
fake_daemons = {
'sensord': [
multiprocessing.Process(target=replay_service, args=('sensorEvents', lr)),
multiprocessing.Process(target=replay_sensor_events, args=('sensorEvents', lr)),
],
'pandad': [
multiprocessing.Process(target=replay_service, args=('can', lr)),
multiprocessing.Process(target=replay_service, args=('pandaStates', lr)),
multiprocessing.Process(target=replay_service, args=('ubloxRaw', lr)),
multiprocessing.Process(target=replay_panda_states, args=('pandaStates', lr)),
],
'managerState': [
multiprocessing.Process(target=replay_manager_state, args=('managerState', lr)),
],
#'managerState': [
# multiprocessing.Process(target=replay_service, args=('managerState', lr)),
#],
'thermald': [
multiprocessing.Process(target=replay_service, args=('deviceState', lr)),
multiprocessing.Process(target=replay_device_state, args=('deviceState', lr)),
],
'camerad': [
*replay_cameras(lr, frs),
],
# TODO: fix these and run them
'paramsd': [
multiprocessing.Process(target=replay_service, args=('liveParameters', lr)),
],
'locationd': [
multiprocessing.Process(target=replay_service, args=('liveLocationKalman', lr)),
*cam_procs,
],
}
@ -162,11 +244,13 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA):
for p in procs:
p.terminate()
del vs
r = params.get("CurrentRoute", encoding='utf-8')
return os.path.join(outdir, r + "--0")
def regen_and_save(route, sidx, upload=False, use_route_meta=True):
def regen_and_save(route, sidx, upload=False, use_route_meta=False):
if use_route_meta:
r = Route(args.route)
lr = LogReader(r.log_paths()[args.seg])
@ -175,6 +259,11 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=True):
lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2")
fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc")
rpath = regen_segment(lr, {'roadCameraState': fr})
lr = LogReader(os.path.join(rpath, 'rlog.bz2'))
controls_state_active = [m.controlsState.active for m in lr if m.which() == 'controlsState']
assert any(controls_state_active), "Segment did not engage"
relr = os.path.relpath(rpath)
print("\n\n", "*"*30, "\n\n")

@ -7,12 +7,13 @@ if __name__ == "__main__":
for segment in segments:
route = segment[1].rsplit('--', 1)[0]
sidx = int(segment[1].rsplit('--', 1)[1])
print("Regen", route, sidx)
relr = regen_and_save(route, sidx, upload=True, use_route_meta=False)
print("\n\n", "*"*30, "\n\n")
print("New route:", relr, "\n")
relr = relr.replace('/', '|')
new_segments.append(f'("{segment[0]}", "{relr}"), ')
new_segments.append(f' ("{segment[0]}", "{relr}"), ')
print()
print()
print()

@ -30,18 +30,18 @@ original_segments = [
]
segments = [
("HYUNDAI", "fakedata|2021-10-07--15-56-26--0"),
("TOYOTA", "fakedata|2021-10-07--15-57-47--0"),
("TOYOTA2", "fakedata|2021-10-07--15-59-03--0"),
("TOYOTA3", "fakedata|2021-10-07--15-53-21--0"),
("HONDA", "fakedata|2021-10-07--16-00-19--0"),
("HONDA2", "fakedata|2021-10-07--16-01-35--0"),
("CHRYSLER", "fakedata|2021-10-07--16-02-52--0"),
("SUBARU", "fakedata|2021-10-07--16-04-09--0"),
("GM", "fakedata|2021-10-07--16-05-26--0"),
("NISSAN", "fakedata|2021-10-07--16-09-53--0"),
("VOLKSWAGEN", "fakedata|2021-10-07--16-11-11--0"),
("MAZDA", "bd6a637565e91581|2021-10-30--15-14-53--2"),
("HYUNDAI", "fakedata|2022-01-20--17-49-04--0"),
("TOYOTA", "fakedata|2022-01-20--17-50-51--0"),
("TOYOTA2", "fakedata|2022-01-20--17-52-36--0"),
("TOYOTA3", "fakedata|2022-01-20--17-54-50--0"),
("HONDA", "fakedata|2022-01-20--17-56-40--0"),
("HONDA2", "fakedata|2022-01-20--17-58-25--0"),
("CHRYSLER", "fakedata|2022-01-20--18-00-11--0"),
("SUBARU", "fakedata|2022-01-20--18-01-57--0"),
("GM", "fakedata|2022-01-20--18-03-41--0"),
("NISSAN", "fakedata|2022-01-20--18-05-29--0"),
("VOLKSWAGEN", "fakedata|2022-01-20--18-07-15--0"),
("MAZDA", "fakedata|2022-01-20--18-09-32--0"),
]
# dashcamOnly makes don't need to be tested until a full port is done
@ -65,9 +65,7 @@ def test_process(cfg, lr, cmp_log_fn, ignore_fields=None, ignore_msgs=None):
log_msgs = replay_process(cfg, lr)
# check to make sure openpilot is engaged in the route
# TODO: update routes so enable check can run
# failed enable check: honda bosch, hyundai, chrysler, and subaru
if cfg.proc_name == "controlsd" and FULL_TEST and False:
if cfg.proc_name == "controlsd":
for msg in log_msgs:
if msg.which() == "controlsState":
if msg.controlsState.active:

Loading…
Cancel
Save