@ -1,10 +1,12 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
import sys
import sys
import argparse
import argparse
import multiprocessing
import multiprocessing
import rerun as rr
import rerun as rr
import rerun . blueprint as rrb
import rerun . blueprint as rrb
from functools import partial
from functools import partial
from collections import defaultdict
from cereal . services import SERVICE_LIST
from cereal . services import SERVICE_LIST
from openpilot . tools . rerun . camera_reader import probe_packet_info , CameraReader , CameraConfig , CameraType
from openpilot . tools . rerun . camera_reader import probe_packet_info , CameraReader , CameraConfig , CameraType
@ -20,19 +22,16 @@ RR_WIN = "openpilot logs"
"""
"""
Relevant upstream Rerun issues :
Relevant upstream Rerun issues :
- large time series : https : / / github . com / rerun - io / rerun / issues / 5967
- loading videos directly : https : / / github . com / rerun - io / rerun / issues / 6532
- loading videos directly : https : / / github . com / rerun - io / rerun / issues / 6532
"""
"""
class Rerunner :
class Rerunner :
def __init__ ( self , route , segment_range , camera_config , enabled_services ) :
def __init__ ( self , route , segment_range , camera_config ) :
self . enabled_services = [ s . lower ( ) for s in enabled_services ]
self . log_all = " all " in self . enabled_services
self . lr = LogReader ( route_or_segment_name )
self . lr = LogReader ( route_or_segment_name )
# hevc files don't have start_time. We get it from qcamera.ts
# hevc files don't have start_time. We get it from qcamera.ts
start_time = 0
start_time = 0
dat = probe_packet_info ( r . qcamera_paths ( ) [ 0 ] )
dat = probe_packet_info ( route . qcamera_paths ( ) [ 0 ] )
for d in dat :
for d in dat :
if d . startswith ( " pts_time= " ) :
if d . startswith ( " pts_time= " ) :
start_time = float ( d . split ( ' = ' ) [ 1 ] )
start_time = float ( d . split ( ' = ' ) [ 1 ] )
@ -49,34 +48,30 @@ class Rerunner:
if dcam :
if dcam :
self . camera_readers [ CameraType . dcam ] = CameraReader ( route . dcamera_paths ( ) , start_time , segment_range . seg_idxs )
self . camera_readers [ CameraType . dcam ] = CameraReader ( route . dcamera_paths ( ) , start_time , segment_range . seg_idxs )
def _start_rerun ( self ) :
self . blueprint = self . _create_blueprint ( )
rr . init ( RR_WIN , spawn = True )
def _create_blueprint ( self ) :
def _create_blueprint ( self ) :
blueprint = None
blueprint = None
service_views = [ ]
service_views = [ ]
log_msg_visible = len ( self . enabled_services ) < = 3 and not self . log_all
for topic in sorted ( SERVICE_LIST . keys ( ) ) :
for topic in sorted ( SERVICE_LIST . keys ( ) ) :
if not self . log_all and topic . lower ( ) not in self . enabled_services :
continue
View = rrb . TimeSeriesView if topic != " thumbnail " else rrb . Spatial2DView
View = rrb . TimeSeriesView if topic != " thumbnail " else rrb . Spatial2DView
service_views . append ( View ( name = topic , origin = f " / { topic } / " , visible = log_msg_visibl e) )
service_views . append ( View ( name = topic , origin = f " / { topic } / " , visible = False ) )
rr . log ( topic , rr . SeriesLine ( name = topic ) , timeless = True )
rr . log ( topic , rr . SeriesLine ( name = topic ) , timeless = True )
center_view = [ rrb . Vertical ( * service_views , name = " streams " ) ]
if len ( self . camera_readers ) :
center_view . append ( rrb . Vertical ( * [ rrb . Spatial2DView ( name = cam_type , origin = cam_type ) for cam_type in self . camera_readers . keys ( ) ] , name = " cameras " ) )
blueprint = rrb . Blueprint (
blueprint = rrb . Blueprint (
rrb . Horizontal (
rrb . Horizontal (
rrb . Vertical ( * service_views ) ,
* center_view
rrb . Vertical ( * [ rrb . Spatial2DView ( name = cam_type , origin = cam_type ) for cam_type in self . camera_readers . keys ( ) ] ) ,
) ,
) ,
rrb . SelectionPanel ( expanded = False ) ,
rrb . SelectionPanel ( expanded = False ) ,
rrb . TimePanel ( expanded = False )
rrb . TimePanel ( expanded = False ) ,
)
)
return blueprint
return blueprint
@staticmethod
@staticmethod
def _log _msg ( msg , parent_key = ' ' ) :
def _parse _msg ( msg , parent_key = ' ' ) :
stack = [ ( msg , parent_key ) ]
stack = [ ( msg , parent_key ) ]
while stack :
while stack :
current_msg , current_parent_key = stack . pop ( )
current_msg , current_parent_key = stack . pop ( )
@ -84,40 +79,47 @@ class Rerunner:
for index , item in enumerate ( current_msg ) :
for index , item in enumerate ( current_msg ) :
new_key = f " { current_parent_key } / { index } "
new_key = f " { current_parent_key } / { index } "
if isinstance ( item , ( int , float ) ) :
if isinstance ( item , ( int , float ) ) :
rr . log ( new_key , rr . Scalar ( item ) )
yield new_key , item
elif isinstance ( item , dict ) :
elif isinstance ( item , dict ) :
stack . append ( ( item , new_key ) )
stack . append ( ( item , new_key ) )
elif isinstance ( current_msg , dict ) :
elif isinstance ( current_msg , dict ) :
for key , value in current_msg . items ( ) :
for key , value in current_msg . items ( ) :
new_key = f " { current_parent_key } / { key } "
new_key = f " { current_parent_key } / { key } "
if isinstance ( value , ( int , float ) ) :
if isinstance ( value , ( int , float ) ) :
rr . log ( new_key , rr . Scalar ( value ) )
yield new_key , value
elif isinstance ( value , dict ) :
elif isinstance ( value , dict ) :
stack . append ( ( value , new_key ) )
stack . append ( ( value , new_key ) )
elif isinstance ( value , list ) :
elif isinstance ( value , list ) :
for index , item in enumerate ( value ) :
for index , item in enumerate ( value ) :
if isinstance ( item , ( int , float ) ) :
if isinstance ( item , ( int , float ) ) :
rr . log ( f " { new_key } / { index } " , rr . Scalar ( item ) )
yield f " { new_key } / { index } " , item
else :
else :
pass # Not a plottable value
pass # Not a plottable value
@staticmethod
@staticmethod
@rr . shutdown_at_exit
@rr . shutdown_at_exit
def _process_log_msgs ( blueprint , enabled_services , log_all , lr ) :
def _process_log_msgs ( blueprint , lr ) :
rr . init ( RR_WIN )
rr . init ( RR_WIN )
rr . connect ( default_blueprint = blueprint )
rr . connect ( )
rr . send_blueprint ( blueprint )
log_msgs = defaultdict ( lambda : defaultdict ( list ) )
for msg in lr :
for msg in lr :
rr . set_time_nanos ( RR_TIMELINE_NAME , msg . logMonoTime )
msg_type = msg . which ( )
msg_type = msg . which ( )
if not log_all and msg_type . lower ( ) not in enabled_services :
if msg_type == " thumbnail " :
continue
continue
if msg_type != " thumbnail " :
for entity_path , dat in Rerunner . _parse_msg ( msg . to_dict ( ) [ msg_type ] , msg_type ) :
Rerunner . _log_msg ( msg . to_dict ( ) [ msg . which ( ) ] , msg . which ( ) )
log_msgs [ entity_path ] [ " times " ] . append ( msg . logMonoTime / 1e9 )
else :
log_msgs [ entity_path ] [ " data " ] . append ( dat )
rr . log ( " /thumbnail " , rr . ImageEncoded ( contents = msg . to_dict ( ) [ msg . which ( ) ] . get ( " thumbnail " ) ) )
for entity_path , log_msg in log_msgs . items ( ) :
rr . log_temporal_batch (
entity_path ,
times = [ rr . TimeSecondsBatch ( RR_TIMELINE_NAME , log_msg [ " times " ] ) ] ,
components = [ rr . components . ScalarBatch ( log_msg [ " data " ] ) ]
)
return [ ]
return [ ]
@ -125,18 +127,22 @@ class Rerunner:
@rr . shutdown_at_exit
@rr . shutdown_at_exit
def _process_cam_readers ( blueprint , cam_type , h , w , fr ) :
def _process_cam_readers ( blueprint , cam_type , h , w , fr ) :
rr . init ( RR_WIN )
rr . init ( RR_WIN )
rr . connect ( default_blueprint = blueprint )
rr . connect ( )
rr . send_blueprint ( blueprint )
for ts , frame in fr :
for ts , frame in fr :
rr . set_time_nanos ( RR_TIMELINE_NAME , int ( ts * 1e9 ) )
rr . set_time_nanos ( RR_TIMELINE_NAME , int ( ts * 1e9 ) )
rr . log ( cam_type , rr . Image ( bytes = frame , width = w , height = h , pixel_format = rr . PixelFormat . NV12 ) )
rr . log ( cam_type , rr . Image ( bytes = frame , width = w , height = h , pixel_format = rr . PixelFormat . NV12 ) )
def load_data ( self ) :
def load_data ( self ) :
self . _start_rerun ( )
rr . init ( RR_WIN , spawn = True )
if len ( self . enabled_services ) > 0 :
self . lr . run_across_segments ( NUM_CPUS , partial ( self . _process_log_msgs , self . blueprint , self . enabled_services , self . log_all ) )
startup_blueprint = self . _create_blueprint ( )
self . lr . run_across_segments ( NUM_CPUS , partial ( self . _process_log_msgs , startup_blueprint ) , desc = " Log messages " )
for cam_type , cr in self . camera_readers . items ( ) :
for cam_type , cr in self . camera_readers . items ( ) :
cr . run_across_segments ( NUM_CPUS , partial ( self . _process_cam_readers , self . blueprint , cam_type , cr . h , cr . w ) )
cr . run_across_segments ( NUM_CPUS , partial ( self . _process_cam_readers , startup_blueprint , cam_type , cr . h , cr . w ) , desc = cam_type )
rr . send_blueprint ( self . _create_blueprint ( ) )
if __name__ == ' __main__ ' :
if __name__ == ' __main__ ' :
@ -147,34 +153,28 @@ if __name__ == '__main__':
parser . add_argument ( " --fcam " , action = " store_true " , help = " Show driving camera " )
parser . add_argument ( " --fcam " , action = " store_true " , help = " Show driving camera " )
parser . add_argument ( " --ecam " , action = " store_true " , help = " Show wide camera " )
parser . add_argument ( " --ecam " , action = " store_true " , help = " Show wide camera " )
parser . add_argument ( " --dcam " , action = " store_true " , help = " Show driver monitoring camera " )
parser . add_argument ( " --dcam " , action = " store_true " , help = " Show driver monitoring camera " )
parser . add_argument ( " --print_services " , action = " store_true " , help = " List out openpilot services " )
parser . add_argument ( " route_or_segment_name " , nargs = ' ? ' , help = " The route or segment name to plot " )
parser . add_argument ( " --services " , default = [ ] , nargs = ' * ' , help = " Specify openpilot services that will be logged. \
No service will be logged if not specified . \
To log all services include ' all ' as one of your services " )
parser . add_argument ( " --route " , nargs = ' ? ' , help = " The route or segment name to plot " )
args = parser . parse_args ( )
args = parser . parse_args ( )
if not args . demo and not args . route :
if not args . demo and not args . route_or_segment_name :
parser . print_help ( )
parser . print_help ( )
sys . exit ( )
sys . exit ( )
if args . print_services :
print ( " \n " . join ( SERVICE_LIST . keys ( ) ) )
sys . exit ( )
camera_config = CameraConfig ( args . qcam , args . fcam , args . ecam , args . dcam )
camera_config = CameraConfig ( args . qcam , args . fcam , args . ecam , args . dcam )
route_or_segment_name = DEMO_ROUTE if args . demo else args . route_or_segment_name . strip ( )
route_or_segment_name = DEMO_ROUTE if args . demo else args . route . strip ( )
sr = SegmentRange ( route_or_segment_name )
sr = SegmentRange ( route_or_segment_name )
r = Route ( sr . route_name )
r = Route ( sr . route_name )
if len ( sr . seg_idxs ) > 10 :
hevc_requested = any ( camera_config [ 1 : ] )
print ( " You ' re requesting more than 10 segments of the route, " + \
if len ( sr . seg_idxs ) > 1 and hevc_requested :
" please be aware that might take a lot of memory " )
print ( " You ' re requesting more than 1 segment with hevc videos, " + \
" please be aware that might take a lot of memory " + \
" since rerun isn ' t yet well supported for high resolution video logging " )
response = input ( " Do you wish to continue? (Y/n): " )
response = input ( " Do you wish to continue? (Y/n): " )
if response . strip ( ) . lower ( ) != " y " :
if response . strip ( ) . lower ( ) != " y " :
sys . exit ( )
sys . exit ( )
rerunner = Rerunner ( r , sr , camera_config , args . services )
rerunner = Rerunner ( r , sr , camera_config )
rerunner . load_data ( )
rerunner . load_data ( )