@ -3,6 +3,7 @@
import os
import os
import usb1
import usb1
import time
import time
import json
import subprocess
import subprocess
from typing import List , NoReturn
from typing import List , NoReturn
from functools import cmp_to_key
from functools import cmp_to_key
@ -23,6 +24,48 @@ def get_expected_signature(panda: Panda) -> bytes:
cloudlog . exception ( " Error computing expected signature " )
cloudlog . exception ( " Error computing expected signature " )
return b " "
return b " "
def read_panda_logs ( panda : Panda ) - > None :
"""
Forward panda logs to the cloud
"""
params = Params ( )
serial = panda . get_usb_serial ( )
log_state = { }
try :
l = json . loads ( params . get ( " PandaLogState " ) )
for k , v in l . items ( ) :
if isinstance ( k , str ) and isinstance ( v , int ) :
log_state [ k ] = v
except ( TypeError , json . JSONDecodeError ) :
cloudlog . exception ( " failed to parse PandaLogState " )
try :
if serial in log_state :
logs = panda . get_logs ( last_id = log_state [ serial ] )
else :
logs = panda . get_logs ( get_all = True )
# truncate logs to 100 entries if needed
MAX_LOGS = 100
if len ( logs ) > MAX_LOGS :
cloudlog . warning ( f " Panda { serial } has { len ( logs ) } logs, truncating to { MAX_LOGS } " )
logs = logs [ - MAX_LOGS : ]
# update log state
if len ( logs ) > 0 :
log_state [ serial ] = logs [ - 1 ] [ " id " ]
for log in logs :
if log [ ' timestamp ' ] is not None :
log [ ' timestamp ' ] = log [ ' timestamp ' ] . isoformat ( )
cloudlog . event ( " panda_log " , * * log , serial = serial )
params . put ( " PandaLogState " , json . dumps ( log_state ) )
except Exception :
cloudlog . exception ( f " Error getting logs for panda { serial } " )
def flash_panda ( panda_serial : str ) - > Panda :
def flash_panda ( panda_serial : str ) - > Panda :
try :
try :
@ -47,7 +90,6 @@ def flash_panda(panda_serial: str) -> Panda:
if panda . bootstub :
if panda . bootstub :
bootstub_version = panda . get_version ( )
bootstub_version = panda . get_version ( )
cloudlog . info ( f " Flashed firmware not booting, flashing development bootloader. { bootstub_version =} , { internal_panda =} " )
cloudlog . info ( f " Flashed firmware not booting, flashing development bootloader. { bootstub_version =} , { internal_panda =} " )
if internal_panda :
if internal_panda :
HARDWARE . recover_internal_panda ( )
HARDWARE . recover_internal_panda ( )
panda . recover ( reset = ( not internal_panda ) )
panda . recover ( reset = ( not internal_panda ) )
@ -139,6 +181,8 @@ def main() -> NoReturn:
params . put_bool ( " PandaHeartbeatLost " , True )
params . put_bool ( " PandaHeartbeatLost " , True )
cloudlog . event ( " heartbeat lost " , deviceState = health , serial = panda . get_usb_serial ( ) )
cloudlog . event ( " heartbeat lost " , deviceState = health , serial = panda . get_usb_serial ( ) )
read_panda_logs ( panda )
if first_run :
if first_run :
if panda . is_internal ( ) :
if panda . is_internal ( ) :
# update time from RTC
# update time from RTC