#!/usr/bin/env python3 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  argparse 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  os 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  sys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  zmq 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  time 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  signal 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  uuid  import  uuid4 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  collections  import  namedtuple 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  collections  import  deque 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  multiprocessing  import  Process ,  TimeoutError 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  datetime  import  datetime 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
# Strategy 1: a script to copy the files.
# Strategy 2: build pip packages around these.
# This could be its own pip package, which we'd need to build and release.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal  import  log  as  capnp_log 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal . services  import  service_list 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal . messaging  import  pub_sock ,  MultiplePublishersError 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  common  import  realtime 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . kbhit  import  KBHit 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . logreader  import  MultiLogIterator 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . route  import  Route 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . route_framereader  import  RouteFrameReader 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# Commands. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SetRoute  =  namedtuple ( " SetRoute " ,  ( " name " ,  " start_time " ,  " data_dir " ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SeekAbsoluteTime  =  namedtuple ( " SeekAbsoluteTime " ,  ( " secs " , ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SeekRelativeTime  =  namedtuple ( " SeekRelativeTime " ,  ( " secs " , ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								TogglePause  =  namedtuple ( " TogglePause " ,  ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								StopAndQuit  =  namedtuple ( " StopAndQuit " ,  ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								class  UnloggerWorker ( object ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  __init__ ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _frame_reader  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _cookie  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _readahead  =  deque ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  run ( self ,  commands_address ,  data_address ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    zmq . Context . _instance  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    commands_socket  =  zmq . Context . instance ( ) . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    commands_socket . connect ( commands_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_socket  =  zmq . Context . instance ( ) . socket ( zmq . PUSH ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_socket . connect ( data_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    poller  =  zmq . Poller ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    poller . register ( commands_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # We can't publish frames without encodeIdx, so add when it's missing. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  " frame "  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      pub_types [ " encodeIdx " ]  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_OBJECTS | gc.DEBUG_STATS | gc.DEBUG_SAVEALL | 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # gc.DEBUG_UNCOLLECTABLE) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # TODO: WARNING pycapnp leaks memory all over the place after unlogger runs for a while, gc 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # pauses become huge because there are so many tracked objects solution will be to switch to new 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # cython capnp 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      while  True : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        while  poller . poll ( 0. )  or  route  is  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          cookie ,  cmd  =  commands_socket . recv_pyobj ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								          route  =  self . _process_commands ( cmd ,  route ,  pub_types ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # **** get message **** 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _read_logs ( cookie ,  pub_types ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _send_logs ( data_socket ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    finally : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  self . _frame_reader  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      commands_socket . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  _read_logs ( self ,  cookie ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    fullHEVC  =  capnp_log . EncodeIndex . Type . fullHEVC 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    lr  =  self . _lr 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    while  len ( self . _readahead )  <  1000 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route_time  =  lr . tell ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg  =  next ( lr ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ  =  msg . which ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  not  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # **** special case certain message types **** 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  ==  " encodeIdx "  and  msg . encodeIdx . type  ==  fullHEVC : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # this assumes the encodeIdx always comes before the frame 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_id_lookup [ 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          msg . encodeIdx . frameId ]  =  msg . encodeIdx . segmentNum ,  msg . encodeIdx . segmentId 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        #print "encode", msg.encodeIdx.frameId, len(self._readahead), route_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . _readahead . appendleft ( ( typ ,  msg ,  route_time ,  cookie ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  _send_logs ( self ,  data_socket ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    while  len ( self . _readahead )  >  500 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ ,  msg ,  route_time ,  cookie  =  self . _readahead . pop ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      smsg  =  msg . as_builder ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  ==  " frame " : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        frame_id  =  msg . frame . frameId 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Frame exists, make sure we have a framereader. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # load the frame readers as needed 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        s1  =  time . time ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        img  =  self . _frame_reader . get ( frame_id ,  pix_fmt = " rgb24 " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        fr_time  =  time . time ( )  -  s1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  fr_time  >  0.05 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          print ( " FRAME( %d ) LAG --  %.2f  ms "  %  ( frame_id ,  fr_time * 1000.0 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  img  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          img  =  img [ : ,  : ,  : : - 1 ]  # Convert RGB to BGR, which is what the camera outputs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          img  =  img . flatten ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          smsg . frame . image  =  img . tobytes ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . send_pyobj ( ( cookie ,  typ ,  msg . logMonoTime ,  route_time ) ,  flags = zmq . SNDMORE ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . send ( smsg . to_bytes ( ) ,  copy = False ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  _process_commands ( self ,  cmd ,  route ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    seek_to  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  route  is  None  or  ( isinstance ( cmd ,  SetRoute )  and  route . name  !=  cmd . name ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  cmd . start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route  =  Route ( cmd . name ,  cmd . data_dir ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . _lr  =  MultiLogIterator ( route . log_paths ( ) ,  wraparound = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  self . _frame_reader  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      if  " frame "  in  pub_types  or  " encodeIdx "  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # reset frames for a route 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_id_lookup  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader  =  RouteFrameReader ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          route . camera_paths ( ) ,  None ,  self . _frame_id_lookup ,  readahead = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # always reset this on a seek 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  isinstance ( cmd ,  SeekRelativeTime ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  self . _lr . tell ( )  +  cmd . secs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  isinstance ( cmd ,  SeekAbsoluteTime ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  cmd . secs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  isinstance ( cmd ,  StopAndQuit ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      exit ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  seek_to  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      print ( " seeking " ,  seek_to ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  not  self . _lr . seek ( seek_to ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        print ( " Can ' t seek: time out of bounds " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        next ( self . _lr )    # ignore one 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  route 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  _get_address_send_func ( address ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  sock  =  pub_sock ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  sock . send 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  unlogger_thread ( command_address ,  forward_commands_address ,  data_address ,  run_realtime , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								                    address_mapping ,  publish_time_length ,  bind_early ,  no_loop ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # Clear context to avoid problems with multiprocessing. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  zmq . Context . _instance  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  context  =  zmq . Context . instance ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock  =  context . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock . bind ( command_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  forward_commands_socket  =  context . socket ( zmq . PUSH ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  forward_commands_socket . bind ( forward_commands_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket  =  context . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket . bind ( data_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # Set readahead to a reasonable number. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket . setsockopt ( zmq . RCVHWM ,  10000 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller  =  zmq . Poller ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller . register ( command_sock ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller . register ( data_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  bind_early : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    send_funcs  =  { 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ :  _get_address_send_func ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      for  typ ,  address  in  address_mapping . items ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Give subscribers a chance to connect. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    time . sleep ( 0.1 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    send_funcs  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  start_time  =  float ( " inf " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  printed_at  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  generation  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  paused  =  False 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  prev_msg_time  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  while  True : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    evts  =  dict ( poller . poll ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  command_sock  in  evts : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      cmd  =  command_sock . recv_pyobj ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  isinstance ( cmd ,  TogglePause ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        paused  =  not  paused 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  paused : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          poller . modify ( data_socket ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          poller . modify ( data_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Forward the command to the log data thread. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # TODO: Remove everything on data_socket. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  cmd ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  isinstance ( cmd ,  StopAndQuit ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  data_socket  in  evts : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_generation ,  typ ,  msg_time ,  route_time  =  data_socket . recv_pyobj ( flags = zmq . RCVMORE ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_bytes  =  data_socket . recv ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  msg_generation  <  generation : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Skip packets. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  no_loop  and  prev_msg_time  is  not  None  and  prev_msg_time  >  msg_time  +  1e9 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      prev_msg_time  =  msg_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_time_seconds  =  msg_time  *  1e-9 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  reset_time : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        msg_start_time  =  msg_time_seconds 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        real_start_time  =  realtime . sec_since_boot ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        start_time  =  min ( start_time ,  msg_start_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        reset_time  =  False 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  publish_time_length  and  msg_time_seconds  -  start_time  >  publish_time_length : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Print time. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  abs ( printed_at  -  route_time )  >  5. : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        print ( " at " ,  route_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        printed_at  =  route_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  not  in  send_funcs : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  typ  in  address_mapping : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Remove so we don't keep printing warnings. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          address  =  address_mapping . pop ( typ ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " binding " ,  typ ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            send_funcs [ typ ]  =  _get_address_send_func ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          except  Exception  as  e : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " couldn ' t replay  {} :  {} " . format ( typ ,  e ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Skip messages that we are not registered to publish. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Sleep as needed for real time playback. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  run_realtime : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        msg_time_offset  =  msg_time_seconds  -  msg_start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        real_time_offset  =  realtime . sec_since_boot ( )  -  real_start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        lag  =  msg_time_offset  -  real_time_offset 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  lag  >  0  and  lag  <  30 :  # a large jump is OK, likely due to an out of order segment 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          if  lag  >  1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " sleeping for " ,  lag ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          time . sleep ( lag ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        elif  lag  <  - 1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Relax the real time schedule when we slip far behind. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Send message. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        send_funcs [ typ ] ( msg_bytes ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      except  MultiplePublishersError : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        del  send_funcs [ typ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def timestamp_to_s(tss):
								  """Convert a ``YYYY-MM-DD--HH-MM-SS`` timestamp string to epoch seconds.

								  Args:
								    tss: Timestamp string, e.g. ``"2018-06-01--12-30-45"``.

								  Returns:
								    Seconds since the epoch as a float. The timestamp is parsed as a naive
								    datetime and converted with ``time.mktime``, i.e. it is interpreted in
								    the local timezone.

								  Raises:
								    ValueError: If ``tss`` does not match the expected format.
								  """
								  # The format must not contain stray whitespace: "% Y" is not a valid
								  # strptime directive and would make every call fail.
								  return time.mktime(datetime.strptime(tss, '%Y-%m-%d--%H-%M-%S').timetuple())
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def absolute_time_str(s, start_time):
								  """Interpret ``s`` as a time offset in seconds.

								  Args:
								    s: Either something ``float()`` accepts, or a timestamp string
								      understood by ``timestamp_to_s``.
								    start_time: Value subtracted from the parsed timestamp when ``s`` is
								      not a plain number.

								  Returns:
								    ``float(s)`` when that conversion succeeds; otherwise
								    ``timestamp_to_s(s) - start_time``.

								  Raises:
								    Whatever ``timestamp_to_s`` raises when ``s`` is neither a number nor
								    a valid timestamp.
								  """
								  try:
								    # Plain numbers are returned as-is; start_time is not applied.
								    offset = float(s)
								  except ValueError:
								    # Not a number: fall back to parsing it as a timestamp.
								    offset = timestamp_to_s(s) - start_time
								  return offset
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def _get_address_mapping(args):
								  """Build the mapping of service name to publish address from parsed args.

								  The set of services is chosen in this order of precedence:
								    1. ``args.min`` is not None: a fixed, hard-coded minimal list.
								    2. ``args.enabled`` is not None: exactly the services listed there.
								    3. Otherwise: every key of the module-level ``service_list``.

								  Each chosen service initially maps to its own name. Explicit
								  ``<service>=<address>`` pairs from ``args.address_mapping`` then override
								  or extend that mapping, and finally every name in ``args.disabled`` is
								  removed.

								  Args:
								    args: Parsed arguments exposing ``min``, ``enabled``,
								      ``address_mapping`` (iterable of 2-item pairs) and ``disabled``
								      (iterable of service names).

								  Returns:
								    A dict mapping service name to address.
								  """
								  if args.min is not None:
								    # Names must match the keys of service_list exactly (no padding).
								    services_to_mock = [
								      'thermal', 'can', 'health', 'sensorEvents', 'gpsNMEA', 'frame', 'encodeIdx',
								      'model', 'features', 'liveLocation', 'gpsLocation',
								    ]
								  elif args.enabled is not None:
								    services_to_mock = args.enabled
								  else:
								    services_to_mock = service_list.keys()

								  address_mapping = {service_name: service_name for service_name in services_to_mock}
								  # Explicit pairs win over the name->name defaults.
								  address_mapping.update(dict(args.address_mapping))

								  # Disabling an unknown or already-absent service is not an error.
								  for k in args.disabled:
								    address_mapping.pop(k, None)

								  # Warn (but do not fail) about names that service_list does not know.
								  non_services = set(address_mapping) - set(service_list)
								  if non_services:
								    # Sorted so the warning text is deterministic between runs.
								    print("WARNING: Unknown services {}".format(sorted(non_services)))

								  return address_mapping
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def keyboard_controller_thread(q, route_start_time):
								  """Translate key presses into playback commands sent through ``q``.

								  Runs forever: reads one character at a time from ``KBHit().getch()`` and
								  sends the matching command object with ``q.send_pyobj``.

								  Key bindings:
								    ``m`` / ``M``: seek +60 s / -60 s relative to the current position.
								    ``s`` / ``S``: seek +10 s / -10 s relative to the current position.
								    ``G``: seek to absolute time 0.
								    space: toggle pause.
								    newline: prompt for a time (a float, or a timestamp accepted by
								      ``absolute_time_str``) and seek to it.

								  Args:
								    q: Object with a ``send_pyobj`` method used to deliver commands
								      (presumably a zmq socket; confirm against the caller).
								    route_start_time: Passed to ``absolute_time_str`` to turn an entered
								      timestamp into an offset.
								  """
								  # Relative seek distance in seconds for each single-character key.
								  relative_seeks = {
								    'm': 60,   # forward 1 minute
								    'M': -60,  # backward 1 minute
								    's': 10,   # forward 10 seconds
								    'S': -10,  # backward 10 seconds
								  }

								  print("keyboard waiting for input")
								  kb = KBHit()
								  while True:
								    c = kb.getch()
								    if c in relative_seeks:
								      q.send_pyobj(SeekRelativeTime(relative_seeks[c]))
								    elif c == 'G':  # Seek to absolute time 0.
								      q.send_pyobj(SeekAbsoluteTime(0.))
								    elif c == "\x20":  # Space bar.
								      q.send_pyobj(TogglePause())
								    elif c == "\n":
								      # Best effort: any failure while reading, parsing or sending the
								      # requested time is reported and the loop keeps running.
								      try:
								        seek_time_input = input('time: ')
								        seek_time = absolute_time_str(seek_time_input, route_start_time)

								        q.send_pyobj(SeekAbsoluteTime(seek_time))
								      except Exception as e:
								        print("Time not understood: {}".format(e))
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def get_arg_parser():
								  """Build the command-line argument parser for this script.

								  Positional arguments (all optional):
								    route_name: route to publish; every ``#`` is replaced by ``|``.
								    data_dir: directory with log and camera files; defaults to the
								      ``UNLOGGER_DATA_DIR`` environment variable.
								    address_mapping: zero or more ``<service>=<zmq_addr>`` pairs, each
								      parsed into a ``[service, zmq_addr]`` list.

								  Options:
								    --no-loop: stop at the end of the replay.
								    --min / --enabled: mutually exclusive service selection; defaults come
								      from the ``MIN`` / ``ENABLED`` environment variables.
								    --disabled: comma-separated services to drop; defaults to the
								      ``DISABLED`` environment variable, or an empty tuple.
								    --tl: stored as ``publish_time_length`` (float).
								    --no-realtime: sets ``realtime`` to False (default True).
								    --no-interactive: sets ``interactive`` to False (default True).
								    --bind-early: sets ``bind_early`` to True (default False).

								  Environment variables are read when this function is called, not when
								  arguments are parsed.

								  Returns:
								    The configured ``argparse.ArgumentParser``.
								  """
								  def key_value_pair(x):
								    # Split on the first "=" only, so the address part may contain "=".
								    key, sep, value = x.partition("=")
								    if not sep:
								      raise argparse.ArgumentTypeError(
								        "expected <service>=<zmq_addr>, got {!r}".format(x))
								    return [key, value]

								  def comma_list(x):
								    return x.split(",")

								  parser = argparse.ArgumentParser(
								    description="Mock openpilot components by publishing logged messages.",
								    formatter_class=argparse.ArgumentDefaultsHelpFormatter)

								  parser.add_argument("route_name", type=(lambda x: x.replace("#", "|")), nargs="?",
								                      help="The route whose messages will be published.")
								  parser.add_argument("data_dir", nargs='?', default=os.getenv('UNLOGGER_DATA_DIR'),
								                      help="Path to directory in which log and camera files are located.")

								  parser.add_argument("--no-loop", action="store_true", help="Stop at the end of the replay.")

								  parser.add_argument("address_mapping", nargs="*", type=key_value_pair,
								                      help="Pairs <service>=<zmq_addr> to publish <service> on <zmq_addr>.")

								  to_mock_group = parser.add_mutually_exclusive_group()
								  to_mock_group.add_argument("--min", action="store_true", default=os.getenv("MIN"))
								  # argparse runs string defaults through `type`, so an ENABLED/DISABLED
								  # environment value is split on commas just like a command-line value.
								  to_mock_group.add_argument("--enabled", default=os.getenv("ENABLED"), type=comma_list)

								  parser.add_argument("--disabled", type=comma_list, default=os.getenv("DISABLED") or ())

								  parser.add_argument(
								    "--tl", dest="publish_time_length", type=float, default=None,
								    help="Length of interval in event time for which messages should be published.")

								  parser.add_argument(
								    "--no-realtime", dest="realtime", action="store_false", default=True,
								    help="Publish messages as quickly as possible instead of realtime.")

								  parser.add_argument(
								    "--no-interactive", dest="interactive", action="store_false", default=True,
								    help="Disable interactivity.")

								  parser.add_argument(
								    "--bind-early", action="store_true", default=False,
								    help="Bind early to avoid dropping messages.")

								  return parser
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def main(argv):
								  """Start the unlogger child processes and supervise them.

								  Two daemon child processes are created, one running
								  `UnloggerWorker().run` and one running `unlogger_thread`. They are
								  wired together through freshly generated ipc:// addresses. If a
								  route name was given on the command line, an initial `SetRoute`
								  command is pushed on the command socket. The parent then either
								  runs the interactive keyboard controller or sleeps until a child
								  exits.

								  Args:
								    argv: Command-line arguments, without the program name.

								  Returns:
								    0 when the supervising loop ends without an exception. When a
								    child dies, the SIGCHLD handler raises SystemExit instead.
								  """
								  # Parse the arguments that were actually passed in, not sys.argv.
								  args = get_arg_parser().parse_args(argv)

								  command_address = " ipc:///tmp/ {} ".format(uuid4())
								  forward_commands_address = " ipc:///tmp/ {} ".format(uuid4())
								  data_address = " ipc:///tmp/ {} ".format(uuid4())

								  address_mapping = _get_address_mapping(args)

								  command_sock = zmq.Context.instance().socket(zmq.PUSH)
								  command_sock.connect(command_address)

								  route_start_time = 0
								  if args.route_name is not None:
								    # NOTE(review): the separator literal is kept exactly as found,
								    # padding spaces included -- confirm it matches the route-name
								    # format.
								    route_name_split = args.route_name.split(" | ")
								    if len(route_name_split) > 1:
								      route_start_time = timestamp_to_s(route_name_split[1])
								    command_sock.send_pyobj(
								      SetRoute(args.route_name, 0, args.data_dir))
								  else:
								    print(" waiting for external command... ")

								  subprocesses = {}
								  try:
								    subprocesses[" data "] = Process(
								      target=UnloggerWorker().run,
								      args=(forward_commands_address, data_address, address_mapping.copy()))

								    subprocesses[" control "] = Process(
								      target=unlogger_thread,
								      args=(command_address, forward_commands_address, data_address,
								            args.realtime, _get_address_mapping(args),
								            args.publish_time_length, args.bind_early, args.no_loop))

								    for p in subprocesses.values():
								      p.daemon = True

								    subprocesses[" data "].start()
								    subprocesses[" control "].start()

								    # Exit if any of the children die.
								    def exit_if_children_dead(*_):
								      if all(child.is_alive() for child in subprocesses.values()):
								        # SIGCHLD that was not caused by one of our children dying:
								        # keep watching.
								        return
								      # Terminating the survivors produces more SIGCHLDs; restore the
								      # default disposition first so this handler is not re-entered.
								      # SIG_DFL is used rather than SIG_IGN because ignoring SIGCHLD
								      # makes the kernel reap children automatically, which would
								      # disturb multiprocessing's own wait handling.
								      signal.signal(signal.SIGCHLD, signal.SIG_DFL)
								      for child in subprocesses.values():
								        child.terminate()
								      sys.exit()
								    signal.signal(signal.SIGCHLD, exit_if_children_dead)

								    if args.interactive:
								      keyboard_controller_thread(command_sock, route_start_time)
								    else:
								      # Wait forever for children.
								      while True:
								        time.sleep(10000.)
								  finally:
								    for p in subprocesses.values():
								      if p.is_alive():
								        # Process.join() returns silently on timeout (it never raises
								        # TimeoutError), so check liveness again to decide whether
								        # the child has to be terminated.
								        p.join(3.)
								        if p.is_alive():
								          p.terminate()

								  return 0
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# Script entry point: run main() with the command-line arguments (minus
								# the program name) and use its return value as the process exit status.
								# The guard literal must be exactly "__main__"; a padded literal never
								# equals __name__, so main() would never run.
								if __name__ == "__main__":
								  sys.exit(main(sys.argv[1:]))