segment regen (#21148)
	
		
	
				
					
				
			* start seg regen
* send vipc
* fix up some paths
* set fp
* fix no entries
* engages
* seperate camera procs
* send real frames
* regen test routes
* nice argparser
* fix valgrind test
* move that
* fix that
old-commit-hash: 4a1aec27ef
			
			
				vw-mqb-aeb
			
			
		
							parent
							
								
									2c3d136d32
								
							
						
					
					
						commit
						d31397b84b
					
				
				 13 changed files with 222 additions and 161 deletions
			
			
		@ -0,0 +1 @@ | 
				
			||||
fakedata/ | 
				
			||||
@ -1,83 +0,0 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
 | 
				
			||||
import time | 
				
			||||
 | 
				
			||||
from tqdm import tqdm | 
				
			||||
 | 
				
			||||
from selfdrive.manager.process_config import managed_processes | 
				
			||||
from cereal.messaging import PubMaster, recv_one, sub_sock | 
				
			||||
from tools.lib.framereader import FrameReader | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def rreplace(s, old, new, occurrence): | 
				
			||||
  li = s.rsplit(old, occurrence) | 
				
			||||
  return new.join(li) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def regen_model(msgs, pm, frame_reader, model_sock): | 
				
			||||
  # Send some livecalibration messages to initalize visiond | 
				
			||||
  for msg in msgs: | 
				
			||||
    if msg.which() == 'liveCalibration': | 
				
			||||
      pm.send('liveCalibration', msg.as_builder()) | 
				
			||||
 | 
				
			||||
  out_msgs = [] | 
				
			||||
  fidx = 0 | 
				
			||||
  for msg in tqdm(msgs): | 
				
			||||
    w = msg.which() | 
				
			||||
 | 
				
			||||
    if w == 'roadCameraState': | 
				
			||||
      msg = msg.as_builder() | 
				
			||||
 | 
				
			||||
      img = frame_reader.get(fidx, pix_fmt="rgb24")[0][:,:,::-1] | 
				
			||||
 | 
				
			||||
      msg.roadCameraState.image = img.flatten().tobytes() | 
				
			||||
 | 
				
			||||
      pm.send(w, msg) | 
				
			||||
      model = recv_one(model_sock) | 
				
			||||
      fidx += 1 | 
				
			||||
      out_msgs.append(model) | 
				
			||||
    elif w == 'liveCalibration': | 
				
			||||
      pm.send(w, msg.as_builder()) | 
				
			||||
 | 
				
			||||
  return out_msgs | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def inject_model(msgs, segment_name): | 
				
			||||
  if segment_name.count('--') == 2: | 
				
			||||
    segment_name = rreplace(segment_name, '--', '/', 1) | 
				
			||||
  frame_reader = FrameReader('cd:/'+segment_name.replace("|", "/") + "/fcamera.hevc") | 
				
			||||
 | 
				
			||||
  managed_processes['camerad'].start() | 
				
			||||
  managed_processes['modeld'].start() | 
				
			||||
  # TODO do better than just wait for modeld to boot | 
				
			||||
  time.sleep(5) | 
				
			||||
 | 
				
			||||
  pm = PubMaster(['liveCalibration', 'roadCameraState']) | 
				
			||||
  model_sock = sub_sock('model') | 
				
			||||
  try: | 
				
			||||
    out_msgs = regen_model(msgs, pm, frame_reader, model_sock) | 
				
			||||
  except (KeyboardInterrupt, SystemExit, Exception) as e: | 
				
			||||
    managed_processes['modeld'].stop() | 
				
			||||
    time.sleep(2) | 
				
			||||
    managed_processes['camerad'].stop() | 
				
			||||
    raise e | 
				
			||||
  managed_processes['modeld'].stop() | 
				
			||||
  time.sleep(2) | 
				
			||||
  managed_processes['camerad'].stop() | 
				
			||||
 | 
				
			||||
  new_msgs = [] | 
				
			||||
  midx = 0 | 
				
			||||
  for msg in msgs: | 
				
			||||
    if (msg.which() == 'model') and (midx < len(out_msgs)): | 
				
			||||
      model = out_msgs[midx].as_builder() | 
				
			||||
      model.logMonoTime = msg.logMonoTime | 
				
			||||
      model = model.as_reader() | 
				
			||||
      new_msgs.append(model) | 
				
			||||
      midx += 1 | 
				
			||||
    else: | 
				
			||||
      new_msgs.append(msg) | 
				
			||||
 | 
				
			||||
  print(len(new_msgs), len(list(msgs))) | 
				
			||||
  assert abs(len(new_msgs) - len(list(msgs))) < 2 | 
				
			||||
 | 
				
			||||
  return new_msgs | 
				
			||||
@ -0,0 +1,184 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
import argparse | 
				
			||||
import os | 
				
			||||
import time | 
				
			||||
import multiprocessing | 
				
			||||
from tqdm import tqdm | 
				
			||||
 | 
				
			||||
# run DM procs | 
				
			||||
os.environ["USE_WEBCAM"] = "1" | 
				
			||||
 | 
				
			||||
import cereal.messaging as messaging | 
				
			||||
from cereal.services import service_list | 
				
			||||
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType  # pylint: disable=no-name-in-module, import-error | 
				
			||||
from common.params import Params | 
				
			||||
from common.realtime import Ratekeeper, DT_MDL, DT_DMON | 
				
			||||
from common.transformations.camera import eon_f_frame_size, eon_d_frame_size | 
				
			||||
from selfdrive.car.fingerprints import FW_VERSIONS | 
				
			||||
from selfdrive.manager.process import ensure_running | 
				
			||||
from selfdrive.manager.process_config import managed_processes | 
				
			||||
from selfdrive.test.update_ci_routes import upload_route | 
				
			||||
from tools.lib.route import Route | 
				
			||||
from tools.lib.framereader import FrameReader | 
				
			||||
from tools.lib.logreader import LogReader | 
				
			||||
 | 
				
			||||
 | 
				
			||||
process_replay_dir = os.path.dirname(os.path.abspath(__file__)) | 
				
			||||
FAKEDATA = os.path.join(process_replay_dir, "fakedata/") | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def replay_service(s, msgs): | 
				
			||||
  pm = messaging.PubMaster([s, ]) | 
				
			||||
  rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) | 
				
			||||
  smsgs = [m for m in msgs if m.which() == s] | 
				
			||||
  for m in smsgs: | 
				
			||||
    pm.send(s, m.as_builder()) | 
				
			||||
    rk.keep_time() | 
				
			||||
 | 
				
			||||
vs = None | 
				
			||||
def replay_cameras(lr, frs): | 
				
			||||
  cameras = [ | 
				
			||||
    ("roadCameraState", DT_MDL, eon_f_frame_size, VisionStreamType.VISION_STREAM_YUV_BACK), | 
				
			||||
    ("driverCameraState", DT_DMON, eon_d_frame_size, VisionStreamType.VISION_STREAM_YUV_FRONT), | 
				
			||||
  ] | 
				
			||||
 | 
				
			||||
  def replay_camera(s, stream, dt, vipc_server, fr, size): | 
				
			||||
    pm = messaging.PubMaster([s, ]) | 
				
			||||
    rk = Ratekeeper(1 / dt, print_delay_threshold=None) | 
				
			||||
 | 
				
			||||
    img = b"\x00" * int(size[0]*size[1]*3/2) | 
				
			||||
    while True: | 
				
			||||
      if fr is not None and False: | 
				
			||||
        img = fr.get(rk.frame % fr.frame_count, pix_fmt='yuv420p')[0] | 
				
			||||
        img = img.flatten().tobytes() | 
				
			||||
        print("got img") | 
				
			||||
 | 
				
			||||
      rk.keep_time() | 
				
			||||
 | 
				
			||||
      m = messaging.new_message(s) | 
				
			||||
      msg = getattr(m, s) | 
				
			||||
      msg.frameId = rk.frame | 
				
			||||
      pm.send(s, m) | 
				
			||||
 | 
				
			||||
      vipc_server.send(stream, img, msg.frameId, msg.timestampSof, msg.timestampEof) | 
				
			||||
 | 
				
			||||
  # init vipc server and cameras | 
				
			||||
  p = [] | 
				
			||||
  global vs | 
				
			||||
  vs = VisionIpcServer("camerad") | 
				
			||||
  for (s, dt, size, stream) in cameras: | 
				
			||||
    fr = frs.get(s, None) | 
				
			||||
    vs.create_buffers(stream, 40, False, size[0], size[1]) | 
				
			||||
    p.append(multiprocessing.Process(target=replay_camera, | 
				
			||||
                                     args=(s, stream, dt, vs, fr, size))) | 
				
			||||
 | 
				
			||||
  # hack to make UI work | 
				
			||||
  vs.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, eon_f_frame_size[0], eon_f_frame_size[1]) | 
				
			||||
  vs.start_listener() | 
				
			||||
  return p | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def regen_segment(lr, frs=None, outdir=FAKEDATA): | 
				
			||||
 | 
				
			||||
  lr = list(lr) | 
				
			||||
  if frs is None: | 
				
			||||
    frs = dict() | 
				
			||||
 | 
				
			||||
  # setup env | 
				
			||||
  params = Params() | 
				
			||||
  params.clear_all() | 
				
			||||
  params.put_bool("Passive", False) | 
				
			||||
  params.put_bool("OpenpilotEnabledToggle", True) | 
				
			||||
  params.put_bool("CommunityFeaturesToggle", True) | 
				
			||||
  params.put_bool("CommunityFeaturesToggle", True) | 
				
			||||
  cal = messaging.new_message('liveCalibration') | 
				
			||||
  cal.liveCalibration.validBlocks = 20 | 
				
			||||
  cal.liveCalibration.rpyCalib = [0.0, 0.0, 0.0] | 
				
			||||
  params.put("CalibrationParams", cal.to_bytes()) | 
				
			||||
 | 
				
			||||
  os.environ["LOG_ROOT"] = outdir | 
				
			||||
  os.environ["SIMULATION"] = "1" | 
				
			||||
 | 
				
			||||
  os.environ['SKIP_FW_QUERY'] = "" | 
				
			||||
  os.environ['FINGERPRINT'] = "" | 
				
			||||
  for msg in lr: | 
				
			||||
    if msg.which() == 'carParams': | 
				
			||||
      car_fingerprint = msg.carParams.carFingerprint | 
				
			||||
      if len(msg.carParams.carFw) and (car_fingerprint in FW_VERSIONS): | 
				
			||||
        params.put("CarParamsCache", msg.carParams.as_builder().to_bytes()) | 
				
			||||
      else: | 
				
			||||
        os.environ['SKIP_FW_QUERY'] = "1" | 
				
			||||
        os.environ['FINGERPRINT'] = car_fingerprint | 
				
			||||
 | 
				
			||||
  fake_daemons = { | 
				
			||||
    'sensord': [ | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('sensorEvents', lr)), | 
				
			||||
    ], | 
				
			||||
    'pandad': [ | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('can', lr)), | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('pandaState', lr)), | 
				
			||||
    ], | 
				
			||||
    #'managerState': [ | 
				
			||||
    #  multiprocessing.Process(target=replay_service, args=('managerState', lr)), | 
				
			||||
    #], | 
				
			||||
    'thermald': [ | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('deviceState', lr)), | 
				
			||||
    ], | 
				
			||||
    'camerad': [ | 
				
			||||
      *replay_cameras(lr, frs), | 
				
			||||
    ], | 
				
			||||
 | 
				
			||||
    # TODO: fix these and run them | 
				
			||||
    'paramsd': [ | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('liveParameters', lr)), | 
				
			||||
    ], | 
				
			||||
    'locationd': [ | 
				
			||||
      multiprocessing.Process(target=replay_service, args=('liveLocationKalman', lr)), | 
				
			||||
    ], | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  try: | 
				
			||||
    # start procs up | 
				
			||||
    ignore = list(fake_daemons.keys()) + ['ui', 'manage_athenad', 'uploader'] | 
				
			||||
    ensure_running(managed_processes.values(), started=True, not_run=ignore) | 
				
			||||
    for procs in fake_daemons.values(): | 
				
			||||
      for p in procs: | 
				
			||||
        p.start() | 
				
			||||
 | 
				
			||||
    for _ in tqdm(range(60)): | 
				
			||||
      # ensure all procs are running | 
				
			||||
      for d, procs in fake_daemons.items(): | 
				
			||||
        for p in procs: | 
				
			||||
          if not p.is_alive(): | 
				
			||||
            raise Exception(f"{d}'s {p.name} died") | 
				
			||||
      time.sleep(1) | 
				
			||||
  finally: | 
				
			||||
    # kill everything | 
				
			||||
    for p in managed_processes.values(): | 
				
			||||
      p.stop() | 
				
			||||
    for procs in fake_daemons.values(): | 
				
			||||
      for p in procs: | 
				
			||||
        p.terminate() | 
				
			||||
 | 
				
			||||
  r = params.get("CurrentRoute", encoding='utf-8') | 
				
			||||
  return os.path.join(outdir, r + "--0") | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
 | 
				
			||||
  parser = argparse.ArgumentParser(description="Generate new segments from old ones") | 
				
			||||
  parser.add_argument("--upload", action="store_true", help="Upload the new segment to the CI bucket") | 
				
			||||
  parser.add_argument("route", type=str, help="The source route") | 
				
			||||
  parser.add_argument("seg", type=int, help="Segment in source route") | 
				
			||||
  args = parser.parse_args() | 
				
			||||
 | 
				
			||||
  r = Route(args.route) | 
				
			||||
  lr = LogReader(r.log_paths()[args.seg]) | 
				
			||||
  fr = FrameReader(r.camera_paths()[args.seg]) | 
				
			||||
  rpath = regen_segment(lr, {'roadCameraState': fr}) | 
				
			||||
  relr = os.path.relpath(rpath) | 
				
			||||
 | 
				
			||||
  print("\n\n", "*"*30, "\n\n") | 
				
			||||
  print("New route:", relr, "\n") | 
				
			||||
  if args.upload: | 
				
			||||
    upload_route(relr) | 
				
			||||
 | 
				
			||||
@ -1,47 +0,0 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
import os | 
				
			||||
import sys | 
				
			||||
 | 
				
			||||
from selfdrive.test.openpilotci import upload_file | 
				
			||||
from selfdrive.test.process_replay.compare_logs import save_log | 
				
			||||
from selfdrive.test.process_replay.test_processes import segments, get_segment | 
				
			||||
from selfdrive.version import get_git_commit | 
				
			||||
from tools.lib.logreader import LogReader | 
				
			||||
from selfdrive.test.process_replay.inject_model import inject_model | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
 | 
				
			||||
  no_upload = "--no-upload" in sys.argv | 
				
			||||
 | 
				
			||||
  process_replay_dir = os.path.dirname(os.path.abspath(__file__)) | 
				
			||||
  ref_commit_fn = os.path.join(process_replay_dir, "model_ref_commit") | 
				
			||||
 | 
				
			||||
  ref_commit = get_git_commit() | 
				
			||||
  if ref_commit is None: | 
				
			||||
    raise Exception("couldn't get ref commit") | 
				
			||||
  with open(ref_commit_fn, "w") as f: | 
				
			||||
    f.write(ref_commit) | 
				
			||||
 | 
				
			||||
  for car_brand, segment in segments: | 
				
			||||
    rlog_fn = get_segment(segment, original=True) | 
				
			||||
 | 
				
			||||
    if rlog_fn is None: | 
				
			||||
      print("failed to get segment %s" % segment) | 
				
			||||
      sys.exit(1) | 
				
			||||
 | 
				
			||||
    lr = LogReader(rlog_fn) | 
				
			||||
    print('injecting model into % s' % segment) | 
				
			||||
    lr = inject_model(lr, segment) | 
				
			||||
 | 
				
			||||
    route_name, segment_num = segment.rsplit("--", 1) | 
				
			||||
    log_fn = "%s/%s/rlog_%s.bz2" % (route_name.replace("|", "/"), segment_num, ref_commit) | 
				
			||||
    tmp_name = 'tmp_%s_%s' % (route_name, segment_num) | 
				
			||||
    save_log(tmp_name, lr) | 
				
			||||
 | 
				
			||||
    if not no_upload: | 
				
			||||
      upload_file(tmp_name, log_fn) | 
				
			||||
      print('uploaded %s', log_fn) | 
				
			||||
      os.remove(tmp_name) | 
				
			||||
    os.remove(rlog_fn) | 
				
			||||
 | 
				
			||||
  print("done") | 
				
			||||
					Loading…
					
					
				
		Reference in new issue