|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import argparse
 | 
					
						
							|  |  |  | import binascii
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | from collections import defaultdict
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import cereal.messaging as messaging
 | 
					
						
							|  |  |  | from openpilot.selfdrive.debug.can_table import can_table
 | 
					
						
							|  |  |  | from openpilot.tools.lib.logreader import logreader_from_route_or_segment
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | RED = '\033[91m'
 | 
					
						
							|  |  |  | CLEAR = '\033[0m'
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def update(msgs, bus, dat, low_to_high, high_to_low, quiet=False):
 | 
					
						
							|  |  |  |   for x in msgs:
 | 
					
						
							|  |  |  |     if x.which() != 'can':
 | 
					
						
							|  |  |  |       continue
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     for y in x.can:
 | 
					
						
							|  |  |  |       if y.src == bus:
 | 
					
						
							|  |  |  |         dat[y.address] = y.dat
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         i = int.from_bytes(y.dat, byteorder='big')
 | 
					
						
							|  |  |  |         l_h = low_to_high[y.address]
 | 
					
						
							|  |  |  |         h_l = high_to_low[y.address]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         change = None
 | 
					
						
							|  |  |  |         if (i | l_h) != l_h:
 | 
					
						
							|  |  |  |           low_to_high[y.address] = i | l_h
 | 
					
						
							|  |  |  |           change = "+"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if (~i | h_l) != h_l:
 | 
					
						
							|  |  |  |           high_to_low[y.address] = ~i | h_l
 | 
					
						
							|  |  |  |           change = "-"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if change and not quiet:
 | 
					
						
							|  |  |  |           print(f"{time.monotonic():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def can_printer(bus=0, init_msgs=None, new_msgs=None, table=False):
 | 
					
						
							|  |  |  |   logcan = messaging.sub_sock('can', timeout=10)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   dat = defaultdict(int)
 | 
					
						
							|  |  |  |   low_to_high = defaultdict(int)
 | 
					
						
							|  |  |  |   high_to_low = defaultdict(int)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if init_msgs is not None:
 | 
					
						
							|  |  |  |     update(init_msgs, bus, dat, low_to_high, high_to_low, quiet=True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   low_to_high_init = low_to_high.copy()
 | 
					
						
							|  |  |  |   high_to_low_init = high_to_low.copy()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if new_msgs is not None:
 | 
					
						
							|  |  |  |     update(new_msgs, bus, dat, low_to_high, high_to_low)
 | 
					
						
							|  |  |  |   else:
 | 
					
						
							|  |  |  |     # Live mode
 | 
					
						
							|  |  |  |     print(f"Waiting for messages on bus {bus}")
 | 
					
						
							|  |  |  |     try:
 | 
					
						
							|  |  |  |       while 1:
 | 
					
						
							|  |  |  |         can_recv = messaging.drain_sock(logcan)
 | 
					
						
							|  |  |  |         update(can_recv, bus, dat, low_to_high, high_to_low)
 | 
					
						
							|  |  |  |         time.sleep(0.02)
 | 
					
						
							|  |  |  |     except KeyboardInterrupt:
 | 
					
						
							|  |  |  |       pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   print("\n\n")
 | 
					
						
							|  |  |  |   tables = ""
 | 
					
						
							|  |  |  |   for addr in sorted(dat.keys()):
 | 
					
						
							|  |  |  |     init = low_to_high_init[addr] & high_to_low_init[addr]
 | 
					
						
							|  |  |  |     now = low_to_high[addr] & high_to_low[addr]
 | 
					
						
							|  |  |  |     d = now & ~init
 | 
					
						
							|  |  |  |     if d == 0:
 | 
					
						
							|  |  |  |       continue
 | 
					
						
							|  |  |  |     b = d.to_bytes(len(dat[addr]), byteorder='big')
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     byts = ''.join([(c if c == '0' else f'{RED}{c}{CLEAR}') for c in str(binascii.hexlify(b))[2:-1]])
 | 
					
						
							|  |  |  |     header = f"{hex(addr).ljust(6)}({str(addr).ljust(4)})"
 | 
					
						
							|  |  |  |     print(header, byts)
 | 
					
						
							|  |  |  |     tables += f"{header}\n"
 | 
					
						
							|  |  |  |     tables += can_table(b) + "\n\n"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if table:
 | 
					
						
							|  |  |  |     print(tables)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__":
 | 
					
						
							|  |  |  |   desc = """Collects messages and prints when a new bit transition is observed.
 | 
					
						
							|  |  |  |   This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt.
 | 
					
						
							|  |  |  |   Leave the script running until no new transitions are seen, then perform the action."""
 | 
					
						
							|  |  |  |   parser = argparse.ArgumentParser(description=desc,
 | 
					
						
							|  |  |  |                                    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 | 
					
						
							|  |  |  |   parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0)
 | 
					
						
							|  |  |  |   parser.add_argument("--table", action="store_true", help="Print a cabana-like table")
 | 
					
						
							|  |  |  |   parser.add_argument("init", type=str, nargs='?', help="Route or segment to initialize with. Use empty quotes to compare against all zeros.")
 | 
					
						
							|  |  |  |   parser.add_argument("comp", type=str, nargs='?', help="Route or segment to compare against init")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   args = parser.parse_args()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   init_lr, new_lr = None, None
 | 
					
						
							|  |  |  |   if args.init:
 | 
					
						
							|  |  |  |     if args.init == '':
 | 
					
						
							|  |  |  |       init_lr = []
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       init_lr = logreader_from_route_or_segment(args.init)
 | 
					
						
							|  |  |  |   if args.comp:
 | 
					
						
							|  |  |  |     new_lr = logreader_from_route_or_segment(args.comp)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   can_printer(args.bus, init_msgs=init_lr, new_msgs=new_lr, table=args.table)
 |