#!/usr/bin/env python3 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  argparse 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  os 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  sys 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  zmq 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  time 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  signal 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								import  multiprocessing 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  uuid  import  uuid4 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  collections  import  namedtuple 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  collections  import  deque 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  datetime  import  datetime 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# strat 1: script to copy files 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# strat 2: build pip packages around these 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# could be its own pip package, which we'd need to build and release 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal  import  log  as  capnp_log 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal . services  import  service_list 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  cereal . messaging  import  pub_sock ,  MultiplePublishersError 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  common  import  realtime 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . kbhit  import  KBHit 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . logreader  import  MultiLogIterator 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . route  import  Route 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  tools . lib . route_framereader  import  RouteFrameReader 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# Commands. 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SetRoute  =  namedtuple ( " SetRoute " ,  ( " name " ,  " start_time " ,  " data_dir " ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SeekAbsoluteTime  =  namedtuple ( " SeekAbsoluteTime " ,  ( " secs " , ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								SeekRelativeTime  =  namedtuple ( " SeekRelativeTime " ,  ( " secs " , ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								TogglePause  =  namedtuple ( " TogglePause " ,  ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								StopAndQuit  =  namedtuple ( " StopAndQuit " ,  ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								class  UnloggerWorker ( object ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  __init__ ( self ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _frame_reader  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _cookie  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    self . _readahead  =  deque ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  run ( self ,  commands_address ,  data_address ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    zmq . Context . _instance  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    commands_socket  =  zmq . Context . instance ( ) . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    commands_socket . connect ( commands_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_socket  =  zmq . Context . instance ( ) . socket ( zmq . PUSH ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    data_socket . connect ( data_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    poller  =  zmq . Poller ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    poller . register ( commands_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # We can't publish frames without encodeIdx, so add when it's missing. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  " frame "  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      pub_types [ " encodeIdx " ]  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_OBJECTS | gc.DEBUG_STATS | gc.DEBUG_SAVEALL | 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # gc.DEBUG_UNCOLLECTABLE) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # TODO: WARNING pycapnp leaks memory all over the place after unlogger runs for a while, gc 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # pauses become huge because there are so many tracked objects solution will be to switch to new 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # cython capnp 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      while  True : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        while  poller . poll ( 0. )  or  route  is  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          cookie ,  cmd  =  commands_socket . recv_pyobj ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								          route  =  self . _process_commands ( cmd ,  route ,  pub_types ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # **** get message **** 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _read_logs ( cookie ,  pub_types ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _send_logs ( data_socket ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    finally : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  self . _frame_reader  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      commands_socket . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  _read_logs ( self ,  cookie ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    fullHEVC  =  capnp_log . EncodeIndex . Type . fullHEVC 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    lr  =  self . _lr 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    while  len ( self . _readahead )  <  1000 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route_time  =  lr . tell ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg  =  next ( lr ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ  =  msg . which ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  not  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # **** special case certain message types **** 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  ==  " encodeIdx "  and  msg . encodeIdx . type  ==  fullHEVC : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # this assumes the encodeIdx always comes before the frame 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_id_lookup [ 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          msg . encodeIdx . frameId ]  =  msg . encodeIdx . segmentNum ,  msg . encodeIdx . segmentId 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        #print "encode", msg.encodeIdx.frameId, len(self._readahead), route_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . _readahead . appendleft ( ( typ ,  msg ,  route_time ,  cookie ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  _send_logs ( self ,  data_socket ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    while  len ( self . _readahead )  >  500 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ ,  msg ,  route_time ,  cookie  =  self . _readahead . pop ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      smsg  =  msg . as_builder ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  ==  " frame " : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        frame_id  =  msg . frame . frameId 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Frame exists, make sure we have a framereader. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # load the frame readers as needed 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        s1  =  time . time ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        img  =  self . _frame_reader . get ( frame_id ,  pix_fmt = " rgb24 " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        fr_time  =  time . time ( )  -  s1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  fr_time  >  0.05 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          print ( " FRAME( %d ) LAG --  %.2f  ms "  %  ( frame_id ,  fr_time * 1000.0 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  img  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								          img  =  img [ : ,  : ,  : : - 1 ]   # Convert RGB to BGR, which is what the camera outputs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          img  =  img . flatten ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          smsg . frame . image  =  img . tobytes ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . send_pyobj ( ( cookie ,  typ ,  msg . logMonoTime ,  route_time ) ,  flags = zmq . SNDMORE ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      data_socket . send ( smsg . to_bytes ( ) ,  copy = False ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  _process_commands ( self ,  cmd ,  route ,  pub_types ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    seek_to  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  route  is  None  or  ( isinstance ( cmd ,  SetRoute )  and  route . name  !=  cmd . name ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  cmd . start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route  =  Route ( cmd . name ,  cmd . data_dir ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      self . _lr  =  MultiLogIterator ( route . log_paths ( ) ,  wraparound = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  self . _frame_reader  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader . close ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      if  " frame "  in  pub_types  or  " encodeIdx "  in  pub_types : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # reset frames for a route 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_id_lookup  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        self . _frame_reader  =  RouteFrameReader ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          route . camera_paths ( ) ,  None ,  self . _frame_id_lookup ,  readahead = True ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # always reset this on a seek 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  isinstance ( cmd ,  SeekRelativeTime ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  self . _lr . tell ( )  +  cmd . secs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  isinstance ( cmd ,  SeekAbsoluteTime ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      seek_to  =  cmd . secs 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  isinstance ( cmd ,  StopAndQuit ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      exit ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  seek_to  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      print ( " seeking " ,  seek_to ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  not  self . _lr . seek ( seek_to ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        print ( " Can ' t seek: time out of bounds " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        next ( self . _lr )    # ignore one 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  route 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  _get_address_send_func ( address ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  sock  =  pub_sock ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  sock . send 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  unlogger_thread ( command_address ,  forward_commands_address ,  data_address ,  run_realtime , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								                    address_mapping ,  publish_time_length ,  bind_early ,  no_loop ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # Clear context to avoid problems with multiprocessing. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  zmq . Context . _instance  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  context  =  zmq . Context . instance ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock  =  context . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock . bind ( command_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  forward_commands_socket  =  context . socket ( zmq . PUSH ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  forward_commands_socket . bind ( forward_commands_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket  =  context . socket ( zmq . PULL ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket . bind ( data_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  # Set readahead to a reasonable number. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_socket . setsockopt ( zmq . RCVHWM ,  10000 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller  =  zmq . Poller ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller . register ( command_sock ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  poller . register ( data_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  bind_early : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    send_funcs  =  { 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      typ :  _get_address_send_func ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      for  typ ,  address  in  address_mapping . items ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Give subscribers a chance to connect. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    time . sleep ( 0.1 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    send_funcs  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  start_time  =  float ( " inf " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  printed_at  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  generation  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  paused  =  False 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  prev_msg_time  =  None 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  while  True : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    evts  =  dict ( poller . poll ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  command_sock  in  evts : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      cmd  =  command_sock . recv_pyobj ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  isinstance ( cmd ,  TogglePause ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        paused  =  not  paused 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  paused : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          poller . modify ( data_socket ,  0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          poller . modify ( data_socket ,  zmq . POLLIN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Forward the command the the log data thread. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # TODO: Remove everything on data_socket. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  cmd ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  isinstance ( cmd ,  StopAndQuit ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    elif  data_socket  in  evts : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_generation ,  typ ,  msg_time ,  route_time  =  data_socket . recv_pyobj ( flags = zmq . RCVMORE ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_bytes  =  data_socket . recv ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  msg_generation  <  generation : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        # Skip packets. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  no_loop  and  prev_msg_time  is  not  None  and  prev_msg_time  >  msg_time  +  1e9 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      prev_msg_time  =  msg_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      msg_time_seconds  =  msg_time  *  1e-9 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  reset_time : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        msg_start_time  =  msg_time_seconds 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        real_start_time  =  realtime . sec_since_boot ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        start_time  =  min ( start_time ,  msg_start_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        reset_time  =  False 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  publish_time_length  and  msg_time_seconds  -  start_time  >  publish_time_length : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        generation  + =  1 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        forward_commands_socket . send_pyobj ( ( generation ,  StopAndQuit ( ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        return 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Print time. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  abs ( printed_at  -  route_time )  >  5. : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        print ( " at " ,  route_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        printed_at  =  route_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  typ  not  in  send_funcs : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  typ  in  address_mapping : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Remove so we don't keep printing warnings. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          address  =  address_mapping . pop ( typ ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " binding " ,  typ ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            send_funcs [ typ ]  =  _get_address_send_func ( address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          except  Exception  as  e : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " couldn ' t replay  {} :  {} " . format ( typ ,  e ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Skip messages that we are not registered to publish. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          continue 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Sleep as needed for real time playback. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  run_realtime : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        msg_time_offset  =  msg_time_seconds  -  msg_start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        real_time_offset  =  realtime . sec_since_boot ( )  -  real_start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        lag  =  msg_time_offset  -  real_time_offset 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								        if  lag  >  0  and  lag  <  30 :   # a large jump is OK, likely due to an out of order segment 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          if  lag  >  1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            print ( " sleeping for " ,  lag ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          time . sleep ( lag ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        elif  lag  <  - 1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          # Relax the real time schedule when we slip far behind. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          reset_time  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Send message. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        send_funcs [ typ ] ( msg_bytes ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      except  MultiplePublishersError : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        del  send_funcs [ typ ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  timestamp_to_s ( tss ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  time . mktime ( datetime . strptime ( tss ,  ' % Y- % m- %d -- % H- % M- % S ' ) . timetuple ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  absolute_time_str ( s ,  start_time ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # first try if it's a float 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  float ( s ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  except  ValueError : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # now see if it's a timestamp 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  timestamp_to_s ( s )  -  start_time 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  _get_address_mapping ( args ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  args . min  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    services_to_mock  =  [ 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      ' thermal ' ,  ' can ' ,  ' health ' ,  ' sensorEvents ' ,  ' gpsNMEA ' ,  ' frame ' ,  ' encodeIdx ' , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      ' model ' ,  ' features ' ,  ' liveLocation ' ,  ' gpsLocation ' 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  elif  args . enabled  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    services_to_mock  =  args . enabled 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    services_to_mock  =  service_list . keys ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  address_mapping  =  { service_name :  service_name  for  service_name  in  services_to_mock } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  address_mapping . update ( dict ( args . address_mapping ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  for  k  in  args . disabled : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    address_mapping . pop ( k ,  None ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  non_services  =  set ( address_mapping )  -  set ( service_list ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  non_services : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    print ( " WARNING: Unknown services  {} " . format ( list ( non_services ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  address_mapping 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  keyboard_controller_thread ( q ,  route_start_time ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print ( " keyboard waiting for input " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  kb  =  KBHit ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  while  1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    c  =  kb . getch ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    if  c  ==  ' m ' :   # Move forward by 1m 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( SeekRelativeTime ( 60 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  ' M ' :   # Move backward by 1m 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( SeekRelativeTime ( - 60 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  ' s ' :   # Move forward by 10s 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( SeekRelativeTime ( 10 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  ' S ' :   # Move backward by 10s 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( SeekRelativeTime ( - 10 ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  ' G ' :   # Move backward by 10s 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( SeekAbsoluteTime ( 0. ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  " \x20 " :   # Space bar. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      q . send_pyobj ( TogglePause ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    elif  c  ==  " \n " : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      try : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								        seek_time_input  =  input ( ' time:  ' ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        seek_time  =  absolute_time_str ( seek_time_input ,  route_start_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        q . send_pyobj ( SeekAbsoluteTime ( seek_time ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      except  Exception  as  e : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        print ( " Time not understood:  {} " . format ( e ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  get_arg_parser ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser  =  argparse . ArgumentParser ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    description = " Mock openpilot components by publishing logged messages. " , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    formatter_class = argparse . ArgumentDefaultsHelpFormatter ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " route_name " ,  type = ( lambda  x :  x . replace ( " # " ,  " | " ) ) ,  nargs = " ? " , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								                      help = " The route whose messages will be published. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " data_dir " ,  nargs = ' ? ' ,  default = os . getenv ( ' UNLOGGER_DATA_DIR ' ) , 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								          help = " Path to directory in which log and camera files are located. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " --no-loop " ,  action = " store_true " ,  help = " Stop at the end of the replay. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  key_value_pair ( x ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  x . split ( " = " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " address_mapping " ,  nargs = " * " ,  type = key_value_pair , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      help = " Pairs <service>=<zmq_addr> to publish <service> on <zmq_addr>. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  def  comma_list ( x ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  x . split ( " , " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  to_mock_group  =  parser . add_mutually_exclusive_group ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  to_mock_group . add_argument ( " --min " ,  action = " store_true " ,  default = os . getenv ( " MIN " ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  to_mock_group . add_argument ( " --enabled " ,  default = os . getenv ( " ENABLED " ) ,  type = comma_list ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( " --disabled " ,  type = comma_list ,  default = os . getenv ( " DISABLED " )  or  ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    " --tl " ,  dest = " publish_time_length " ,  type = float ,  default = None , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    help = " Length of interval in event time for which messages should be published. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    " --no-realtime " ,  dest = " realtime " ,  action = " store_false " ,  default = True , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    help = " Publish messages as quickly as possible instead of realtime. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    " --no-interactive " ,  dest = " interactive " ,  action = " store_false " ,  default = True , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    help = " Disable interactivity. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  parser . add_argument ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    " --bind-early " ,  action = " store_true " ,  default = False , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    help = " Bind early to avoid dropping messages. " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  parser 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  main ( argv ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  args  =  get_arg_parser ( ) . parse_args ( sys . argv [ 1 : ] ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  forward_commands_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  data_address  =  " ipc:///tmp/ {} " . format ( uuid4 ( ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  address_mapping  =  _get_address_mapping ( args ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock  =  zmq . Context . instance ( ) . socket ( zmq . PUSH ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  command_sock . connect ( command_address ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  if  args . route_name  is  not  None : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    route_name_split  =  args . route_name . split ( " | " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  len ( route_name_split )  >  1 : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route_start_time  =  timestamp_to_s ( route_name_split [ 1 ] ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      route_start_time  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    command_sock . send_pyobj ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      SetRoute ( args . route_name ,  0 ,  args . data_dir ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    print ( " waiting for external command... " ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    route_start_time  =  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  subprocesses  =  { } 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  try : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    subprocesses [ " data " ]  =  multiprocessing . Process ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      target = UnloggerWorker ( ) . run , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      args = ( forward_commands_address ,  data_address ,  address_mapping . copy ( ) ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								    subprocesses [ " control " ]  =  multiprocessing . Process ( 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      target = unlogger_thread , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      args = ( command_address ,  forward_commands_address ,  data_address ,  args . realtime , 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								            _get_address_mapping ( args ) ,  args . publish_time_length ,  args . bind_early ,  args . no_loop ) ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    subprocesses [ " data " ] . start ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    subprocesses [ " control " ] . start ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    # Exit if any of the children die. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    def  exit_if_children_dead ( * _ ) : 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      for  _ ,  p  in  subprocesses . items ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        if  not  p . is_alive ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          [ p . terminate ( )  for  p  in  subprocesses . values ( ) ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          exit ( ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								      signal . signal ( signal . SIGCHLD ,  signal . SIG_IGN ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    signal . signal ( signal . SIGCHLD ,  exit_if_children_dead ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    if  args . interactive : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      keyboard_controller_thread ( command_sock ,  route_start_time ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    else : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # Wait forever for children. 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      while  True : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        time . sleep ( 10000. ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  finally : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    for  p  in  subprocesses . values ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  p . is_alive ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        try : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          p . join ( 3. ) 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								        except  multiprocessing . TimeoutError : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          p . terminate ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								          continue 
 
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								  return  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								if  __name__  ==  " __main__ " : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  sys . exit ( main ( sys . argv [ 1 : ] ) )