parent
30ae47de05
commit
1680f1e251
2 changed files with 69 additions and 29 deletions
@ -1,46 +1,75 @@ |
|||||||
#!/usr/bin/env python3 |
#!/usr/bin/env python3 |
||||||
|
import argparse |
||||||
import binascii |
import binascii |
||||||
import sys |
import time |
||||||
from collections import defaultdict |
from collections import defaultdict |
||||||
|
|
||||||
import cereal.messaging as messaging |
import cereal.messaging as messaging |
||||||
from common.realtime import sec_since_boot |
from tools.lib.logreader import logreader_from_route_or_segment |
||||||
|
|
||||||
|
|
||||||
def can_printer(bus=0): |
def update(msgs, bus, low_to_high, high_to_low, quiet=False): |
||||||
"""Collects messages and prints when a new bit transition is observed. |
for x in msgs: |
||||||
This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt. |
if x.which() != 'can': |
||||||
Leave the script running until no new transitions are seen, then perform the action.""" |
continue |
||||||
|
|
||||||
|
for y in x.can: |
||||||
|
if y.src == bus: |
||||||
|
i = int.from_bytes(y.dat, byteorder='big') |
||||||
|
|
||||||
|
l_h = low_to_high[y.address] |
||||||
|
h_l = high_to_low[y.address] |
||||||
|
|
||||||
|
change = None |
||||||
|
if (i | l_h) != l_h: |
||||||
|
low_to_high[y.address] = i | l_h |
||||||
|
change = "+" |
||||||
|
|
||||||
|
if (~i | h_l) != h_l: |
||||||
|
high_to_low[y.address] = ~i | h_l |
||||||
|
change = "-" |
||||||
|
|
||||||
|
if change and not quiet: |
||||||
|
print(f"{time.monotonic():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}") |
||||||
|
|
||||||
|
|
||||||
|
def can_printer(bus=0, init_msgs=None, new_msgs=None): |
||||||
logcan = messaging.sub_sock('can') |
logcan = messaging.sub_sock('can') |
||||||
|
|
||||||
low_to_high = defaultdict(int) |
low_to_high = defaultdict(int) |
||||||
high_to_low = defaultdict(int) |
high_to_low = defaultdict(int) |
||||||
|
|
||||||
while 1: |
if init_msgs is not None: |
||||||
can_recv = messaging.drain_sock(logcan, wait_for_one=True) |
update(init_msgs, bus, low_to_high, high_to_low, quiet=True) |
||||||
for x in can_recv: |
|
||||||
for y in x.can: |
|
||||||
if y.src == bus: |
|
||||||
i = int.from_bytes(y.dat, byteorder='big') |
|
||||||
|
|
||||||
l_h = low_to_high[y.address] |
if new_msgs is not None: |
||||||
h_l = high_to_low[y.address] |
update(new_msgs, bus, low_to_high, high_to_low) |
||||||
|
else: |
||||||
|
# Live mode |
||||||
|
while 1: |
||||||
|
can_recv = messaging.drain_sock(logcan, wait_for_one=True) |
||||||
|
update(can_recv, bus, low_to_high, high_to_low) |
||||||
|
|
||||||
change = None |
|
||||||
if (i | l_h) != l_h: |
|
||||||
low_to_high[y.address] = i | l_h |
|
||||||
change = "+" |
|
||||||
|
|
||||||
if (~i | h_l) != h_l: |
if __name__ == "__main__": |
||||||
high_to_low[y.address] = ~i | h_l |
desc = """Collects messages and prints when a new bit transition is observed. |
||||||
change = "-" |
This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt. |
||||||
|
Leave the script running until no new transitions are seen, then perform the action.""" |
||||||
|
|
||||||
if change: |
parser = argparse.ArgumentParser(description=desc, |
||||||
print(f"{sec_since_boot():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}") |
formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
||||||
|
|
||||||
|
parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0) |
||||||
|
|
||||||
if __name__ == "__main__": |
parser.add_argument("--init", type=str, help="Route or segment to initialize with") |
||||||
if len(sys.argv) > 1: |
parser.add_argument("--comp", type=str, help="Route or segment to compare against init") |
||||||
can_printer(int(sys.argv[1])) |
|
||||||
else: |
args = parser.parse_args() |
||||||
can_printer() |
|
||||||
|
init_lr, new_lr = None, None |
||||||
|
if args.init: |
||||||
|
init_lr = logreader_from_route_or_segment(args.init) |
||||||
|
if args.comp: |
||||||
|
new_lr = logreader_from_route_or_segment(args.comp) |
||||||
|
|
||||||
|
can_printer(args.bus, init_msgs=init_lr, new_msgs=new_lr) |
||||||
|
Loading…
Reference in new issue