You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							59 lines
						
					
					
						
							2.2 KiB
						
					
					
				
			
		
		
	
	
							59 lines
						
					
					
						
							2.2 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import argparse
 | |
| import os
 | |
| import sys
 | |
| import numpy as np
 | |
| import rerun as rr
 | |
| import cereal.messaging as messaging
 | |
| from openpilot.common.basedir import BASEDIR
 | |
| from openpilot.tools.replay.lib.rp_helpers import (UP, rerunColorPalette,
 | |
|                                          get_blank_lid_overlay,
 | |
|                                          maybe_update_radar_points, plot_lead,
 | |
|                                          plot_model)
 | |
| from msgq.visionipc import VisionIpcClient, VisionStreamType
 | |
| 
 | |
# Publish the imported BASEDIR value through the process environment so that
# anything reading os.environ['BASEDIR'] later sees the same path.
os.environ['BASEDIR'] = BASEDIR

# Override the lidar_zoom attribute on the shared UP settings object from
# rp_helpers. NOTE(review): presumably this scales the overlay drawn by the
# plot_* helpers - confirm against rp_helpers before changing the value.
UP.lidar_zoom = 6
 | |
| 
 | |
def visualize(addr):
  """Continuously render radar/model data into a rerun segmentation image.

  Subscribes to the 'radarState', 'liveTracks' and 'modelV2' services at
  ``addr`` and connects to the "camerad" road-camera VisionIPC stream. For
  every received camera frame whose data is not entirely zero, a blank
  overlay is created, filled in by the ``plot_model`` / ``plot_lead`` /
  ``maybe_update_radar_points`` helpers, and logged to the rerun entity
  "tracks" on the "TIMELINE" timeline.

  Args:
    addr: Address passed to ``messaging.SubMaster`` as ``addr``.

  This function loops forever and never returns.
  """
  sm = messaging.SubMaster(['radarState', 'liveTracks', 'modelV2'], addr=addr)
  vipc_client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_ROAD, True)
  while True:
    # (Re)connect whenever the camera stream is not connected.
    if not vipc_client.is_connected():
      vipc_client.connect(True)

    # Camera frames only pace the loop; the image content itself is not drawn.
    new_data = vipc_client.recv()
    if new_data is None or not new_data.data.any():
      continue

    # Non-blocking poll of the subscribed services.
    sm.update(0)
    lid_overlay = get_blank_lid_overlay(UP)

    # Only draw model / lead data once at least one message has been received
    # (recv_frame is falsy until then).
    if sm.recv_frame['modelV2']:
      plot_model(sm['modelV2'], lid_overlay)
    if sm.recv_frame['radarState']:
      plot_lead(sm['radarState'], lid_overlay)

    live_tracks_time = sm.logMonoTime['liveTracks']
    maybe_update_radar_points(sm['liveTracks'], lid_overlay)

    # Stamp the log entry with the liveTracks message time, then publish the
    # overlay rotated by -90 degrees (k=-1) and mirrored along axis 1.
    rr.set_time_nanos("TIMELINE", live_tracks_time)
    rr.log("tracks", rr.SegmentationImage(np.flip(np.rot90(lid_overlay, k=-1), axis=1)))
 | |
| 
 | |
| 
 | |
def get_arg_parser():
  """Build the command-line parser for this script.

  Returns:
    An ``argparse.ArgumentParser`` with:
      * ``ip_address`` - optional positional, defaults to "127.0.0.1".
      * ``--frame-address`` - optional, defaults to None (stored as
        ``frame_address``).
    Defaults are shown in ``--help`` via ``ArgumentDefaultsHelpFormatter``.
  """
  parser = argparse.ArgumentParser(
    description="Show replay data in a UI.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  parser.add_argument("ip_address", nargs="?", default="127.0.0.1",
                      help="The ip address on which to receive zmq messages.")
  parser.add_argument("--frame-address", default=None,
                      help="The frame address (fully qualified ZMQ endpoint for frames) on which to receive zmq messages.")
  return parser
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
  args = get_arg_parser().parse_args(sys.argv[1:])

  # A non-default address sets ZMQ=1 and resets the messaging context.
  # NOTE(review): presumably this switches cereal messaging to its ZMQ
  # backend for remote hosts - confirm against cereal.messaging.
  if args.ip_address != "127.0.0.1":
    os.environ["ZMQ"] = "1"
    messaging.reset_context()

  # Start the rerun recording (spawn=True) and register the colour palette
  # once as static data for the "tracks" entity before streaming begins.
  rr.init("RadarPoints", spawn=True)
  rr.log("tracks", rr.AnnotationContext(rerunColorPalette), static=True)

  # Runs until the process is interrupted.
  visualize(args.ip_address)
 | |
| 
 |