You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							109 lines
						
					
					
						
							3.4 KiB
						
					
					
				
			
		
		
	
	
							109 lines
						
					
					
						
							3.4 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import argparse
 | |
| import binascii
 | |
| import time
 | |
| from collections import defaultdict
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from openpilot.selfdrive.debug.can_table import can_table
 | |
| from openpilot.tools.lib.logreader import LogIterable, LogReader
 | |
| 
 | |
| RED = '\033[91m'
 | |
| CLEAR = '\033[0m'
 | |
| 
 | |
| def update(msgs, bus, dat, low_to_high, high_to_low, quiet=False):
 | |
|   for x in msgs:
 | |
|     if x.which() != 'can':
 | |
|       continue
 | |
| 
 | |
|     for y in x.can:
 | |
|       if y.src == bus:
 | |
|         dat[y.address] = y.dat
 | |
| 
 | |
|         i = int.from_bytes(y.dat, byteorder='big')
 | |
|         l_h = low_to_high[y.address]
 | |
|         h_l = high_to_low[y.address]
 | |
| 
 | |
|         change = None
 | |
|         if (i | l_h) != l_h:
 | |
|           low_to_high[y.address] = i | l_h
 | |
|           change = "+"
 | |
| 
 | |
|         if (~i | h_l) != h_l:
 | |
|           high_to_low[y.address] = ~i | h_l
 | |
|           change = "-"
 | |
| 
 | |
|         if change and not quiet:
 | |
|           print(f"{time.monotonic():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}")
 | |
| 
 | |
| 
 | |
| def can_printer(bus=0, init_msgs=None, new_msgs=None, table=False):
 | |
|   logcan = messaging.sub_sock('can', timeout=10)
 | |
| 
 | |
|   dat = defaultdict(int)
 | |
|   low_to_high = defaultdict(int)
 | |
|   high_to_low = defaultdict(int)
 | |
| 
 | |
|   if init_msgs is not None:
 | |
|     update(init_msgs, bus, dat, low_to_high, high_to_low, quiet=True)
 | |
| 
 | |
|   low_to_high_init = low_to_high.copy()
 | |
|   high_to_low_init = high_to_low.copy()
 | |
| 
 | |
|   if new_msgs is not None:
 | |
|     update(new_msgs, bus, dat, low_to_high, high_to_low)
 | |
|   else:
 | |
|     # Live mode
 | |
|     print(f"Waiting for messages on bus {bus}")
 | |
|     try:
 | |
|       while 1:
 | |
|         can_recv = messaging.drain_sock(logcan)
 | |
|         update(can_recv, bus, dat, low_to_high, high_to_low)
 | |
|         time.sleep(0.02)
 | |
|     except KeyboardInterrupt:
 | |
|       pass
 | |
| 
 | |
|   print("\n\n")
 | |
|   tables = ""
 | |
|   for addr in sorted(dat.keys()):
 | |
|     init = low_to_high_init[addr] & high_to_low_init[addr]
 | |
|     now = low_to_high[addr] & high_to_low[addr]
 | |
|     d = now & ~init
 | |
|     if d == 0:
 | |
|       continue
 | |
|     b = d.to_bytes(len(dat[addr]), byteorder='big')
 | |
| 
 | |
|     byts = ''.join([(c if c == '0' else f'{RED}{c}{CLEAR}') for c in str(binascii.hexlify(b))[2:-1]])
 | |
|     header = f"{hex(addr).ljust(6)}({str(addr).ljust(4)})"
 | |
|     print(header, byts)
 | |
|     tables += f"{header}\n"
 | |
|     tables += can_table(b) + "\n\n"
 | |
| 
 | |
|   if table:
 | |
|     print(tables)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   desc = """Collects messages and prints when a new bit transition is observed.
 | |
|   This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt.
 | |
|   Leave the script running until no new transitions are seen, then perform the action."""
 | |
|   parser = argparse.ArgumentParser(description=desc,
 | |
|                                    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 | |
|   parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0)
 | |
|   parser.add_argument("--table", action="store_true", help="Print a cabana-like table")
 | |
|   parser.add_argument("init", type=str, nargs='?', help="Route or segment to initialize with. Use empty quotes to compare against all zeros.")
 | |
|   parser.add_argument("comp", type=str, nargs='?', help="Route or segment to compare against init")
 | |
| 
 | |
|   args = parser.parse_args()
 | |
| 
 | |
|   init_lr: LogIterable | None = None
 | |
|   new_lr: LogIterable | None = None
 | |
| 
 | |
|   if args.init:
 | |
|     if args.init == '':
 | |
|       init_lr = []
 | |
|     else:
 | |
|       init_lr = LogReader(args.init)
 | |
|   if args.comp:
 | |
|     new_lr = LogReader(args.comp)
 | |
| 
 | |
|   can_printer(args.bus, init_msgs=init_lr, new_msgs=new_lr, table=args.table)
 | |
| 
 |