import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  re 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  functools  import  cache 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  urllib . parse  import  urlparse 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections  import  defaultdict 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  itertools  import  chain 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  typing  import  cast 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . lib . auth_config  import  get_token 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . lib . api  import  CommaApi 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . lib . helpers  import  RE 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								QLOG_FILENAMES  =  [ ' qlog ' ,  ' qlog.bz2 ' ] 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								QCAMERA_FILENAMES  =  [ ' qcamera.ts ' ] 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								LOG_FILENAMES  =  [ ' rlog ' ,  ' rlog.bz2 ' ,  ' raw_log.bz2 ' ] 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CAMERA_FILENAMES  =  [ ' fcamera.hevc ' ,  ' video.hevc ' ] 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								DCAMERA_FILENAMES  =  [ ' dcamera.hevc ' ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								ECAMERA_FILENAMES  =  [ ' ecamera.hevc ' ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  Route : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  name ,  data_dir = None ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _name  =  RouteName ( name ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . files  =  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  data_dir  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . _segments  =  self . _get_segments_local ( data_dir ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . _segments  =  self . _get_segments_remote ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . max_seg_number  =  self . _segments [ - 1 ] . name . segment_num 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  name ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . _name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  segments ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . _segments 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  log_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    log_path_by_seg_num  =  { s . name . segment_num :  s . log_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ log_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  qlog_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    qlog_path_by_seg_num  =  { s . name . segment_num :  s . qlog_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ qlog_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  camera_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    camera_path_by_seg_num  =  { s . name . segment_num :  s . camera_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ camera_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  dcamera_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    dcamera_path_by_seg_num  =  { s . name . segment_num :  s . dcamera_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ dcamera_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  ecamera_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ecamera_path_by_seg_num  =  { s . name . segment_num :  s . ecamera_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ ecamera_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  qcamera_paths ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    qcamera_path_by_seg_num  =  { s . name . segment_num :  s . qcamera_path  for  s  in  self . _segments } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  [ qcamera_path_by_seg_num . get ( i ,  None )  for  i  in  range ( self . max_seg_number  +  1 ) ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # TODO: refactor this, it's super repetitive 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _get_segments_remote ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    api  =  CommaApi ( get_token ( ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    route_files  =  api . get ( ' v1/route/ '  +  self . name . canonical_name  +  ' /files ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . files  =  list ( chain . from_iterable ( route_files . values ( ) ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    segments  =  { } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  url  in  self . files : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      _ ,  dongle_id ,  time_str ,  segment_num ,  fn  =  urlparse ( url ) . path . rsplit ( ' / ' ,  maxsplit = 4 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      segment_name  =  f ' { dongle_id } | { time_str } -- { segment_num } ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  segments . get ( segment_name ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        segments [ segment_name ]  =  Segment ( 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          segment_name , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          url  if  fn  in  LOG_FILENAMES  else  segments [ segment_name ] . log_path , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  QLOG_FILENAMES  else  segments [ segment_name ] . qlog_path , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  CAMERA_FILENAMES  else  segments [ segment_name ] . camera_path , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  DCAMERA_FILENAMES  else  segments [ segment_name ] . dcamera_path , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          url  if  fn  in  ECAMERA_FILENAMES  else  segments [ segment_name ] . ecamera_path , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  QCAMERA_FILENAMES  else  segments [ segment_name ] . qcamera_path , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        segments [ segment_name ]  =  Segment ( 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          segment_name , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          url  if  fn  in  LOG_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  QLOG_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  CAMERA_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  DCAMERA_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          url  if  fn  in  ECAMERA_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          url  if  fn  in  QCAMERA_FILENAMES  else  None , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  sorted ( segments . values ( ) ,  key = lambda  seg :  seg . name . segment_num ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _get_segments_local ( self ,  data_dir ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    files  =  os . listdir ( data_dir ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    segment_files  =  defaultdict ( list ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f  in  files : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      fullpath  =  os . path . join ( data_dir ,  f ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      explorer_match  =  re . match ( RE . EXPLORER_FILE ,  f ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      op_match  =  re . match ( RE . OP_SEGMENT_DIR ,  f ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  explorer_match : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        segment_name  =  explorer_match . group ( ' segment_name ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        fn  =  explorer_match . group ( ' file_name ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  segment_name . replace ( ' _ ' ,  ' | ' ) . startswith ( self . name . canonical_name ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          segment_files [ segment_name ] . append ( ( fullpath ,  fn ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  op_match  and  os . path . isdir ( fullpath ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        segment_name  =  op_match . group ( ' segment_name ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  segment_name . startswith ( self . name . canonical_name ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          for  seg_f  in  os . listdir ( fullpath ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            segment_files [ segment_name ] . append ( ( os . path . join ( fullpath ,  seg_f ) ,  seg_f ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      elif  f  ==  self . name . canonical_name : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  seg_num  in  os . listdir ( fullpath ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  not  seg_num . isdigit ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          segment_name  =  f ' { self . name . canonical_name } -- { seg_num } ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          for  seg_f  in  os . listdir ( os . path . join ( fullpath ,  seg_num ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            segment_files [ segment_name ] . append ( ( os . path . join ( fullpath ,  seg_num ,  seg_f ) ,  seg_f ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    segments  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  segment ,  files  in  segment_files . items ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        log_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  LOG_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        log_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        qlog_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  QLOG_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        qlog_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        camera_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  CAMERA_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        camera_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        dcamera_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  DCAMERA_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        dcamera_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ecamera_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  ECAMERA_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ecamera_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        qcamera_path  =  next ( path  for  path ,  filename  in  files  if  filename  in  QCAMERA_FILENAMES ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  StopIteration : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        qcamera_path  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      segments . append ( Segment ( segment ,  log_path ,  qlog_path ,  camera_path ,  dcamera_path ,  ecamera_path ,  qcamera_path ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( segments )  ==  0 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      raise  ValueError ( f ' Could not find segments for route  { self . name . canonical_name }  in data directory  { data_dir } ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  sorted ( segments ,  key = lambda  seg :  seg . name . segment_num ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  Segment : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  name ,  log_path ,  qlog_path ,  camera_path ,  dcamera_path ,  ecamera_path ,  qcamera_path ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . _name  =  SegmentName ( name ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . log_path  =  log_path 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . qlog_path  =  qlog_path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . camera_path  =  camera_path 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . dcamera_path  =  dcamera_path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . ecamera_path  =  ecamera_path 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . qcamera_path  =  qcamera_path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  name ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . _name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  RouteName : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  name_str :  str ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _name_str  =  name_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    delim  =  next ( c  for  c  in  self . _name_str  if  c  in  ( " | " ,  " / " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _dongle_id ,  self . _time_str  =  self . _name_str . split ( delim ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( self . _dongle_id )  ==  16 ,  self . _name_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( self . _time_str )  ==  20 ,  self . _name_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _canonical_name  =  f " { self . _dongle_id } | { self . _time_str } " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  canonical_name ( self )  - >  str :  return  self . _canonical_name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  dongle_id ( self )  - >  str :  return  self . _dongle_id 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  time_str ( self )  - >  str :  return  self . _time_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __str__ ( self )  - >  str :  return  self . _canonical_name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  SegmentName : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # TODO: add constructor that takes dongle_id, time_str, segment_num and then create instances 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # of this class instead of manually constructing a segment name (use canonical_name prop instead) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  name_str :  str ,  allow_route_name = False ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    data_dir_path_separator_index  =  name_str . rsplit ( " | " ,  1 ) [ 0 ] . rfind ( " / " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    use_data_dir  =  ( data_dir_path_separator_index  !=  - 1 )  and  ( " | "  in  name_str ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _name_str  =  name_str [ data_dir_path_separator_index  +  1 : ]  if  use_data_dir  else  name_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _data_dir  =  name_str [ : data_dir_path_separator_index ]  if  use_data_dir  else  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    seg_num_delim  =  " -- "  if  self . _name_str . count ( " -- " )  ==  2  else  " / " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    name_parts  =  self . _name_str . rsplit ( seg_num_delim ,  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  allow_route_name  and  len ( name_parts )  ==  1 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      name_parts . append ( " -1 " )   # no segment number 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . _route_name  =  RouteName ( name_parts [ 0 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _num  =  int ( name_parts [ 1 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _canonical_name  =  f " { self . _route_name . _dongle_id } | { self . _route_name . _time_str } -- { self . _num } " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  canonical_name ( self )  - >  str :  return  self . _canonical_name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  dongle_id ( self )  - >  str :  return  self . _route_name . dongle_id 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  time_str ( self )  - >  str :  return  self . _route_name . time_str 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  segment_num ( self )  - >  int :  return  self . _num 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  route_name ( self )  - >  RouteName :  return  self . _route_name 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  data_dir ( self )  - >  str  |  None :  return  self . _data_dir 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __str__ ( self )  - >  str :  return  self . _canonical_name 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@cache 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  get_max_seg_number_cached ( sr :  ' SegmentRange ' )  - >  int : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    api  =  CommaApi ( get_token ( ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  cast ( int ,  api . get ( " /v1/route/ "  +  sr . route_name . replace ( " / " ,  " | " ) ) [ " segment_numbers " ] [ - 1 ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  except  Exception  as  e : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  Exception ( " unable to get max_segment_number. ensure you have access to this route or the route is public. " )  from  e 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  SegmentRange : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  segment_range :  str ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    m  =  re . fullmatch ( RE . SEGMENT_RANGE ,  segment_range ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  m  is  not  None ,  f " Segment range is not valid  { segment_range } " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . m  =  m 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  route_name ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . m . group ( " route_name " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  dongle_id ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . m . group ( " dongle_id " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  timestamp ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . m . group ( " timestamp " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  slice ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . m . group ( " slice " )  or  " " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  selector ( self )  - >  str  |  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  self . m . group ( " selector " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  seg_idxs ( self )  - >  list [ int ] : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    m  =  re . fullmatch ( RE . SLICE ,  self . slice ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  m  is  not  None ,  f " Invalid slice:  { self . slice } " 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    start ,  end ,  step  =  ( None  if  s  is  None  else  int ( s )  for  s  in  m . groups ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # one segment specified 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  start  is  not  None  and  end  is  None  and  ' : '  not  in  self . slice : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  start  <  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        start  + =  get_max_seg_number_cached ( self )  +  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  [ start ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    s  =  slice ( start ,  end ,  step ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # no specified end or using relative indexing, need number of segments 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  end  is  None  or  end  <  0  or  ( start  is  not  None  and  start  <  0 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  list ( range ( get_max_seg_number_cached ( self )  +  1 ) ) [ s ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  list ( range ( end  +  1 ) ) [ s ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __str__ ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  f " { self . dongle_id } / { self . timestamp } "  +  ( f " / { self . slice } "  if  self . slice  else  " " )  +  ( f " / { self . selector } "  if  self . selector  else  " " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __repr__ ( self )  - >  str : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . __str__ ( )