openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

61 lines
2.2 KiB

#!/usr/bin/env python3
import argparse
import os
import sys
import numpy as np
import rerun as rr
import cereal.messaging as messaging
from openpilot.common.basedir import BASEDIR
from openpilot.tools.replay.lib.rp_helpers import (UP, rerunColorPalette,
get_blank_lid_overlay,
update_radar_points, plot_lead,
plot_model)
from msgq.visionipc import VisionIpcClient, VisionStreamType
os.environ['BASEDIR'] = BASEDIR
UP.lidar_zoom = 6
def visualize(addr):
sm = messaging.SubMaster(['radarState', 'liveTracks', 'modelV2'], addr=addr)
vipc_client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_ROAD, True)
while True:
if not vipc_client.is_connected():
vipc_client.connect(True)
new_data = vipc_client.recv()
if new_data is None or not new_data.data.any():
continue
sm.update(0)
lid_overlay = get_blank_lid_overlay(UP)
if sm.recv_frame['modelV2']:
plot_model(sm['modelV2'], lid_overlay)
if sm.recv_frame['radarState']:
plot_lead(sm['radarState'], lid_overlay)
liveTracksTime = sm.logMonoTime['liveTracks']
if sm.updated['liveTracks']:
update_radar_points(sm['liveTracks'], lid_overlay)
rr.set_time_nanos("TIMELINE", liveTracksTime)
rr.log("tracks", rr.SegmentationImage(np.flip(np.rot90(lid_overlay, k=-1), axis=1)))
def get_arg_parser():
parser = argparse.ArgumentParser(
description="Show replay data in a UI.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("ip_address", nargs="?", default="127.0.0.1",
help="The ip address on which to receive zmq messages.")
parser.add_argument("--frame-address", default=None,
help="The frame address (fully qualified ZMQ endpoint for frames) on which to receive zmq messages.")
return parser
if __name__ == "__main__":
args = get_arg_parser().parse_args(sys.argv[1:])
if args.ip_address != "127.0.0.1":
os.environ["ZMQ"] = "1"
messaging.reset_context()
rr.init("RadarPoints", spawn= True)
rr.log("tracks", rr.AnnotationContext(rerunColorPalette), static=True)
visualize(args.ip_address)