@ -268,6 +268,19 @@ class ProcessContainer:
self . prefix . clean_dirs ( )
self . prefix . clean_dirs ( )
self . _clean_env ( )
self . _clean_env ( )
def get_output_msgs ( self , start_time : int ) :
assert self . rc and self . sockets
output_msgs = [ ]
self . rc . wait_for_recv_called ( )
for socket in self . sockets :
ms = messaging . drain_sock ( socket )
for m in ms :
m = m . as_builder ( )
m . logMonoTime = start_time + int ( self . cfg . processing_time * 1e9 )
output_msgs . append ( m . as_reader ( ) )
return output_msgs
def run_step ( self , msg : capnp . _DynamicStructReader , frs : dict [ str , FrameReader ] | None ) - > list [ capnp . _DynamicStructReader ] :
def run_step ( self , msg : capnp . _DynamicStructReader , frs : dict [ str , FrameReader ] | None ) - > list [ capnp . _DynamicStructReader ] :
assert self . rc and self . pm and self . sockets and self . process . proc
assert self . rc and self . pm and self . sockets and self . process . proc
@ -279,18 +292,19 @@ class ProcessContainer:
self . msg_queue . append ( msg )
self . msg_queue . append ( msg )
if end_of_cycle :
if end_of_cycle :
self . rc . wait_for_recv_called ( )
# call recv to let sub-sockets reconnect, after we know the process is ready
# call recv to let sub-sockets reconnect, after we know the process is ready
if self . cnt == 0 :
if self . cnt == 0 :
for s in self . sockets :
for s in self . sockets :
messaging . recv_one_or_none ( s )
messaging . recv_one_or_none ( s )
# empty recv on drained pub indicates the end of messages, only do that if there're any
# certain processes use drain_sock. need to cause empty recv to break from this loop
trigger_empty_recv = False
trigger_empty_recv = False
if self . cfg . main_pub and self . cfg . main_pub_drained :
if self . cfg . main_pub and self . cfg . main_pub_drained :
trigger_empty_recv = next ( ( True for m in self . msg_queue if m . which ( ) == self . cfg . main_pub ) , False )
trigger_empty_recv = next ( ( True for m in self . msg_queue if m . which ( ) == self . cfg . main_pub ) , False )
# get output msgs from previous inputs
output_msgs = self . get_output_msgs ( msg . logMonoTime )
for m in self . msg_queue :
for m in self . msg_queue :
self . pm . send ( m . which ( ) , m . as_builder ( ) )
self . pm . send ( m . which ( ) , m . as_builder ( ) )
# send frames if needed
# send frames if needed
@ -304,14 +318,8 @@ class ProcessContainer:
self . msg_queue = [ ]
self . msg_queue = [ ]
self . rc . unlock_sockets ( )
self . rc . unlock_sockets ( )
self . rc . wait_for_next_recv ( trigger_empty_recv )
if trigger_empty_recv :
self . rc . unlock_sockets ( )
for socket in self . sockets :
ms = messaging . drain_sock ( socket )
for m in ms :
m = m . as_builder ( )
m . logMonoTime = msg . logMonoTime + int ( self . cfg . processing_time * 1e9 )
output_msgs . append ( m . as_reader ( ) )
self . cnt + = 1
self . cnt + = 1
assert self . process . proc . is_alive ( )
assert self . process . proc . is_alive ( )
@ -740,6 +748,11 @@ def _replay_multi_process(
internal_pub_queue . append ( m )
internal_pub_queue . append ( m )
heapq . heappush ( internal_pub_index_heap , ( m . logMonoTime , len ( internal_pub_queue ) - 1 ) )
heapq . heappush ( internal_pub_index_heap , ( m . logMonoTime , len ( internal_pub_queue ) - 1 ) )
log_msgs . extend ( output_msgs )
log_msgs . extend ( output_msgs )
# flush last set of messages from each process
for container in containers :
last_time = log_msgs [ - 1 ] . logMonoTime if len ( log_msgs ) > 0 else int ( time . monotonic ( ) * 1e9 )
log_msgs . extend ( container . get_output_msgs ( last_time ) )
finally :
finally :
for container in containers :
for container in containers :
container . stop ( )
container . stop ( )