#!/usr/bin/env python 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  argparse 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  sys 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  zmq 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  gc 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  signal 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  threading  import  Thread 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  uuid  import  uuid4 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections  import  namedtuple 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections  import  deque 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  multiprocessing  import  Process ,  TimeoutError 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  datetime  import  datetime 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# strat 1: script to copy files 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# strat 2: build pip packages around these 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# could be its own pip package, which we'd need to build and release 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log  as  capnp_log 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . services  import  service_list 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . messaging  import  pub_sock ,  MultiplePublishersError 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common  import  realtime 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . file_helpers  import  mkdirs_exists_ok 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . kbhit  import  KBHit 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . logreader  import  MultiLogIterator 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . route  import  Route 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . route_framereader  import  RouteFrameReader 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Commands. 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								SetRoute  =  namedtuple ( " SetRoute " ,  ( " name " ,  " start_time " ,  " data_dir " ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								SeekAbsoluteTime  =  namedtuple ( " SeekAbsoluteTime " ,  ( " secs " , ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								SeekRelativeTime  =  namedtuple ( " SeekRelativeTime " ,  ( " secs " , ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								TogglePause  =  namedtuple ( " TogglePause " ,  ( ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								StopAndQuit  =  namedtuple ( " StopAndQuit " ,  ( ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  UnloggerWorker ( object ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _frame_reader  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _cookie  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _readahead  =  deque ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  run ( self ,  commands_address ,  data_address ,  pub_types ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    zmq . Context . _instance  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    commands_socket  =  zmq . Context . instance ( ) . socket ( zmq . PULL ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    commands_socket . connect ( commands_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    data_socket  =  zmq . Context . instance ( ) . socket ( zmq . PUSH ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    data_socket . connect ( data_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    poller  =  zmq . Poller ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    poller . register ( commands_socket ,  zmq . POLLIN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # We can't publish frames without encodeIdx, so add when it's missing. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  " frame "  in  pub_types : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pub_types [ " encodeIdx " ]  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_OBJECTS | gc.DEBUG_STATS | gc.DEBUG_SAVEALL | 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # gc.DEBUG_UNCOLLECTABLE) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # TODO: WARNING pycapnp leaks memory all over the place after unlogger runs for a while, gc 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # pauses become huge because there are so many tracked objects solution will be to switch to new 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # cython capnp 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      route  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      while  True : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        while  poller . poll ( 0. )  or  route  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cookie ,  cmd  =  commands_socket . recv_pyobj ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          route  =  self . _process_commands ( cmd ,  route ,  pub_types ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # **** get message **** 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _read_logs ( cookie ,  pub_types ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _send_logs ( data_socket ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . _frame_reader  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _frame_reader . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      data_socket . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      commands_socket . close ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _read_logs ( self ,  cookie ,  pub_types ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    fullHEVC  =  capnp_log . EncodeIndex . Type . fullHEVC 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lr  =  self . _lr 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  len ( self . _readahead )  <  1000 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      route_time  =  lr . tell ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      msg  =  next ( lr ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      typ  =  msg . which ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  typ  not  in  pub_types : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # **** special case certain message types **** 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  typ  ==  " encodeIdx "  and  msg . encodeIdx . type  ==  fullHEVC : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # this assumes the encodeIdx always comes before the frame 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _frame_id_lookup [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          msg . encodeIdx . frameId ]  =  msg . encodeIdx . segmentNum ,  msg . encodeIdx . segmentId 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        #print "encode", msg.encodeIdx.frameId, len(self._readahead), route_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . _readahead . appendleft ( ( typ ,  msg ,  route_time ,  cookie ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _send_logs ( self ,  data_socket ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  len ( self . _readahead )  >  500 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      typ ,  msg ,  route_time ,  cookie  =  self . _readahead . pop ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      smsg  =  msg . as_builder ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  typ  ==  " frame " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        frame_id  =  msg . frame . frameId 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Frame exists, make sure we have a framereader. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # load the frame readers as needed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        s1  =  time . time ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        img  =  self . _frame_reader . get ( frame_id ,  pix_fmt = " rgb24 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        fr_time  =  time . time ( )  -  s1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  fr_time  >  0.05 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          print ( " FRAME( %d ) LAG --  %.2f  ms "  %  ( frame_id ,  fr_time * 1000.0 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  img  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          img  =  img [ : ,  : ,  : : - 1 ]  # Convert RGB to BGR, which is what the camera outputs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          img  =  img . flatten ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          smsg . frame . image  =  img . tobytes ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      data_socket . send_pyobj ( ( cookie ,  typ ,  msg . logMonoTime ,  route_time ) ,  flags = zmq . SNDMORE ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      data_socket . send ( smsg . to_bytes ( ) ,  copy = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _process_commands ( self ,  cmd ,  route ,  pub_types ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    seek_to  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  route  is  None  or  ( isinstance ( cmd ,  SetRoute )  and  route . name  !=  cmd . name ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      seek_to  =  cmd . start_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      route  =  Route ( cmd . name ,  cmd . data_dir ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . _lr  =  MultiLogIterator ( route . log_paths ( ) ,  wraparound = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . _frame_reader  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _frame_reader . close ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  " frame "  in  pub_types  or  " encodeIdx "  in  pub_types : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # reset frames for a route 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _frame_id_lookup  =  { } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _frame_reader  =  RouteFrameReader ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          route . camera_paths ( ) ,  None ,  self . _frame_id_lookup ,  readahead = True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # always reset this on a seek 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  isinstance ( cmd ,  SeekRelativeTime ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      seek_to  =  self . _lr . tell ( )  +  cmd . secs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  isinstance ( cmd ,  SeekAbsoluteTime ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      seek_to  =  cmd . secs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  isinstance ( cmd ,  StopAndQuit ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      exit ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  seek_to  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      print ( " seeking " ,  seek_to ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  self . _lr . seek ( seek_to ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        print ( " Can ' t seek: time out of bounds " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        next ( self . _lr )    # ignore one 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  route 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  _get_address_send_func ( address ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  sock  =  pub_sock ( address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  sock . send 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  unlogger_thread ( command_address ,  forward_commands_address ,  data_address ,  run_realtime , 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    address_mapping ,  publish_time_length ,  bind_early ,  no_loop ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # Clear context to avoid problems with multiprocessing. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  zmq . Context . _instance  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  context  =  zmq . Context . instance ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  command_sock  =  context . socket ( zmq . PULL ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  command_sock . bind ( command_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  forward_commands_socket  =  context . socket ( zmq . PUSH ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  forward_commands_socket . bind ( forward_commands_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  data_socket  =  context . socket ( zmq . PULL ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  data_socket . bind ( data_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # Set readahead to a reasonable number. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  data_socket . setsockopt ( zmq . RCVHWM ,  10000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  poller  =  zmq . Poller ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  poller . register ( command_sock ,  zmq . POLLIN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  poller . register ( data_socket ,  zmq . POLLIN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  bind_early : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    send_funcs  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      typ :  _get_address_send_func ( address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  typ ,  address  in  address_mapping . items ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Give subscribers a chance to connect. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 0.1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    send_funcs  =  { } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  start_time  =  float ( " inf " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  printed_at  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  generation  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  paused  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  reset_time  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  prev_msg_time  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  True : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    evts  =  dict ( poller . poll ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  command_sock  in  evts : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cmd  =  command_sock . recv_pyobj ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  isinstance ( cmd ,  TogglePause ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        paused  =  not  paused 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  paused : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          poller . modify ( data_socket ,  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          poller . modify ( data_socket ,  zmq . POLLIN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Forward the command the the log data thread. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # TODO: Remove everything on data_socket. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        generation  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  cmd ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  isinstance ( cmd ,  StopAndQuit ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      reset_time  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  data_socket  in  evts : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      msg_generation ,  typ ,  msg_time ,  route_time  =  data_socket . recv_pyobj ( flags = zmq . RCVMORE ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      msg_bytes  =  data_socket . recv ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  msg_generation  <  generation : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Skip packets. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  no_loop  and  prev_msg_time  is  not  None  and  prev_msg_time  >  msg_time  +  1e9 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        generation  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_msg_time  =  msg_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      msg_time_seconds  =  msg_time  *  1e-9 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  reset_time : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        msg_start_time  =  msg_time_seconds 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        real_start_time  =  realtime . sec_since_boot ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        start_time  =  min ( start_time ,  msg_start_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        reset_time  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  publish_time_length  and  msg_time_seconds  -  start_time  >  publish_time_length : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        generation  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Print time. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  abs ( printed_at  -  route_time )  >  5. : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        print ( " at " ,  route_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        printed_at  =  route_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  typ  not  in  send_funcs : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  typ  in  address_mapping : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # Remove so we don't keep printing warnings. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          address  =  address_mapping . pop ( typ ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            print ( " binding " ,  typ ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            send_funcs [ typ ]  =  _get_address_send_func ( address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          except  Exception  as  e : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            print ( " couldn ' t replay  {} :  {} " . format ( typ ,  e ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # Skip messages that we are not registered to publish. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Sleep as needed for real time playback. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  run_realtime : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        msg_time_offset  =  msg_time_seconds  -  msg_start_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        real_time_offset  =  realtime . sec_since_boot ( )  -  real_start_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        lag  =  msg_time_offset  -  real_time_offset 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  lag  >  0  and  lag  <  30 :  # a large jump is OK, likely due to an out of order segment 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  lag  >  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            print ( " sleeping for " ,  lag ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          time . sleep ( lag ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        elif  lag  <  - 1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # Relax the real time schedule when we slip far behind. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          reset_time  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Send message. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        send_funcs [ typ ] ( msg_bytes ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  MultiplePublishersError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        del  send_funcs [ typ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  timestamp_to_s ( tss ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  time . mktime ( datetime . strptime ( tss ,  ' % Y- % m- %d -- % H- % M- % S ' ) . timetuple ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  absolute_time_str ( s ,  start_time ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # first try if it's a float 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  float ( s ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  except  ValueError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # now see if it's a timestamp 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  timestamp_to_s ( s )  -  start_time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  _get_address_mapping ( args ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  args . min  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    services_to_mock  =  [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' thermal ' ,  ' can ' ,  ' health ' ,  ' sensorEvents ' ,  ' gpsNMEA ' ,  ' frame ' ,  ' encodeIdx ' , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' model ' ,  ' features ' ,  ' liveLocation ' ,  ' gpsLocation ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  elif  args . enabled  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    services_to_mock  =  args . enabled 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    services_to_mock  =  service_list . keys ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  address_mapping  =  { service_name :  service_name  for  service_name  in  services_to_mock } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  address_mapping . update ( dict ( args . address_mapping ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  k  in  args . disabled : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    address_mapping . pop ( k ,  None ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  non_services  =  set ( address_mapping )  -  set ( service_list ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  non_services : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " WARNING: Unknown services  {} " . format ( list ( non_services ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  address_mapping 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  keyboard_controller_thread ( q ,  route_start_time ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  print ( " keyboard waiting for input " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  kb  =  KBHit ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    c  =  kb . getch ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  c == ' m ' :  # Move forward by 1m 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( SeekRelativeTime ( 60 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == ' M ' :  # Move backward by 1m 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( SeekRelativeTime ( - 60 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == ' s ' :  # Move forward by 10s 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( SeekRelativeTime ( 10 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == ' S ' :  # Move backward by 10s 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( SeekRelativeTime ( - 10 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == ' G ' :  # Move backward by 10s 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( SeekAbsoluteTime ( 0. ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == " \x20 " :  # Space bar. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      q . send_pyobj ( TogglePause ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    elif  c == " \n " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        seek_time_input  =  raw_input ( ' time:  ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        seek_time  =  absolute_time_str ( seek_time_input ,  route_start_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        q . send_pyobj ( SeekAbsoluteTime ( seek_time ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  Exception  as  e : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        print ( " Time not understood:  {} " . format ( e ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_arg_parser ( ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser  =  argparse . ArgumentParser ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    description = " Mock openpilot components by publishing logged messages. " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    formatter_class = argparse . ArgumentDefaultsHelpFormatter ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( " route_name " ,  type = ( lambda  x :  x . replace ( " # " ,  " | " ) ) ,  nargs = " ? " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                      help = " The route whose messages will be published. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( " data_dir " ,  nargs = ' ? ' ,  default = os . getenv ( ' UNLOGGER_DATA_DIR ' ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										      help = " Path to directory in which log and camera files are located. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( " --no-loop " ,  action = " store_true " ,  help = " Stop at the end of the replay. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  key_value_pair  =  lambda  x :  x . split ( " = " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( " address_mapping " ,  nargs = " * " ,  type = key_value_pair , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      help = " Pairs <service>=<zmq_addr> to publish <service> on <zmq_addr>. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  comma_list  =  lambda  x :  x . split ( " , " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  to_mock_group  =  parser . add_mutually_exclusive_group ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  to_mock_group . add_argument ( " --min " ,  action = " store_true " ,  default = os . getenv ( " MIN " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  to_mock_group . add_argument ( " --enabled " ,  default = os . getenv ( " ENABLED " ) ,  type = comma_list ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( " --disabled " ,  type = comma_list ,  default = os . getenv ( " DISABLED " )  or  ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " --tl " ,  dest = " publish_time_length " ,  type = float ,  default = None , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    help = " Length of interval in event time for which messages should be published. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " --no-realtime " ,  dest = " realtime " ,  action = " store_false " ,  default = True , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    help = " Publish messages as quickly as possible instead of realtime. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " --no-interactive " ,  dest = " interactive " ,  action = " store_false " ,  default = True , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    help = " Disable interactivity. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  parser . add_argument ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " --bind-early " ,  action = " store_true " ,  default = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    help = " Bind early to avoid dropping messages. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  parser 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  main ( argv ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  args  =  get_arg_parser ( ) . parse_args ( sys . argv [ 1 : ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  command_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  forward_commands_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  data_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  address_mapping  =  _get_address_mapping ( args ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  command_sock  =  zmq . Context . instance ( ) . socket ( zmq . PUSH ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  command_sock . connect ( command_address ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  args . route_name  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    route_name_split  =  args . route_name . split ( " | " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( route_name_split )  >  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      route_start_time  =  timestamp_to_s ( route_name_split [ 1 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      route_start_time  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    command_sock . send_pyobj ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      SetRoute ( args . route_name ,  0 ,  args . data_dir ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " waiting for external command... " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    route_start_time  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  subprocesses  =  { } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    subprocesses [ " data " ]  =  Process ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      target = UnloggerWorker ( ) . run , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      args = ( forward_commands_address ,  data_address ,  address_mapping . copy ( ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    subprocesses [ " control " ]  =  Process ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      target = unlogger_thread , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      args = ( command_address ,  forward_commands_address ,  data_address ,  args . realtime , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            _get_address_mapping ( args ) ,  args . publish_time_length ,  args . bind_early ,  args . no_loop ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  p  in  subprocesses . values ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      p . daemon  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    subprocesses [ " data " ] . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    subprocesses [ " control " ] . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Exit if any of the children die. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  exit_if_children_dead ( * _ ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  name ,  p  in  subprocesses . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  not  p . is_alive ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          [ p . terminate ( )  for  p  in  subprocesses . values ( ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          exit ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      signal . signal ( signal . SIGCHLD ,  signal . SIGIGN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    signal . signal ( signal . SIGCHLD ,  exit_if_children_dead ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  args . interactive : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      keyboard_controller_thread ( command_sock ,  route_start_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Wait forever for children. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      while  True : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        time . sleep ( 10000. ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  p  in  subprocesses . values ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  p . is_alive ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          p . join ( 3. ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  TimeoutError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          p . terminate ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  sys . exit ( main ( sys . argv [ 1 : ] ) )