openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

142 lines
4.3 KiB

5 years ago
#!/usr/bin/env python3
# simple boardd wrapper that updates the panda first
import os
import usb1
5 years ago
import time
import subprocess
from typing import List, NoReturn
from functools import cmp_to_key
5 years ago
from panda import Panda, PandaDFU
from common.basedir import BASEDIR
from common.params import Params
from system.hardware import HARDWARE
from system.swaglog import cloudlog
def get_expected_signature(panda: Panda) -> bytes:
    """Return the signature of the firmware image expected for ``panda``.

    The firmware path is read from the panda's MCU type configuration
    (``config.app_path``) and handed to ``Panda.get_signature_from_firmware``.
    Any exception raised along the way is logged and reported as an empty
    signature instead of being propagated to the caller.
    """
    try:
        mcu_type = panda.get_mcu_type()
        app_path = mcu_type.config.app_path
        expected = Panda.get_signature_from_firmware(app_path)
    except Exception:
        # Best effort: the caller compares against b"" rather than crashing.
        cloudlog.exception("Error computing expected signature")
        expected = b""
    return expected
5 years ago
def flash_panda(panda_serial: str) -> Panda:
    """Connect to a panda and make sure it runs the expected firmware.

    The panda is flashed when it is in bootstub or when its signature differs
    from the expected one. If it is still in bootstub afterwards, a recovery
    is attempted as well.

    Args:
        panda_serial: serial of the panda to connect to.

    Returns:
        The connected ``Panda`` whose signature matches the expected one.

    Raises:
        AssertionError: if the panda is still in bootstub after recovery, or
            if its signature still differs from the expected one.
    """
    panda = Panda(panda_serial)

    expected_sig = get_expected_signature(panda)
    # Captured once, before any flashing: internal panda that was not in
    # bootstub when we connected.
    internal_panda = panda.is_internal() and not panda.bootstub

    # A panda in bootstub has no application version/signature to report.
    if panda.bootstub:
        reported_version = "bootstub"
        current_sig = b""
    else:
        reported_version = panda.get_version()
        current_sig = panda.get_signature()
    cloudlog.warning(
        f"Panda {panda_serial} connected, version: {reported_version}, "
        f"signature {current_sig.hex()[:16]}, expected {expected_sig.hex()[:16]}"
    )

    needs_flash = panda.bootstub or current_sig != expected_sig
    if needs_flash:
        cloudlog.info("Panda firmware out of date, update required")
        panda.flash()
        cloudlog.info("Done flashing")

    # Still in bootstub after flashing: fall back to recovery.
    if panda.bootstub:
        cloudlog.info(f"Flashed firmware not booting, flashing development bootloader. Bootstub version: {panda.get_version()}")
        if internal_panda:
            HARDWARE.recover_internal_panda()
        panda.recover(reset=not internal_panda)
        cloudlog.info("Done flashing bootloader")

    if panda.bootstub:
        cloudlog.info("Panda still not booting, exiting")
        raise AssertionError

    # Final verification against the expected firmware signature.
    if panda.get_signature() != expected_sig:
        cloudlog.info("Version mismatch after flashing, exiting")
        raise AssertionError

    return panda
def panda_sort_cmp(a: Panda, b: Panda) -> int:
    """Three-way comparison of two pandas, for use with ``functools.cmp_to_key``.

    Ordering, from highest to lowest priority:
      1. an internal panda sorts before a non-internal one,
      2. lower hardware type (``get_type()``) sorts first,
      3. lower USB serial (``get_usb_serial()``) sorts first.

    Returns:
        A negative number if ``a`` sorts before ``b``, a positive number if it
        sorts after, and ``0`` if the two compare equal on all three criteria.
    """
    # make sure the internal one is always first
    a_internal = bool(a.is_internal())
    b_internal = bool(b.is_internal())
    if a_internal != b_internal:
        return -1 if a_internal else 1

    # sort by hardware type
    a_type = a.get_type()
    b_type = b.get_type()
    if a_type != b_type:
        # A bare `a_type < b_type` would yield True/False, which cmp_to_key
        # reads as "greater"/"equal" and never as "less".
        return -1 if a_type < b_type else 1

    # last resort: sort by serial number
    a_serial = a.get_usb_serial()
    b_serial = b.get_usb_serial()
    return (a_serial > b_serial) - (a_serial < b_serial)
5 years ago
def main() -> NoReturn:
    """Prepare all connected pandas, then run ``boardd`` with their serials.

    Loops forever. Each iteration:
      * clears the "PandaSignatures" param,
      * recovers every panda found in DFU mode,
      * flashes every listed panda via ``flash_panda``,
      * records a lost heartbeat in the "PandaHeartbeatLost" param and, on the
        first run only, resets each panda,
      * sorts the pandas, stores their signatures and closes them,
      * runs ``./boardd`` with the sorted serials as arguments.

    If no panda is found, or a USB "no device"/"pipe" error occurs during
    setup, the iteration is restarted without running ``boardd``.
    """
    params = Params()
    first_run = True

    while True:
        try:
            params.remove("PandaSignatures")

            # Flash all Pandas in DFU mode
            for dfu_serial in PandaDFU.list():
                cloudlog.info(f"Panda in DFU mode found, flashing recovery {dfu_serial}")
                PandaDFU(dfu_serial).recover()
            time.sleep(1)

            panda_serials = Panda.list()
            if not panda_serials:
                if first_run:
                    cloudlog.info("Resetting internal panda")
                    HARDWARE.reset_internal_panda()
                    time.sleep(2)  # wait to come back up
                continue

            cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")

            # Flash pandas
            pandas: List[Panda] = [flash_panda(found) for found in panda_serials]

            # check health for lost heartbeat
            for dev in pandas:
                health = dev.health()
                if health["heartbeat_lost"]:
                    params.put_bool("PandaHeartbeatLost", True)
                    cloudlog.event("heartbeat lost", deviceState=health, serial=dev.get_usb_serial())

                if first_run:
                    cloudlog.info(f"Resetting panda {dev.get_usb_serial()}")
                    dev.reset()

            # sort pandas to have deterministic order
            pandas = sorted(pandas, key=cmp_to_key(panda_sort_cmp))
            panda_serials = [dev.get_usb_serial() for dev in pandas]

            # log panda fw versions
            signatures = [dev.get_signature() for dev in pandas]
            params.put("PandaSignatures", b','.join(signatures))

            # close all pandas
            for dev in pandas:
                dev.close()
        except (usb1.USBErrorNoDevice, usb1.USBErrorPipe):
            # a panda was disconnected while setting everything up. let's try again
            cloudlog.exception("Panda USB exception while setting up")
            continue

        first_run = False

        # run boardd with all connected serials as arguments
        os.environ['MANAGER_DAEMON'] = 'boardd'
        boardd_dir = os.path.join(BASEDIR, "selfdrive/boardd")
        os.chdir(boardd_dir)
        subprocess.run(["./boardd", *panda_serials], check=True)
5 years ago
if __name__ == "__main__":
    # Run the flashing/boardd loop only when executed as a script.
    main()