|
|
@ -8,24 +8,13 @@ import signal |
|
|
|
from collections import defaultdict |
|
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
import cereal.messaging as messaging |
|
|
|
|
|
|
|
from tools.lib.logreader import logreader_from_route_or_segment |
|
|
|
|
|
|
|
|
|
|
|
def sigint_handler(signal, frame): |
|
|
|
def sigint_handler(signal, frame): |
|
|
|
print("handler!") |
|
|
|
|
|
|
|
exit(0) |
|
|
|
exit(0) |
|
|
|
signal.signal(signal.SIGINT, sigint_handler) |
|
|
|
signal.signal(signal.SIGINT, sigint_handler) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
class SteeringAccuracyTool: |
|
|
|
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Sniff a communication socket') |
|
|
|
|
|
|
|
parser.add_argument('control_type', help="[pid|indi|lqr|angle]") |
|
|
|
|
|
|
|
parser.add_argument('--addr', default='127.0.0.1', help="IP address for optional ZMQ listener, default to msgq") |
|
|
|
|
|
|
|
parser.add_argument('--group', default='all', help="speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all") |
|
|
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if args.addr != "127.0.0.1": |
|
|
|
|
|
|
|
os.environ["ZMQ"] = "1" |
|
|
|
|
|
|
|
messaging.context = messaging.Context() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
all_groups = {"germany": (45, "45 - up m/s // 162 - up km/h // 101 - up mph"), |
|
|
|
all_groups = {"germany": (45, "45 - up m/s // 162 - up km/h // 101 - up mph"), |
|
|
|
"veryfast": (35, "35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph"), |
|
|
|
"veryfast": (35, "35 - 45 m/s // 126 - 162 km/h // 78 - 101 mph"), |
|
|
|
"fast": (25, "25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph"), |
|
|
|
"fast": (25, "25 - 35 m/s // 90 - 126 km/h // 56 - 78 mph"), |
|
|
@ -33,39 +22,28 @@ if __name__ == "__main__": |
|
|
|
"slow": (5, " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph"), |
|
|
|
"slow": (5, " 5 - 15 m/s // 18 - 54 km/h // 11 - 34 mph"), |
|
|
|
"crawl": (0, " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph")} |
|
|
|
"crawl": (0, " 0 - 5 m/s // 0 - 18 km/h // 0 - 11 mph")} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, args): |
|
|
|
|
|
|
|
self.msg_cnt = 0 |
|
|
|
|
|
|
|
self.cnt = 0 |
|
|
|
|
|
|
|
self.total_error = 0 |
|
|
|
|
|
|
|
|
|
|
|
if args.group == "all": |
|
|
|
if args.group == "all": |
|
|
|
display_groups = all_groups.keys() |
|
|
|
self.display_groups = self.all_groups.keys() |
|
|
|
elif args.group in all_groups.keys(): |
|
|
|
elif args.group in self.all_groups.keys(): |
|
|
|
display_groups = [args.group] |
|
|
|
self.display_groups = [args.group] |
|
|
|
else: |
|
|
|
else: |
|
|
|
raise ValueError("invalid speed group, see help") |
|
|
|
raise ValueError("invalid speed group, see help") |
|
|
|
|
|
|
|
|
|
|
|
speed_group_stats = {} |
|
|
|
self.speed_group_stats = {} |
|
|
|
for group in all_groups: |
|
|
|
for group in self.all_groups: |
|
|
|
speed_group_stats[group] = defaultdict(lambda: {'err': 0, "cnt": 0, "=": 0, "+": 0, "-": 0, "steer": 0, "limited": 0, "saturated": 0, "dpp": 0}) |
|
|
|
self.speed_group_stats[group] = defaultdict(lambda: {'err': 0, "cnt": 0, "=": 0, "+": 0, "-": 0, "steer": 0, "limited": 0, "saturated": 0, "dpp": 0}) |
|
|
|
|
|
|
|
|
|
|
|
carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True) |
|
|
|
def update(self, sm): |
|
|
|
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'lateralPlan'], addr=args.addr) |
|
|
|
self.msg_cnt += 1 |
|
|
|
time.sleep(1) # Make sure all submaster data is available before going further |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg_cnt = 0 |
|
|
|
lateralControlState = sm['controlsState'].lateralControlState |
|
|
|
cnt = 0 |
|
|
|
control_type = list(lateralControlState.to_dict().keys())[0] |
|
|
|
total_error = 0 |
|
|
|
control_state = lateralControlState.__getattr__(control_type) |
|
|
|
|
|
|
|
|
|
|
|
while messaging.recv_one(carControl): |
|
|
|
|
|
|
|
sm.update() |
|
|
|
|
|
|
|
msg_cnt += 1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if args.control_type == "pid": |
|
|
|
|
|
|
|
control_state = sm['controlsState'].lateralControlState.pidState |
|
|
|
|
|
|
|
elif args.control_type == "indi": |
|
|
|
|
|
|
|
control_state = sm['controlsState'].lateralControlState.indiState |
|
|
|
|
|
|
|
elif args.control_type == "lqr": |
|
|
|
|
|
|
|
control_state = sm['controlsState'].lateralControlState.lqrState |
|
|
|
|
|
|
|
elif args.control_type == "angle": |
|
|
|
|
|
|
|
control_state = sm['controlsState'].lateralControlState.angleState |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
raise ValueError("invalid lateral control type, see help") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
active = sm['controlsState'].active |
|
|
|
active = sm['controlsState'].active |
|
|
@ -77,10 +55,10 @@ if __name__ == "__main__": |
|
|
|
d_path_points = sm['lateralPlan'].dPathPoints |
|
|
|
d_path_points = sm['lateralPlan'].dPathPoints |
|
|
|
# must be engaged, not at standstill, not overriding steering, and not changing lanes |
|
|
|
# must be engaged, not at standstill, not overriding steering, and not changing lanes |
|
|
|
if active and not standstill and not overriding and not changing_lanes: |
|
|
|
if active and not standstill and not overriding and not changing_lanes: |
|
|
|
cnt += 1 |
|
|
|
self.cnt += 1 |
|
|
|
|
|
|
|
|
|
|
|
# wait 5 seconds after engage / standstill / override / lane change |
|
|
|
# wait 5 seconds after engage / standstill / override / lane change |
|
|
|
if cnt >= 500: |
|
|
|
if self.cnt >= 500: |
|
|
|
actual_angle = control_state.steeringAngleDeg |
|
|
|
actual_angle = control_state.steeringAngleDeg |
|
|
|
desired_angle = control_state.steeringAngleDesiredDeg |
|
|
|
desired_angle = control_state.steeringAngleDesiredDeg |
|
|
|
|
|
|
|
|
|
|
@ -91,41 +69,88 @@ if __name__ == "__main__": |
|
|
|
angle_error = round(angle_error, 2) |
|
|
|
angle_error = round(angle_error, 2) |
|
|
|
angle_abs = int(abs(round(desired_angle, 0))) |
|
|
|
angle_abs = int(abs(round(desired_angle, 0))) |
|
|
|
|
|
|
|
|
|
|
|
for group, group_props in all_groups.items(): |
|
|
|
for group, group_props in self.all_groups.items(): |
|
|
|
if v_ego > group_props[0]: |
|
|
|
if v_ego > group_props[0]: |
|
|
|
# collect stats |
|
|
|
# collect stats |
|
|
|
speed_group_stats[group][angle_abs]["cnt"] += 1 |
|
|
|
self.speed_group_stats[group][angle_abs]["cnt"] += 1 |
|
|
|
speed_group_stats[group][angle_abs]["err"] += angle_error |
|
|
|
self.speed_group_stats[group][angle_abs]["err"] += angle_error |
|
|
|
speed_group_stats[group][angle_abs]["steer"] += abs(steer) |
|
|
|
self.speed_group_stats[group][angle_abs]["steer"] += abs(steer) |
|
|
|
if len(d_path_points): |
|
|
|
if len(d_path_points): |
|
|
|
speed_group_stats[group][angle_abs]["dpp"] += abs(d_path_points[0]) |
|
|
|
self.speed_group_stats[group][angle_abs]["dpp"] += abs(d_path_points[0]) |
|
|
|
if steer_limited: |
|
|
|
if steer_limited: |
|
|
|
speed_group_stats[group][angle_abs]["limited"] += 1 |
|
|
|
self.speed_group_stats[group][angle_abs]["limited"] += 1 |
|
|
|
if control_state.saturated: |
|
|
|
if control_state.saturated: |
|
|
|
speed_group_stats[group][angle_abs]["saturated"] += 1 |
|
|
|
self.speed_group_stats[group][angle_abs]["saturated"] += 1 |
|
|
|
if actual_angle == desired_angle: |
|
|
|
if actual_angle == desired_angle: |
|
|
|
speed_group_stats[group][angle_abs]["="] += 1 |
|
|
|
self.speed_group_stats[group][angle_abs]["="] += 1 |
|
|
|
else: |
|
|
|
else: |
|
|
|
if desired_angle == 0.: |
|
|
|
if desired_angle == 0.: |
|
|
|
overshoot = True |
|
|
|
overshoot = True |
|
|
|
else: |
|
|
|
else: |
|
|
|
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle |
|
|
|
overshoot = desired_angle < actual_angle if desired_angle > 0. else desired_angle > actual_angle |
|
|
|
speed_group_stats[group][angle_abs]["+" if overshoot else "-"] += 1 |
|
|
|
self.speed_group_stats[group][angle_abs]["+" if overshoot else "-"] += 1 |
|
|
|
break |
|
|
|
break |
|
|
|
else: |
|
|
|
else: |
|
|
|
cnt = 0 |
|
|
|
self.cnt = 0 |
|
|
|
|
|
|
|
|
|
|
|
if msg_cnt % 100 == 0: |
|
|
|
if self.msg_cnt % 100 == 0: |
|
|
|
print(chr(27) + "[2J") |
|
|
|
print(chr(27) + "[2J") |
|
|
|
if cnt != 0: |
|
|
|
if self.cnt != 0: |
|
|
|
print("COLLECTING ...\n") |
|
|
|
print("COLLECTING ...\n") |
|
|
|
else: |
|
|
|
else: |
|
|
|
print("DISABLED (not active, standstill, steering override, or lane change)\n") |
|
|
|
print("DISABLED (not active, standstill, steering override, or lane change)\n") |
|
|
|
for group in display_groups: |
|
|
|
for group in self.display_groups: |
|
|
|
if len(speed_group_stats[group]) > 0: |
|
|
|
if len(self.speed_group_stats[group]) > 0: |
|
|
|
print(f"speed group: {group:10s} {all_groups[group][1]:>96s}") |
|
|
|
print(f"speed group: {group:10s} {self.all_groups[group][1]:>96s}") |
|
|
|
print(f" {'-'*118}") |
|
|
|
print(f" {'-'*118}") |
|
|
|
for k in sorted(speed_group_stats[group].keys()): |
|
|
|
for k in sorted(self.speed_group_stats[group].keys()): |
|
|
|
v = speed_group_stats[group][k] |
|
|
|
v = self.speed_group_stats[group][k] |
|
|
|
print(f' {k:#2}° | actuator:{int(v["steer"] / v["cnt"] * 100):#3}% | error: {round(v["err"] / v["cnt"], 2):2.2f}° | -:{int(v["-"] / v["cnt"] * 100):#3}% | =:{int(v["="] / v["cnt"] * 100):#3}% | +:{int(v["+"] / v["cnt"] * 100):#3}% | lim:{v["limited"]:#5} | sat:{v["saturated"]:#5} | path dev: {round(v["dpp"] / v["cnt"], 2):2.2f}m | total: {v["cnt"]:#5}') |
|
|
|
print(f' {k:#2}° | actuator:{int(v["steer"] / v["cnt"] * 100):#3}% | error: {round(v["err"] / v["cnt"], 2):2.2f}° | -:{int(v["-"] / v["cnt"] * 100):#3}% | =:{int(v["="] / v["cnt"] * 100):#3}% | +:{int(v["+"] / v["cnt"] * 100):#3}% | lim:{v["limited"]:#5} | sat:{v["saturated"]:#5} | path dev: {round(v["dpp"] / v["cnt"], 2):2.2f}m | total: {v["cnt"]:#5}') |
|
|
|
print("") |
|
|
|
print("") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Steering accuracy measurement tool') |
|
|
|
|
|
|
|
parser.add_argument('--route', help="route name") |
|
|
|
|
|
|
|
parser.add_argument('--addr', default='127.0.0.1', help="IP address for optional ZMQ listener, default to msgq") |
|
|
|
|
|
|
|
parser.add_argument('--group', default='all', help="speed group to display, [crawl|slow|medium|fast|veryfast|germany|all], default to all") |
|
|
|
|
|
|
|
parser.add_argument('--cache', default=False, action='store_true', help="use cached data, default to False") |
|
|
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if args.cache: |
|
|
|
|
|
|
|
os.environ['FILEREADER_CACHE'] = '1' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tool = SteeringAccuracyTool(args) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if args.route is not None: |
|
|
|
|
|
|
|
print(f"loading {args.route}...") |
|
|
|
|
|
|
|
lr = logreader_from_route_or_segment(args.route, sort_by_time=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sm = {} |
|
|
|
|
|
|
|
for msg in lr: |
|
|
|
|
|
|
|
if msg.which() == 'carState': |
|
|
|
|
|
|
|
sm['carState'] = msg.carState |
|
|
|
|
|
|
|
elif msg.which() == 'carControl': |
|
|
|
|
|
|
|
sm['carControl'] = msg.carControl |
|
|
|
|
|
|
|
elif msg.which() == 'controlsState': |
|
|
|
|
|
|
|
sm['controlsState'] = msg.controlsState |
|
|
|
|
|
|
|
elif msg.which() == 'lateralPlan': |
|
|
|
|
|
|
|
sm['lateralPlan'] = msg.lateralPlan |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if msg.which() == 'carControl' and 'carState' in sm and 'controlsState' in sm and 'lateralPlan' in sm: |
|
|
|
|
|
|
|
tool.update(sm) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
if args.addr != "127.0.0.1": |
|
|
|
|
|
|
|
os.environ["ZMQ"] = "1" |
|
|
|
|
|
|
|
messaging.context = messaging.Context() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True) |
|
|
|
|
|
|
|
sm = messaging.SubMaster(['carState', 'carControl', 'controlsState', 'lateralPlan'], addr=args.addr) |
|
|
|
|
|
|
|
time.sleep(1) # Make sure all submaster data is available before going further |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("waiting for messages...") |
|
|
|
|
|
|
|
while messaging.recv_one(carControl): |
|
|
|
|
|
|
|
sm.update() |
|
|
|
|
|
|
|
tool.update(sm) |
|
|
|