diff --git a/selfdrive/debug/can_print_changes.py b/selfdrive/debug/can_print_changes.py index 4fa775ac1..caf001eea 100755 --- a/selfdrive/debug/can_print_changes.py +++ b/selfdrive/debug/can_print_changes.py @@ -1,46 +1,75 @@ #!/usr/bin/env python3 +import argparse import binascii -import sys +import time from collections import defaultdict import cereal.messaging as messaging -from common.realtime import sec_since_boot +from tools.lib.logreader import logreader_from_route_or_segment -def can_printer(bus=0): - """Collects messages and prints when a new bit transition is observed. - This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt. - Leave the script running until no new transitions are seen, then perform the action.""" +def update(msgs, bus, low_to_high, high_to_low, quiet=False): + for x in msgs: + if x.which() != 'can': + continue + + for y in x.can: + if y.src == bus: + i = int.from_bytes(y.dat, byteorder='big') + + l_h = low_to_high[y.address] + h_l = high_to_low[y.address] + + change = None + if (i | l_h) != l_h: + low_to_high[y.address] = i | l_h + change = "+" + + if (~i | h_l) != h_l: + high_to_low[y.address] = ~i | h_l + change = "-" + + if change and not quiet: + print(f"{time.monotonic():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}") + + +def can_printer(bus=0, init_msgs=None, new_msgs=None): logcan = messaging.sub_sock('can') low_to_high = defaultdict(int) high_to_low = defaultdict(int) - while 1: - can_recv = messaging.drain_sock(logcan, wait_for_one=True) - for x in can_recv: - for y in x.can: - if y.src == bus: - i = int.from_bytes(y.dat, byteorder='big') + if init_msgs is not None: + update(init_msgs, bus, low_to_high, high_to_low, quiet=True) - l_h = low_to_high[y.address] - h_l = high_to_low[y.address] + if new_msgs is not None: + update(new_msgs, bus, low_to_high, high_to_low) + else: + # Live mode + while 1: + can_recv = messaging.drain_sock(logcan, wait_for_one=True) + update(can_recv, bus, low_to_high, high_to_low) - change = None - if (i | l_h) != l_h: - low_to_high[y.address] = i | l_h - change = "+" - if (~i | h_l) != h_l: - high_to_low[y.address] = ~i | h_l - change = "-" +if __name__ == "__main__": + desc = """Collects messages and prints when a new bit transition is observed. + This is very useful to find signals based on user triggered actions, such as blinkers and seatbelt. + Leave the script running until no new transitions are seen, then perform the action.""" - if change: - print(f"{sec_since_boot():.2f}\t{hex(y.address)} ({y.address})\t{change}{binascii.hexlify(y.dat)}") + parser = argparse.ArgumentParser(description=desc, + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--bus", type=int, help="CAN bus to print out", default=0) -if __name__ == "__main__": - if len(sys.argv) > 1: - can_printer(int(sys.argv[1])) - else: - can_printer() + parser.add_argument("--init", type=str, help="Route or segment to initialize with") + parser.add_argument("--comp", type=str, help="Route or segment to compare against init") + + args = parser.parse_args() + + init_lr, new_lr = None, None + if args.init: + init_lr = logreader_from_route_or_segment(args.init) + if args.comp: + new_lr = logreader_from_route_or_segment(args.comp) + + can_printer(args.bus, init_msgs=init_lr, new_msgs=new_lr) diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index c8d506b4b..5597a5ffd 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -5,8 +5,9 @@ import bz2 import urllib.parse import capnp -from tools.lib.filereader import FileReader from cereal import log as capnp_log +from tools.lib.filereader import FileReader +from tools.lib.route import Route, SegmentName # this is an iterator itself, and uses private variables from LogReader class MultiLogIterator: @@ -99,6 +100,16 @@ class LogReader: else: yield ent + +def logreader_from_route_or_segment(r): + sn = SegmentName(r, allow_route_name=True) + route = Route(sn.route_name.canonical_name) + if sn.segment_num < 0: + return MultiLogIterator(route.log_paths()) + else: + return LogReader(route.log_paths()[sn.segment_num]) + + if __name__ == "__main__": import codecs # capnproto <= 0.8.0 throws errors converting byte data to string