#!/usr/bin/env python3 import argparse import os import sys import numpy as np import rerun as rr import cereal.messaging as messaging from openpilot.common.basedir import BASEDIR from openpilot.tools.replay.lib.rp_helpers import (UP, rerunColorPalette, get_blank_lid_overlay, update_radar_points, plot_lead, plot_model) from msgq.visionipc import VisionIpcClient, VisionStreamType os.environ['BASEDIR'] = BASEDIR UP.lidar_zoom = 6 def visualize(addr): sm = messaging.SubMaster(['radarState', 'liveTracks', 'modelV2'], addr=addr) vipc_client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_ROAD, True) while True: if not vipc_client.is_connected(): vipc_client.connect(True) new_data = vipc_client.recv() if new_data is None or not new_data.data.any(): continue sm.update(0) lid_overlay = get_blank_lid_overlay(UP) if sm.recv_frame['modelV2']: plot_model(sm['modelV2'], lid_overlay) if sm.recv_frame['radarState']: plot_lead(sm['radarState'], lid_overlay) liveTracksTime = sm.logMonoTime['liveTracks'] if sm.updated['liveTracks']: update_radar_points(sm['liveTracks'], lid_overlay) rr.set_time_nanos("TIMELINE", liveTracksTime) rr.log("tracks", rr.SegmentationImage(np.flip(np.rot90(lid_overlay, k=-1), axis=1))) def get_arg_parser(): parser = argparse.ArgumentParser( description="Show replay data in a UI.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("ip_address", nargs="?", default="127.0.0.1", help="The ip address on which to receive zmq messages.") parser.add_argument("--frame-address", default=None, help="The frame address (fully qualified ZMQ endpoint for frames) on which to receive zmq messages.") return parser if __name__ == "__main__": args = get_arg_parser().parse_args(sys.argv[1:]) if args.ip_address != "127.0.0.1": os.environ["ZMQ"] = "1" messaging.reset_context() rr.init("RadarPoints", spawn= True) rr.log("tracks", rr.AnnotationContext(rerunColorPalette), static=True) visualize(args.ip_address)