FPv2: log all present ECU addresses (#24916)
	
		
	
				
					
				
			* eliminate brands based on ECUs that respond to tester present * make it work * Add type hint for can message Use make_can_msg * Only query for addresses in fingerprints, and account for different busses * These need to be addresses, not response addresses * We need to listen to response addresses, not query addresses * add to files_common * Unused Optional Drain sock raw * add logging * only query essential ecus comments * simplify get_brand_candidates(), keep track of multiple request variants per make and request each subaddress * fixes make dat bytes bus is src Fix check * (addr, subaddr, bus) can be common across brands, add a match to each brand * fix length * query subaddrs in sequence * fix * candidate if a platform is a subset of responding ecu addresses comment comment * do logging for shadow mode * log responses so we can calculate candidates offline * get has_subaddress from response set * one liner * fix mypy * set to default at top * always log for now * log to make sure it's taking exactly timeout time * import time * fix logging * 0.1 timeout * clean up Co-authored-by: Greg Hogan <gregjhogan@gmail.com>pull/83/head
							parent
							
								
									59055724d6
								
							
						
					
					
						commit
						cccab50b16
					
				
				 4 changed files with 135 additions and 6 deletions
			
			
		@ -0,0 +1,90 @@ | 
				
			|||||||
 | 
					#!/usr/bin/env python3 | 
				
			||||||
 | 
					import capnp | 
				
			||||||
 | 
					import time | 
				
			||||||
 | 
					import traceback | 
				
			||||||
 | 
					from typing import Optional, Set, Tuple | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import cereal.messaging as messaging | 
				
			||||||
 | 
					from panda.python.uds import SERVICE_TYPE | 
				
			||||||
 | 
					from selfdrive.car import make_can_msg | 
				
			||||||
 | 
					from selfdrive.boardd.boardd import can_list_to_can_capnp | 
				
			||||||
 | 
					from system.swaglog import cloudlog | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def make_tester_present_msg(addr, bus, subaddr=None): | 
				
			||||||
 | 
					  dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0] | 
				
			||||||
 | 
					  if subaddr is not None: | 
				
			||||||
 | 
					    dat.insert(0, subaddr) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  dat.extend([0x0] * (8 - len(dat))) | 
				
			||||||
 | 
					  return make_can_msg(addr, bytes(dat), bus) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subaddr: Optional[int] = None) -> bool: | 
				
			||||||
 | 
					  # ISO-TP messages are always padded to 8 bytes | 
				
			||||||
 | 
					  # tester present response is always a single frame | 
				
			||||||
 | 
					  dat_offset = 1 if subaddr is not None else 0 | 
				
			||||||
 | 
					  if len(msg.dat) == 8 and 1 <= msg.dat[dat_offset] <= 7: | 
				
			||||||
 | 
					    # success response | 
				
			||||||
 | 
					    if msg.dat[dat_offset + 1] == (SERVICE_TYPE.TESTER_PRESENT + 0x40): | 
				
			||||||
 | 
					      return True | 
				
			||||||
 | 
					    # error response | 
				
			||||||
 | 
					    if msg.dat[dat_offset + 1] == 0x7F and msg.dat[dat_offset + 2] == SERVICE_TYPE.TESTER_PRESENT: | 
				
			||||||
 | 
					      return True | 
				
			||||||
 | 
					  return False | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[Tuple[int, Optional[int], int]]: | 
				
			||||||
 | 
					  addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)] | 
				
			||||||
 | 
					  queries: Set[Tuple[int, Optional[int], int]] = {(addr, None, bus) for addr in addr_list} | 
				
			||||||
 | 
					  responses = queries | 
				
			||||||
 | 
					  return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[Tuple[int, Optional[int], int]], | 
				
			||||||
 | 
					                  responses: Set[Tuple[int, Optional[int], int]], timeout: float = 1, debug: bool = False) -> Set[Tuple[int, Optional[int], int]]: | 
				
			||||||
 | 
					  ecu_responses: Set[Tuple[int, Optional[int], int]] = set()  # set((addr, subaddr, bus),) | 
				
			||||||
 | 
					  try: | 
				
			||||||
 | 
					    msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries] | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    messaging.drain_sock_raw(logcan) | 
				
			||||||
 | 
					    sendcan.send(can_list_to_can_capnp(msgs, msgtype='sendcan')) | 
				
			||||||
 | 
					    start_time = time.monotonic() | 
				
			||||||
 | 
					    while time.monotonic() - start_time < timeout: | 
				
			||||||
 | 
					      can_packets = messaging.drain_sock(logcan, wait_for_one=True) | 
				
			||||||
 | 
					      for packet in can_packets: | 
				
			||||||
 | 
					        for msg in packet.can: | 
				
			||||||
 | 
					          subaddr = None if (msg.address, None, msg.src) in responses else msg.dat[0] | 
				
			||||||
 | 
					          if (msg.address, subaddr, msg.src) in responses and is_tester_present_response(msg, subaddr): | 
				
			||||||
 | 
					            if debug: | 
				
			||||||
 | 
					              print(f"CAN-RX: {hex(msg.address)} - 0x{bytes.hex(msg.dat)}") | 
				
			||||||
 | 
					              if (msg.address, subaddr, msg.src) in ecu_responses: | 
				
			||||||
 | 
					                print(f"Duplicate ECU address: {hex(msg.address)}") | 
				
			||||||
 | 
					            ecu_responses.add((msg.address, subaddr, msg.src)) | 
				
			||||||
 | 
					  except Exception: | 
				
			||||||
 | 
					    cloudlog.warning(f"ECU addr scan exception: {traceback.format_exc()}") | 
				
			||||||
 | 
					  return ecu_responses | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__": | 
				
			||||||
 | 
					  import argparse | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  parser = argparse.ArgumentParser(description='Get addresses of all ECUs') | 
				
			||||||
 | 
					  parser.add_argument('--debug', action='store_true') | 
				
			||||||
 | 
					  args = parser.parse_args() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  logcan = messaging.sub_sock('can') | 
				
			||||||
 | 
					  sendcan = messaging.pub_sock('sendcan') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  time.sleep(1.0) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  print("Getting ECU addresses ...") | 
				
			||||||
 | 
					  ecu_addrs = get_all_ecu_addrs(logcan, sendcan, 1, debug=args.debug) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  print() | 
				
			||||||
 | 
					  print("Found ECUs on addresses:") | 
				
			||||||
 | 
					  for addr, subaddr, bus in ecu_addrs: | 
				
			||||||
 | 
					    msg = f"  0x{hex(addr)}" | 
				
			||||||
 | 
					    if subaddr is not None: | 
				
			||||||
 | 
					      msg += f" (sub-address: 0x{hex(subaddr)})" | 
				
			||||||
 | 
					    print(msg) | 
				
			||||||
					Loading…
					
					
				
		Reference in new issue