dragonpilot - 基於 openpilot 的開源駕駛輔助系統
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

129 lines
6.7 KiB

import os
import operator
import platform
from cereal import car
from openpilot.common.params import Params
from openpilot.system.hardware import PC, TICI
from openpilot.system.manager.process import PythonProcess, NativeProcess, DaemonProcess
WEBCAM = os.getenv("USE_WEBCAM") is not None
def driverview(started: bool, params: Params, CP: car.CarParams) -> bool:
return started or params.get_bool("IsDriverViewEnabled")
def notcar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and CP.notCar
def iscar(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not CP.notCar
def logging(started: bool, params: Params, CP: car.CarParams) -> bool:
run = not params.get_bool("DisableLogging")
return started and run
def ublox_available() -> bool:
return os.path.exists('/dev/ttyHS0') and not os.path.exists('/persist/comma/use-quectel-gps')
def ublox(started: bool, params: Params, CP: car.CarParams) -> bool:
use_ublox = ublox_available()
if use_ublox != params.get_bool("UbloxAvailable"):
params.put_bool("UbloxAvailable", use_ublox)
return started and use_ublox
def joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("JoystickDebugMode")
def not_joystick(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("JoystickDebugMode")
def long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and params.get_bool("LongitudinalManeuverMode")
def not_long_maneuver(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not params.get_bool("LongitudinalManeuverMode")
def qcomgps(started: bool, params: Params, CP: car.CarParams) -> bool:
return started and not ublox_available()
def always_run(started: bool, params: Params, CP: car.CarParams) -> bool:
return True
def only_onroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return started
def only_offroad(started: bool, params: Params, CP: car.CarParams) -> bool:
return not started
def dashy(started: bool, params: Params, CP: car.CarParams) -> bool:
return int(params.get("dp_dev_dashy") or 0) > 0
def dashy_with_video(started: bool, params: Params, CP: car.CarParams) -> bool:
return int(params.get("dp_dev_dashy") or 0) == 2
def or_(*fns):
return lambda *args: operator.or_(*(fn(*args) for fn in fns))
def and_(*fns):
return lambda *args: operator.and_(*(fn(*args) for fn in fns))
procs = [
DaemonProcess("manage_athenad", "system.athena.manage_athenad", "AthenadPid"),
NativeProcess("loggerd", "system/loggerd", ["./loggerd"], logging),
NativeProcess("encoderd", "system/loggerd", ["./encoderd"], only_onroad),
NativeProcess("stream_encoderd", "system/loggerd", ["./encoderd", "--stream"], or_(notcar, and_(dashy_with_video, only_onroad))),
PythonProcess("logmessaged", "system.logmessaged", always_run),
NativeProcess("camerad", "system/camerad", ["./camerad"], driverview, enabled=not WEBCAM),
PythonProcess("webcamerad", "tools.webcam.camerad", driverview, enabled=WEBCAM),
NativeProcess("logcatd", "system/logcatd", ["./logcatd"], only_onroad, platform.system() != "Darwin"),
NativeProcess("proclogd", "system/proclogd", ["./proclogd"], only_onroad, platform.system() != "Darwin"),
PythonProcess("micd", "system.micd", iscar, enabled=not os.getenv("LITE")),
PythonProcess("timed", "system.timed", always_run, enabled=not PC),
PythonProcess("modeld", "selfdrive.modeld.modeld", only_onroad),
PythonProcess("dmonitoringmodeld", "selfdrive.modeld.dmonitoringmodeld", driverview, enabled=(WEBCAM or not PC) and not os.getenv("LITE")),
PythonProcess("sensord", "system.sensord.sensord", only_onroad, enabled=not PC),
NativeProcess("ui", "selfdrive/ui", ["./ui"], always_run, watchdog_max_dt=(5 if not PC else None)),
PythonProcess("raylib_ui", "selfdrive.ui.ui", always_run, enabled=False, watchdog_max_dt=(5 if not PC else None)),
PythonProcess("soundd", "selfdrive.ui.soundd", only_onroad, enabled=not os.getenv("LITE")),
PythonProcess("beepd", "dragonpilot.selfdrive.ui.beepd", only_onroad, enabled=(Params().get_bool("dp_device_beep") and os.getenv("LITE"))),
PythonProcess("locationd", "selfdrive.locationd.locationd", only_onroad),
NativeProcess("_pandad", "selfdrive/pandad", ["./pandad"], always_run, enabled=False),
PythonProcess("calibrationd", "selfdrive.locationd.calibrationd", only_onroad),
PythonProcess("torqued", "selfdrive.locationd.torqued", only_onroad),
PythonProcess("controlsd", "selfdrive.controls.controlsd", and_(not_joystick, iscar)),
PythonProcess("joystickd", "tools.joystick.joystickd", or_(joystick, notcar)),
PythonProcess("selfdrived", "selfdrive.selfdrived.selfdrived", only_onroad),
PythonProcess("card", "selfdrive.car.card", only_onroad),
PythonProcess("deleter", "system.loggerd.deleter", always_run),
PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(WEBCAM or not PC) and not os.getenv("LITE")),
PythonProcess("dpmonitoringd", "selfdrive.monitoring.dpmonitoringd", only_onroad, enabled=os.getenv("LITE")),
PythonProcess("qcomgpsd", "system.qcomgpsd.qcomgpsd", qcomgps, enabled=TICI),
PythonProcess("pandad", "selfdrive.pandad.pandad", always_run),
PythonProcess("paramsd", "selfdrive.locationd.paramsd", only_onroad),
PythonProcess("lagd", "selfdrive.locationd.lagd", only_onroad),
NativeProcess("ubloxd", "system/ubloxd", ["./ubloxd"], ublox, enabled=TICI),
PythonProcess("pigeond", "system.ubloxd.pigeond", ublox, enabled=TICI),
PythonProcess("plannerd", "selfdrive.controls.plannerd", not_long_maneuver),
PythonProcess("maneuversd", "tools.longitudinal_maneuvers.maneuversd", long_maneuver),
PythonProcess("radard", "selfdrive.controls.radard", only_onroad),
PythonProcess("hardwared", "system.hardware.hardwared", always_run),
PythonProcess("tombstoned", "system.tombstoned", always_run, enabled=not PC),
PythonProcess("updated", "system.updated.updated", only_offroad, enabled=not PC),
PythonProcess("uploader", "system.loggerd.uploader", always_run),
PythonProcess("statsd", "system.statsd", always_run),
PythonProcess("feedbackd", "selfdrive.ui.feedback.feedbackd", only_onroad, enabled=not os.getenv("LITE")),
# debug procs
NativeProcess("bridge", "cereal/messaging", ["./bridge"], or_(notcar, and_(dashy_with_video, only_onroad))),
PythonProcess("webrtcd", "system.webrtc.webrtcd", or_(notcar, and_(dashy_with_video, only_onroad))),
PythonProcess("webjoystick", "tools.bodyteleop.web", notcar),
PythonProcess("joystick", "tools.joystick.joystick_control", and_(joystick, iscar)),
PythonProcess("dashy", "dragonpilot.dashy.backend.server", dashy),
]
managed_processes = {p.name: p for p in procs}