openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.

84 lines
2.1 KiB

#!/usr/bin/env python3
import time
from tqdm import tqdm
from selfdrive.manager.process_config import managed_processes
from cereal.messaging import PubMaster, recv_one, sub_sock
from tools.lib.framereader import FrameReader
def rreplace(s, old, new, occurrence):
li = s.rsplit(old, occurrence)
return new.join(li)
def regen_model(msgs, pm, frame_reader, model_sock):
# Send some livecalibration messages to initalize visiond
for msg in msgs:
if msg.which() == 'liveCalibration':
pm.send('liveCalibration', msg.as_builder())
out_msgs = []
fidx = 0
for msg in tqdm(msgs):
w = msg.which()
if w == 'roadCameraState':
msg = msg.as_builder()
img = frame_reader.get(fidx, pix_fmt="rgb24")[0][:,:,::-1]
msg.roadCameraState.image = img.flatten().tobytes()
pm.send(w, msg)
model = recv_one(model_sock)
fidx += 1
out_msgs.append(model)
elif w == 'liveCalibration':
pm.send(w, msg.as_builder())
return out_msgs
def inject_model(msgs, segment_name):
if segment_name.count('--') == 2:
segment_name = rreplace(segment_name, '--', '/', 1)
frame_reader = FrameReader('cd:/'+segment_name.replace("|", "/") + "/fcamera.hevc")
managed_processes['camerad'].start()
managed_processes['modeld'].start()
# TODO do better than just wait for modeld to boot
time.sleep(5)
pm = PubMaster(['liveCalibration', 'roadCameraState'])
model_sock = sub_sock('model')
try:
out_msgs = regen_model(msgs, pm, frame_reader, model_sock)
except (KeyboardInterrupt, SystemExit, Exception) as e:
managed_processes['modeld'].stop()
time.sleep(2)
managed_processes['camerad'].stop()
raise e
managed_processes['modeld'].stop()
time.sleep(2)
managed_processes['camerad'].stop()
new_msgs = []
midx = 0
for msg in msgs:
if (msg.which() == 'model') and (midx < len(out_msgs)):
model = out_msgs[midx].as_builder()
model.logMonoTime = msg.logMonoTime
model = model.as_reader()
new_msgs.append(model)
midx += 1
else:
new_msgs.append(msg)
print(len(new_msgs), len(list(msgs)))
assert abs(len(new_msgs) - len(list(msgs))) < 2
return new_msgs