pandad: refactor to consolidate threads, keep only one can_send thread (#32680)

* single thread

improve comment

* Keep can_send() running in a separate thread

* send send_peripheral_state in pandad_run

* new PandaSafety class
old-commit-hash: a4de8739e9
pull/33302/head
Dean Lee 9 months ago committed by GitHub
parent 7003776265
commit 04b0871c45
  1. 2
      selfdrive/pandad/SConscript
  2. 79
      selfdrive/pandad/panda_safety.cc
  3. 240
      selfdrive/pandad/pandad.cc
  4. 19
      selfdrive/pandad/pandad.h

@ -3,7 +3,7 @@ Import('env', 'envCython', 'common', 'messaging')
libs = ['usb-1.0', common, messaging, 'pthread'] libs = ['usb-1.0', common, messaging, 'pthread']
panda = env.Library('panda', ['panda.cc', 'panda_comms.cc', 'spi.cc']) panda = env.Library('panda', ['panda.cc', 'panda_comms.cc', 'spi.cc'])
env.Program('pandad', ['main.cc', 'pandad.cc'], LIBS=[panda] + libs) env.Program('pandad', ['main.cc', 'pandad.cc', 'panda_safety.cc'], LIBS=[panda] + libs)
env.Library('libcan_list_to_can_capnp', ['can_list_to_can_capnp.cc']) env.Library('libcan_list_to_can_capnp', ['can_list_to_can_capnp.cc'])
pandad_python = envCython.Program('pandad_api_impl.so', 'pandad_api_impl.pyx', LIBS=["can_list_to_can_capnp", 'capnp', 'kj'] + envCython["LIBS"]) pandad_python = envCython.Program('pandad_api_impl.so', 'pandad_api_impl.pyx', LIBS=["can_list_to_can_capnp", 'capnp', 'kj'] + envCython["LIBS"])

@ -0,0 +1,79 @@
#include "selfdrive/pandad/pandad.h"
#include "cereal/messaging/messaging.h"
#include "common/swaglog.h"
void PandaSafety::configureSafetyMode() {
bool is_onroad = params_.getBool("IsOnroad");
if (is_onroad && !safety_configured_) {
updateMultiplexingMode();
auto car_params = fetchCarParams();
if (!car_params.empty()) {
LOGW("got %lu bytes CarParams", car_params.size());
setSafetyMode(car_params);
safety_configured_ = true;
}
} else if (!is_onroad) {
initialized_ = false;
safety_configured_ = false;
}
}
void PandaSafety::updateMultiplexingMode() {
// Initialize to ELM327 without OBD multiplexing for initial fingerprinting
if (!initialized_) {
prev_obd_multiplexing_ = false;
for (int i = 0; i < pandas_.size(); ++i) {
pandas_[i]->set_safety_model(cereal::CarParams::SafetyModel::ELM327, 1U);
}
initialized_ = true;
}
// Switch between multiplexing modes based on the OBD multiplexing request
bool obd_multiplexing_requested = params_.getBool("ObdMultiplexingEnabled");
if (obd_multiplexing_requested != prev_obd_multiplexing_) {
for (int i = 0; i < pandas_.size(); ++i) {
const uint16_t safety_param = (i > 0 || !obd_multiplexing_requested) ? 1U : 0U;
pandas_[i]->set_safety_model(cereal::CarParams::SafetyModel::ELM327, safety_param);
}
prev_obd_multiplexing_ = obd_multiplexing_requested;
params_.putBool("ObdMultiplexingChanged", true);
}
}
std::string PandaSafety::fetchCarParams() {
if (!params_.getBool("FirmwareQueryDone")) {
return {};
}
LOGW("Finished FW query");
LOGW("Waiting for params to set safety model");
if (!params_.getBool("ControlsReady")) {
return {};
}
return params_.get("CarParams");
}
void PandaSafety::setSafetyMode(const std::string &params_string) {
AlignedBuffer aligned_buf;
capnp::FlatArrayMessageReader cmsg(aligned_buf.align(params_string.data(), params_string.size()));
cereal::CarParams::Reader car_params = cmsg.getRoot<cereal::CarParams>();
auto safety_configs = car_params.getSafetyConfigs();
uint16_t alternative_experience = car_params.getAlternativeExperience();
for (int i = 0; i < pandas_.size(); ++i) {
// Default to SILENT safety model if not specified
cereal::CarParams::SafetyModel safety_model = cereal::CarParams::SafetyModel::SILENT;
uint16_t safety_param = 0U;
if (i < safety_configs.size()) {
safety_model = safety_configs[i].getSafetyModel();
safety_param = safety_configs[i].getSafetyParam();
}
LOGW("Panda %d: setting safety model: %d, param: %d, alternative experience: %d", i, (int)safety_model, safety_param, alternative_experience);
pandas_[i]->set_alternative_experience(alternative_experience);
pandas_[i]->set_safety_model(safety_model, safety_param);
}
}

@ -2,18 +2,15 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <atomic>
#include <bitset> #include <bitset>
#include <cassert> #include <cassert>
#include <cerrno> #include <cerrno>
#include <chrono>
#include <future>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <utility>
#include "cereal/gen/cpp/car.capnp.h" #include "cereal/gen/cpp/car.capnp.h"
#include "cereal/messaging/messaging.h" #include "cereal/messaging/messaging.h"
#include "common/params.h"
#include "common/ratekeeper.h" #include "common/ratekeeper.h"
#include "common/swaglog.h" #include "common/swaglog.h"
#include "common/timing.h" #include "common/timing.h"
@ -43,9 +40,6 @@
#define MIN_IR_POWER 0.0f #define MIN_IR_POWER 0.0f
#define CUTOFF_IL 400 #define CUTOFF_IL 400
#define SATURATE_IL 1000 #define SATURATE_IL 1000
using namespace std::chrono_literals;
std::atomic<bool> ignition(false);
ExitHandler do_exit; ExitHandler do_exit;
@ -59,88 +53,6 @@ bool check_all_connected(const std::vector<Panda *> &pandas) {
return true; return true;
} }
bool safety_setter_thread(std::vector<Panda *> pandas) {
LOGD("Starting safety setter thread");
Params p;
// there should be at least one panda connected
if (pandas.size() == 0) {
return false;
}
// initialize to ELM327 without OBD multiplexing for fingerprinting
bool obd_multiplexing_enabled = false;
for (int i = 0; i < pandas.size(); i++) {
pandas[i]->set_safety_model(cereal::CarParams::SafetyModel::ELM327, 1U);
}
// openpilot can switch between multiplexing modes for different FW queries
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
return false;
}
bool obd_multiplexing_requested = p.getBool("ObdMultiplexingEnabled");
if (obd_multiplexing_requested != obd_multiplexing_enabled) {
for (int i = 0; i < pandas.size(); i++) {
const uint16_t safety_param = (i > 0 || !obd_multiplexing_requested) ? 1U : 0U;
pandas[i]->set_safety_model(cereal::CarParams::SafetyModel::ELM327, safety_param);
}
obd_multiplexing_enabled = obd_multiplexing_requested;
p.putBool("ObdMultiplexingChanged", true);
}
if (p.getBool("FirmwareQueryDone")) {
LOGW("finished FW query");
break;
}
util::sleep_for(20);
}
std::string params;
LOGW("waiting for params to set safety model");
while (true) {
if (do_exit || !check_all_connected(pandas) || !ignition) {
return false;
}
if (p.getBool("ControlsReady")) {
params = p.get("CarParams");
if (params.size() > 0) break;
}
util::sleep_for(100);
}
LOGW("got %lu bytes CarParams", params.size());
AlignedBuffer aligned_buf;
capnp::FlatArrayMessageReader cmsg(aligned_buf.align(params.data(), params.size()));
cereal::CarParams::Reader car_params = cmsg.getRoot<cereal::CarParams>();
cereal::CarParams::SafetyModel safety_model;
uint16_t safety_param;
auto safety_configs = car_params.getSafetyConfigs();
uint16_t alternative_experience = car_params.getAlternativeExperience();
for (uint32_t i = 0; i < pandas.size(); i++) {
auto panda = pandas[i];
if (safety_configs.size() > i) {
safety_model = safety_configs[i].getSafetyModel();
safety_param = safety_configs[i].getSafetyParam();
} else {
// If no safety mode is specified, default to silent
safety_model = cereal::CarParams::SafetyModel::SILENT;
safety_param = 0U;
}
LOGW("panda %d: setting safety model: %d, param: %d, alternative experience: %d", i, (int)safety_model, safety_param, alternative_experience);
panda->set_alternative_experience(alternative_experience);
panda->set_safety_model(safety_model, safety_param);
}
return true;
}
Panda *connect(std::string serial="", uint32_t index=0) { Panda *connect(std::string serial="", uint32_t index=0) {
std::unique_ptr<Panda> panda; std::unique_ptr<Panda> panda;
try { try {
@ -197,16 +109,9 @@ void can_send_thread(std::vector<Panda *> pandas, bool fake_send) {
} }
} }
void can_recv_thread(std::vector<Panda *> pandas) { void can_recv(std::vector<Panda *> &pandas, PubMaster *pm) {
util::set_thread_name("pandad_can_recv"); static std::vector<can_frame> raw_can_data;
{
PubMaster pm({"can"});
// run at 100Hz
RateKeeper rk("pandad_can_recv", 100);
std::vector<can_frame> raw_can_data;
while (!do_exit && check_all_connected(pandas)) {
bool comms_healthy = true; bool comms_healthy = true;
raw_can_data.clear(); raw_can_data.clear();
for (const auto& panda : pandas) { for (const auto& panda : pandas) {
@ -217,14 +122,12 @@ void can_recv_thread(std::vector<Panda *> pandas) {
auto evt = msg.initEvent(); auto evt = msg.initEvent();
evt.setValid(comms_healthy); evt.setValid(comms_healthy);
auto canData = evt.initCan(raw_can_data.size()); auto canData = evt.initCan(raw_can_data.size());
for (uint i = 0; i<raw_can_data.size(); i++) { for (size_t i = 0; i < raw_can_data.size(); ++i) {
canData[i].setAddress(raw_can_data[i].address); canData[i].setAddress(raw_can_data[i].address);
canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size())); canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size()));
canData[i].setSrc(raw_can_data[i].src); canData[i].setSrc(raw_can_data[i].src);
} }
pm.send("can", msg); pm->send("can", msg);
rk.keepTime();
} }
} }
@ -386,7 +289,7 @@ std::optional<bool> send_panda_states(PubMaster *pm, const std::vector<Panda *>
return ignition_local; return ignition_local;
} }
void send_peripheral_state(PubMaster *pm, Panda *panda) { void send_peripheral_state(Panda *panda, PubMaster *pm) {
// build msg // build msg
MessageBuilder msg; MessageBuilder msg;
auto evt = msg.initEvent(); auto evt = msg.initEvent();
@ -409,46 +312,23 @@ void send_peripheral_state(PubMaster *pm, Panda *panda) {
pm->send("peripheralState", msg); pm->send("peripheralState", msg);
} }
void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) { void process_panda_state(std::vector<Panda *> &pandas, PubMaster *pm, bool spoofing_started) {
util::set_thread_name("pandad_panda_state"); static SubMaster sm({"controlsState"});
Params params;
SubMaster sm({"controlsState"});
PubMaster pm({"pandaStates", "peripheralState"});
Panda *peripheral_panda = pandas[0];
bool is_onroad = false;
bool is_onroad_last = false;
std::future<bool> safety_future;
std::vector<std::string> connected_serials; std::vector<std::string> connected_serials;
for (Panda *p : pandas) { for (Panda *p : pandas) {
connected_serials.push_back(p->hw_serial()); connected_serials.push_back(p->hw_serial());
} }
LOGD("start panda state thread"); {
auto ignition_opt = send_panda_states(pm, pandas, spoofing_started);
// run at 10hz
RateKeeper rk("panda_state_thread", 10);
while (!do_exit && check_all_connected(pandas)) {
// send out peripheralState at 2Hz
if (sm.frame % 5 == 0) {
send_peripheral_state(&pm, peripheral_panda);
}
auto ignition_opt = send_panda_states(&pm, pandas, spoofing_started);
if (!ignition_opt) { if (!ignition_opt) {
LOGE("Failed to get ignition_opt"); LOGE("Failed to get ignition_opt");
rk.keepTime(); return;
continue;
} }
ignition = *ignition_opt;
// check if we should have pandad reconnect // check if we should have pandad reconnect
if (!ignition) { if (!ignition_opt.value()) {
bool comms_healthy = true; bool comms_healthy = true;
for (const auto &panda : pandas) { for (const auto &panda : pandas) {
comms_healthy &= panda->comms_healthy(); comms_healthy &= panda->comms_healthy();
@ -468,52 +348,28 @@ void panda_state_thread(std::vector<Panda *> pandas, bool spoofing_started) {
} }
} }
} }
if (do_exit) {
break;
}
} }
is_onroad = params.getBool("IsOnroad");
// set new safety on onroad transition, after params are cleared
if (is_onroad && !is_onroad_last) {
if (!safety_future.valid() || safety_future.wait_for(0ms) == std::future_status::ready) {
safety_future = std::async(std::launch::async, safety_setter_thread, pandas);
} else {
LOGW("Safety setter thread already running");
}
}
is_onroad_last = is_onroad;
sm.update(0); sm.update(0);
const bool engaged = sm.allAliveAndValid({"controlsState"}) && sm["controlsState"].getControlsState().getEnabled(); const bool engaged = sm.allAliveAndValid({"controlsState"}) && sm["controlsState"].getControlsState().getEnabled();
for (const auto &panda : pandas) { for (const auto &panda : pandas) {
panda->send_heartbeat(engaged); panda->send_heartbeat(engaged);
} }
rk.keepTime();
} }
} }
void process_peripheral_state(Panda *panda, PubMaster *pm, bool no_fan_control) {
static SubMaster sm({"deviceState", "driverCameraState"});
void peripheral_control_thread(Panda *panda, bool no_fan_control) { static uint64_t last_driver_camera_t = 0;
util::set_thread_name("pandad_peripheral_control"); static uint16_t prev_fan_speed = 999;
static uint16_t ir_pwr = 0;
SubMaster sm({"deviceState", "driverCameraState"}); static uint16_t prev_ir_pwr = 999;
uint64_t last_driver_camera_t = 0;
uint16_t prev_fan_speed = 999;
uint16_t ir_pwr = 0;
uint16_t prev_ir_pwr = 999;
FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);
while (!do_exit && panda->connected()) { static FirstOrderFilter integ_lines_filter(0, 30.0, 0.05);
sm.update(1000);
{
sm.update(0);
if (sm.updated("deviceState") && !no_fan_control) { if (sm.updated("deviceState") && !no_fan_control) {
// Fan speed // Fan speed
uint16_t fan_speed = sm["deviceState"].getDeviceState().getFanSpeedPercentDesired(); uint16_t fan_speed = sm["deviceState"].getDeviceState().getFanSpeedPercentDesired();
@ -551,9 +407,46 @@ void peripheral_control_thread(Panda *panda, bool no_fan_control) {
} }
} }
void pandad_main_thread(std::vector<std::string> serials) { void pandad_run(std::vector<Panda *> &pandas) {
LOGW("launching pandad"); const bool no_fan_control = getenv("NO_FAN_CONTROL") != nullptr;
const bool spoofing_started = getenv("STARTED") != nullptr;
const bool fake_send = getenv("FAKESEND") != nullptr;
// Start the CAN send thread
std::thread send_thread(can_send_thread, pandas, fake_send);
RateKeeper rk("pandad", 100);
PubMaster pm({"can", "pandaStates", "peripheralState"});
PandaSafety panda_safety(pandas);
Panda *peripheral_panda = pandas[0];
// Main loop: receive CAN data and process states
while (!do_exit && check_all_connected(pandas)) {
can_recv(pandas, &pm);
// Process peripheral state at 20 Hz
if (rk.frame() % 5 == 0) {
process_peripheral_state(peripheral_panda, &pm, no_fan_control);
}
// Process panda state at 10 Hz
if (rk.frame() % 10 == 0) {
process_panda_state(pandas, &pm, spoofing_started);
panda_safety.configureSafetyMode();
}
// Send out peripheralState at 2Hz
if (rk.frame() % 50 == 0) {
send_peripheral_state(peripheral_panda, &pm);
}
rk.keepTime();
}
send_thread.join();
}
void pandad_main_thread(std::vector<std::string> serials) {
if (serials.size() == 0) { if (serials.size() == 0) {
serials = Panda::list(); serials = Panda::list();
@ -585,16 +478,7 @@ void pandad_main_thread(std::vector<std::string> serials) {
if (!do_exit) { if (!do_exit) {
LOGW("connected to all pandas"); LOGW("connected to all pandas");
pandad_run(pandas);
std::vector<std::thread> threads;
threads.emplace_back(panda_state_thread, pandas, getenv("STARTED") != nullptr);
threads.emplace_back(peripheral_control_thread, pandas[0], getenv("NO_FAN_CONTROL") != nullptr);
threads.emplace_back(can_send_thread, pandas, getenv("FAKESEND") != nullptr);
threads.emplace_back(can_recv_thread, pandas);
for (auto &t : threads) t.join();
} }
for (Panda *panda : pandas) { for (Panda *panda : pandas) {

@ -3,7 +3,24 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "common/params.h"
#include "selfdrive/pandad/panda.h" #include "selfdrive/pandad/panda.h"
bool safety_setter_thread(std::vector<Panda *> pandas);
void pandad_main_thread(std::vector<std::string> serials); void pandad_main_thread(std::vector<std::string> serials);
class PandaSafety {
public:
PandaSafety(const std::vector<Panda *> &pandas) : pandas_(pandas) {}
void configureSafetyMode();
private:
void updateMultiplexingMode();
std::string fetchCarParams();
void setSafetyMode(const std::string &params_string);
bool initialized_ = false;
bool safety_configured_ = false;
bool prev_obd_multiplexing_ = false;
std::vector<Panda *> pandas_;
Params params_;
};

Loading…
Cancel
Save