cabana: support direct streaming from panda over USB (#27936)
* refactor livestream into devicestream * add panda stream * unused * whitespace * move logging to base class * add cmdline args * Update selfdrive/boardd/boardd.cc --------- Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>pull/27943/head
parent
a324d79fa3
commit
2a981f5531
11 changed files with 211 additions and 57 deletions
@ -0,0 +1,27 @@ |
||||
#include "tools/cabana/streams/devicestream.h" |
||||
|
||||
DeviceStream::DeviceStream(QObject *parent, QString address) : zmq_address(address), LiveStream(parent) { |
||||
} |
||||
|
||||
void DeviceStream::streamThread() { |
||||
if (!zmq_address.isEmpty()) { |
||||
setenv("ZMQ", "1", 1); |
||||
} |
||||
|
||||
std::unique_ptr<Context> context(Context::create()); |
||||
std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); |
||||
std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address)); |
||||
assert(sock != NULL); |
||||
sock->setTimeout(50); |
||||
// run as fast as messages come in
|
||||
while (!QThread::currentThread()->isInterruptionRequested()) { |
||||
Message *msg = sock->receive(true); |
||||
if (!msg) { |
||||
QThread::msleep(50); |
||||
continue; |
||||
} |
||||
|
||||
std::lock_guard lk(lock); |
||||
handleEvent(messages.emplace_back(msg).event); |
||||
} |
||||
} |
@ -0,0 +1,17 @@ |
||||
#pragma once |
||||
|
||||
#include "tools/cabana/streams/livestream.h" |
||||
|
||||
class DeviceStream : public LiveStream { |
||||
Q_OBJECT |
||||
public: |
||||
DeviceStream(QObject *parent, QString address = {}); |
||||
|
||||
inline QString routeName() const override { |
||||
return QString("Live Streaming From %1").arg(zmq_address.isEmpty() ? "127.0.0.1" : zmq_address); |
||||
} |
||||
|
||||
protected: |
||||
void streamThread() override; |
||||
const QString zmq_address; |
||||
}; |
@ -0,0 +1,85 @@ |
||||
#include "tools/cabana/streams/pandastream.h" |
||||
|
||||
PandaStream::PandaStream(QObject *parent, PandaStreamConfig config_) : config(config_), LiveStream(parent) { |
||||
if (config.serial.isEmpty()) { |
||||
auto serials = Panda::list(); |
||||
if (serials.size() == 0) { |
||||
throw std::runtime_error("No panda found"); |
||||
} |
||||
config.serial = QString::fromStdString(serials[0]); |
||||
} |
||||
|
||||
qDebug() << "Connecting to panda with serial" << config.serial; |
||||
if (!connect()) { |
||||
throw std::runtime_error("Failed to connect to panda"); |
||||
} |
||||
} |
||||
|
||||
bool PandaStream::connect() { |
||||
try { |
||||
panda.reset(new Panda(config.serial.toStdString())); |
||||
config.bus_config.resize(3); |
||||
qDebug() << "Connected"; |
||||
} catch (const std::exception& e) { |
||||
return false; |
||||
} |
||||
|
||||
panda->set_safety_model(cereal::CarParams::SafetyModel::SILENT); |
||||
|
||||
for (int bus = 0; bus < config.bus_config.size(); bus++) { |
||||
panda->set_can_speed_kbps(bus, config.bus_config[bus].can_speed_kbps); |
||||
|
||||
// CAN-FD
|
||||
if (panda->hw_type == cereal::PandaState::PandaType::RED_PANDA || panda->hw_type == cereal::PandaState::PandaType::RED_PANDA_V2) { |
||||
if (config.bus_config[bus].can_fd) { |
||||
panda->set_data_speed_kbps(bus, config.bus_config[bus].data_speed_kbps); |
||||
} else { |
||||
// Hack to disable can-fd by setting data speed to a low value
|
||||
panda->set_data_speed_kbps(bus, 10); |
||||
} |
||||
} |
||||
|
||||
} |
||||
return true; |
||||
} |
||||
|
||||
void PandaStream::streamThread() { |
||||
std::vector<can_frame> raw_can_data; |
||||
|
||||
while (!QThread::currentThread()->isInterruptionRequested()) { |
||||
QThread::msleep(1); |
||||
|
||||
if (!panda->connected()) { |
||||
qDebug() << "Connection to panda lost. Attempting reconnect."; |
||||
if (!connect()){ |
||||
QThread::msleep(1000); |
||||
continue; |
||||
} |
||||
} |
||||
|
||||
raw_can_data.clear(); |
||||
if (!panda->can_receive(raw_can_data)) { |
||||
qDebug() << "failed to receive"; |
||||
continue; |
||||
} |
||||
|
||||
MessageBuilder msg; |
||||
auto evt = msg.initEvent(); |
||||
auto canData = evt.initCan(raw_can_data.size()); |
||||
|
||||
for (uint i = 0; i<raw_can_data.size(); i++) { |
||||
canData[i].setAddress(raw_can_data[i].address); |
||||
canData[i].setBusTime(raw_can_data[i].busTime); |
||||
canData[i].setDat(kj::arrayPtr((uint8_t*)raw_can_data[i].dat.data(), raw_can_data[i].dat.size())); |
||||
canData[i].setSrc(raw_can_data[i].src); |
||||
} |
||||
|
||||
{ |
||||
std::lock_guard lk(lock); |
||||
auto bytes = msg.toBytes(); |
||||
handleEvent(messages.emplace_back((const char*)bytes.begin(), bytes.size()).event); |
||||
} |
||||
|
||||
panda->send_heartbeat(false); |
||||
} |
||||
} |
@ -0,0 +1,33 @@ |
||||
#pragma once |
||||
|
||||
#include "tools/cabana/streams/livestream.h" |
||||
#include "selfdrive/boardd/panda.h" |
||||
|
||||
struct BusConfig { |
||||
int can_speed_kbps = 500; |
||||
int data_speed_kbps = 2000; |
||||
bool can_fd = false; |
||||
}; |
||||
|
||||
struct PandaStreamConfig { |
||||
QString serial = ""; |
||||
std::vector<BusConfig> bus_config; |
||||
}; |
||||
|
||||
class PandaStream : public LiveStream { |
||||
Q_OBJECT |
||||
public: |
||||
PandaStream(QObject *parent, PandaStreamConfig config_ = {}); |
||||
|
||||
inline QString routeName() const override { |
||||
return QString("Live Streaming From Panda %1").arg(config.serial); |
||||
} |
||||
|
||||
protected: |
||||
void streamThread() override; |
||||
bool connect(); |
||||
|
||||
std::unique_ptr<Panda> panda; |
||||
PandaStreamConfig config = {}; |
||||
}; |
||||
|
Loading…
Reference in new issue