openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

96 lines
3.7 KiB

#!/usr/bin/env python3
import capnp
import time
import cereal.messaging as messaging
from panda.python.uds import SERVICE_TYPE
from openpilot.selfdrive.car import make_can_msg
from openpilot.selfdrive.car.fw_query_definitions import EcuAddrBusType
from openpilot.selfdrive.pandad import can_list_to_can_capnp
from openpilot.common.swaglog import cloudlog
def make_tester_present_msg(addr, bus, subaddr=None):
dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0]
if subaddr is not None:
dat.insert(0, subaddr)
dat.extend([0x0] * (8 - len(dat)))
return make_can_msg(addr, bytes(dat), bus)
def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: int = None) -> bool:
# ISO-TP messages are always padded to 8 bytes
# tester present response is always a single frame
dat_offset = 1 if subaddr is not None else 0
if len(msg.dat) == 8 and 1 <= msg.dat[dat_offset] <= 7:
# success response
if msg.dat[dat_offset + 1] == (SERVICE_TYPE.TESTER_PRESENT + 0x40):
return True
# error response
if msg.dat[dat_offset + 1] == 0x7F and msg.dat[dat_offset + 2] == SERVICE_TYPE.TESTER_PRESENT:
return True
return False
def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> set[EcuAddrBusType]:
addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
queries: set[EcuAddrBusType] = {(addr, None, bus) for addr in addr_list}
responses = queries
return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: set[EcuAddrBusType],
responses: set[EcuAddrBusType], timeout: float = 1, debug: bool = False) -> set[EcuAddrBusType]:
ecu_responses: set[EcuAddrBusType] = set() # set((addr, subaddr, bus),)
try:
msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]
messaging.drain_sock_raw(logcan)
sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan'))
start_time = time.monotonic()
while time.monotonic() - start_time < timeout:
can_packets = messaging.drain_sock(logcan, wait_for_one=True)
for packet in can_packets:
for msg in packet.can:
if not len(msg.dat):
cloudlog.warning("ECU addr scan: skipping empty remote frame")
continue
subaddr = None if (msg.address, None, msg.src) in responses else msg.dat[0]
if (msg.address, subaddr, msg.src) in responses and is_tester_present_response(msg, subaddr):
if debug:
print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}")
if (msg.address, subaddr, msg.src) in ecu_responses:
print(f"Duplicate ECU address: {hex(msg.address)}")
ecu_responses.add((msg.address, subaddr, msg.src))
except Exception:
cloudlog.exception("ECU addr scan exception")
return ecu_responses
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Get addresses of all ECUs')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--bus', type=int, default=1)
parser.add_argument('--timeout', type=float, default=1.0)
args = parser.parse_args()
logcan = messaging.sub_sock('can')
sendcan = messaging.pub_sock('sendcan')
time.sleep(1.0)
print("Getting ECU addresses ...")
ecu_addrs = get_all_ecu_addrs(logcan, sendcan, args.bus, args.timeout, debug=args.debug)
print()
print("Found ECUs on addresses:")
for addr, subaddr, _ in ecu_addrs:
msg = f" 0x{hex(addr)}"
if subaddr is not None:
msg += f" (sub-address: {hex(subaddr)})"
print(msg)