diff --git a/common/colors.py b/common/colors.py new file mode 100644 index 0000000000..95aab123a4 --- /dev/null +++ b/common/colors.py @@ -0,0 +1,36 @@ +class COLORS: + def __init__(self): + self.HEADER = '\033[95m' + self.OKBLUE = '\033[94m' + self.CBLUE = '\33[44m' + self.BOLD = '\033[1m' + self.CITALIC = '\33[3m' + self.OKGREEN = '\033[92m' + self.CWHITE = '\33[37m' + self.ENDC = '\033[0m' + self.CWHITE + self.UNDERLINE = '\033[4m' + self.PINK = '\33[38;5;207m' + self.PRETTY_YELLOW = self.BASE(220) + + self.RED = '\033[91m' + self.PURPLE_BG = '\33[45m' + self.YELLOW = '\033[93m' + self.BLUE_GREEN = self.BASE(85) + + self.FAIL = self.RED + # self.INFO = self.PURPLE_BG + self.INFO = self.BASE(207) + self.SUCCESS = self.OKGREEN + self.PROMPT = self.YELLOW + self.DBLUE = '\033[36m' + self.CYAN = self.BASE(39) + self.WARNING = '\033[33m' + + def BASE(self, col): # seems to support more colors + return '\33[38;5;{}m'.format(col) + + def BASEBG(self, col): # seems to support more colors + return '\33[48;5;{}m'.format(col) + + +COLORS = COLORS() diff --git a/common/op_params.py b/common/op_params.py new file mode 100644 index 0000000000..028964075b --- /dev/null +++ b/common/op_params.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +import os +import json +from atomicwrites import atomic_write +from common.colors import COLORS +from common.basedir import BASEDIR +from selfdrive.hardware import TICI +try: + from common.realtime import sec_since_boot +except ImportError: + import time + sec_since_boot = time.time + +warning = lambda msg: print('{}opParams WARNING: {}{}'.format(COLORS.WARNING, msg, COLORS.ENDC)) +error = lambda msg: print('{}opParams ERROR: {}{}'.format(COLORS.FAIL, msg, COLORS.ENDC)) + +NUMBER = [float, int] # value types +NONE_OR_NUMBER = [type(None), float, int] + +BASEDIR = os.path.dirname(BASEDIR) +PARAMS_DIR = os.path.join(BASEDIR, 'community', 'params') +IMPORTED_PATH = os.path.join(PARAMS_DIR, '.imported') +OLD_PARAMS_FILE = os.path.join(BASEDIR, 'op_params.json') + + +class Param: + def __init__(self, default, allowed_types=[], description=None, *, static=False, live=False, hidden=False): # pylint: disable=dangerous-default-value + self.default_value = default # value first saved and returned if actual value isn't a valid type + if not isinstance(allowed_types, list): + allowed_types = [allowed_types] + self.allowed_types = allowed_types # allowed python value types for opEdit + self.description = description # description to be shown in opEdit + self.hidden = hidden # hide this param to user in opEdit + self.live = live # show under the live menu in opEdit + self.static = static # use cached value, never reads to update + self._create_attrs() + + def is_valid(self, value): + if not self.has_allowed_types: # always valid if no allowed types, otherwise checks to make sure + return True + return type(value) in self.allowed_types + + def _create_attrs(self): # Create attributes and check Param is valid + self.has_allowed_types = isinstance(self.allowed_types, list) and len(self.allowed_types) > 0 + self.has_description = self.description is not None + self.is_list = list in self.allowed_types + self.read_frequency = None if self.static else (1 if self.live else 10) # how often to read param file (sec) + self.last_read = -1 + if self.has_allowed_types: + assert type(self.default_value) in self.allowed_types, 'Default value type must be in specified allowed_types!' + if self.is_list: + self.allowed_types.remove(list) + + +def _read_param(key): # Returns None, False if a json error occurs + try: + with open(os.path.join(PARAMS_DIR, key), 'r') as f: + value = json.loads(f.read()) + return value, True + except json.decoder.JSONDecodeError: + return None, False + + +def _write_param(key, value): + param_path = os.path.join(PARAMS_DIR, key) + with atomic_write(param_path, overwrite=True) as f: + f.write(json.dumps(value)) + + +def _import_params(): + if os.path.exists(OLD_PARAMS_FILE) and not os.path.exists(IMPORTED_PATH): # if opParams needs to import from old params file + try: + with open(OLD_PARAMS_FILE, 'r') as f: + old_params = json.loads(f.read()) + for key in old_params: + _write_param(key, old_params[key]) + open(IMPORTED_PATH, 'w').close() + except: # pylint: disable=bare-except + pass + + +class opParams: + def __init__(self): + """ + To add your own parameter to opParams in your fork, simply add a new entry in self.fork_params, instancing a new Param class with at minimum a default value. + The allowed_types and description args are not required but highly recommended to help users edit their parameters with opEdit safely. + - The description value will be shown to users when they use opEdit to change the value of the parameter. + - The allowed_types arg is used to restrict what kinds of values can be entered with opEdit so that users can't crash openpilot with unintended behavior. + (setting a param intended to be a number with a boolean, or viceversa for example) + Limiting the range of floats or integers is still recommended when `.get`ting the parameter. + When a None value is allowed, use `type(None)` instead of None, as opEdit checks the type against the values in the arg with `isinstance()`. + - If you want your param to update within a second, specify live=True. If your param is designed to be read once, specify static=True. + Specifying neither will have the param update every 10 seconds if constantly .get() + If the param is not static, call the .get() function on it in the update function of the file you're reading from to use live updating + + Here's an example of a good fork_param entry: + self.fork_params = {'camera_offset': Param(0.06, allowed_types=NUMBER), live=True} # NUMBER allows both floats and ints + """ + + self.fork_params = { + 'SETME_X1': Param(1, NUMBER, 'Always 1', live=True), + 'SETME_X3': Param(1, NUMBER, 'Sometimes 3, mostly 1?', live=True), + 'PERCENTAGE': Param(100, NUMBER, '100 when not touching wheel, 0 when touching wheel', live=True), + 'SETME_X64': Param(100, NUMBER, 'Unsure', live=True), + 'ANGLE': Param(0, NUMBER, 'Rate limit? Lower is better?', live=True), + 'LTA_REQUEST_TYPE': Param(1, NUMBER, '1: LTA, 3: LTA for lane keeping', live=True), + 'BIT': Param(0, NUMBER, '1: LTA, 2: LTA for lane keeping', live=True), + } + + self._to_delete = [] # a list of unused params you want to delete from users' params file + self._to_reset = [] # a list of params you want reset to their default values + self._run_init() # restores, reads, and updates params + + def _run_init(self): # does first time initializing of default params + # Two required parameters for opEdit + self.fork_params['username'] = Param(None, [type(None), str, bool], 'Your identifier provided with any crash logs sent to Sentry.\nHelps the developer reach out to you if anything goes wrong') + self.fork_params['op_edit_live_mode'] = Param(False, bool, 'This parameter controls which mode opEdit starts in', hidden=True) + + self.params = self._load_params(can_import=True) + self._add_default_params() # adds missing params and resets values with invalid types to self.params + self._delete_and_reset() # removes old params + + def get(self, key=None, *, force_update=False): # key=None returns dict of all params + if key is None: + return self._get_all_params(to_update=force_update) + self._check_key_exists(key, 'get') + param_info = self.fork_params[key] + rate = param_info.read_frequency # will be None if param is static, so check below + + if (not param_info.static and sec_since_boot() - self.fork_params[key].last_read >= rate) or force_update: + value, success = _read_param(key) + self.fork_params[key].last_read = sec_since_boot() + if not success: # in case of read error, use default and overwrite param + value = param_info.default_value + _write_param(key, value) + self.params[key] = value + + if param_info.is_valid(value := self.params[key]): + return value # all good, returning user's value + print(warning('User\'s value type is not valid! Returning default')) # somehow... it should always be valid + return param_info.default_value # return default value because user's value of key is not in allowed_types to avoid crashing openpilot + + def put(self, key, value): + self._check_key_exists(key, 'put') + if not self.fork_params[key].is_valid(value): + raise Exception('opParams: Tried to put a value of invalid type!') + self.params.update({key: value}) + _write_param(key, value) + + def _load_params(self, can_import=False): + if not os.path.exists(PARAMS_DIR): + os.makedirs(PARAMS_DIR) + if can_import: + _import_params() # just imports old params. below we read them in + + params = {} + for key in os.listdir(PARAMS_DIR): # PARAMS_DIR is guaranteed to exist + if key.startswith('.') or key not in self.fork_params: + continue + value, success = _read_param(key) + if not success: + value = self.fork_params[key].default_value + _write_param(key, value) + params[key] = value + return params + + def _get_all_params(self, to_update=False): + if to_update: + self.params = self._load_params() + return {k: self.params[k] for k, p in self.fork_params.items() if k in self.params and not p.hidden} + + def _check_key_exists(self, key, met): + if key not in self.fork_params: + raise Exception('opParams: Tried to {} an unknown parameter! Key not in fork_params: {}'.format(met, key)) + + def _add_default_params(self): + for key, param in self.fork_params.items(): + if key not in self.params: + self.params[key] = param.default_value + _write_param(key, self.params[key]) + elif not param.is_valid(self.params[key]): + print(warning('Value type of user\'s {} param not in allowed types, replacing with default!'.format(key))) + self.params[key] = param.default_value + _write_param(key, self.params[key]) + + def _delete_and_reset(self): + for key in list(self.params): + if key in self._to_delete: + del self.params[key] + os.remove(os.path.join(PARAMS_DIR, key)) + elif key in self._to_reset and key in self.fork_params: + self.params[key] = self.fork_params[key].default_value + _write_param(key, self.params[key]) diff --git a/op_edit.py b/op_edit.py new file mode 100755 index 0000000000..212baad770 --- /dev/null +++ b/op_edit.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +import time +from common.op_params import opParams +import ast +import difflib +from common.colors import COLORS + + +class opEdit: # use by running `python /data/openpilot/op_edit.py` + def __init__(self): + self.op_params = opParams() + self.params = None + self.sleep_time = 0.5 + self.live_tuning = self.op_params.get('op_edit_live_mode') + self.username = self.op_params.get('username') + self.type_colors = {int: COLORS.BASE(179), float: COLORS.BASE(179), + bool: {False: COLORS.RED, True: COLORS.OKGREEN}, + type(None): COLORS.BASE(177), + str: COLORS.BASE(77)} + + self.last_choice = None + + self.run_init() + + def run_init(self): + if self.username is None: + self.success('\nWelcome to the {}opParams{} command line editor!'.format(COLORS.CYAN, COLORS.SUCCESS), sleep_time=0) + self.prompt('Would you like to add your Discord username for easier crash debugging for the fork owner?') + self.prompt('Your username is only used for reaching out if a crash occurs.') + + username_choice = self.input_with_options(['Y', 'N', 'don\'t ask again'], default='n')[0] + if username_choice == 0: + self.prompt('Enter a unique identifer/Discord username:') + username = '' + while username == '': + username = input('>> ').strip() + self.op_params.put('username', username) + self.username = username + self.success('Thanks! Saved your username\n' + 'Edit the \'username\' parameter at any time to update', sleep_time=1.5) + elif username_choice == 2: + self.op_params.put('username', False) + self.info('Got it, bringing you into opEdit\n' + 'Edit the \'username\' parameter at any time to update', sleep_time=1.0) + else: + self.success('\nWelcome to the {}opParams{} command line editor, {}!'.format(COLORS.CYAN, COLORS.SUCCESS, self.username), sleep_time=0) + + self.run_loop() + + def run_loop(self): + while True: + if not self.live_tuning: + self.info('Here are all your parameters:', sleep_time=0) + self.info('(non-static params update while driving)', end='\n', sleep_time=0) + else: + self.info('Here are your live parameters:', sleep_time=0) + self.info('(changes take effect within a second)', end='\n', sleep_time=0) + self.params = self.op_params.get(force_update=True) + if self.live_tuning: # only display live tunable params + self.params = {k: v for k, v in self.params.items() if self.op_params.fork_params[k].live} + + values_list = [] + for k, v in self.params.items(): + if len(str(v)) < 20: + v = self.color_from_type(v) + else: + v = '{} ... {}'.format(str(v)[:30], str(v)[-15:]) + values_list.append(v) + + static = [COLORS.INFO + '(static)' + COLORS.ENDC if self.op_params.fork_params[k].static else '' for k in self.params] + + to_print = [] + blue_gradient = [33, 39, 45, 51, 87] + for idx, param in enumerate(self.params): + line = '{}. {}: {} {}'.format(idx + 1, param, values_list[idx], static[idx]) + if idx == self.last_choice and self.last_choice is not None: + line = COLORS.OKGREEN + line + else: + _color = blue_gradient[min(round(idx / len(self.params) * len(blue_gradient)), len(blue_gradient) - 1)] + line = COLORS.BASE(_color) + line + to_print.append(line) + + extras = {'l': ('Toggle live params', COLORS.WARNING), + 'e': ('Exit opEdit', COLORS.PINK)} + + to_print += ['---'] + ['{}. {}'.format(ext_col + e, ext_txt + COLORS.ENDC) for e, (ext_txt, ext_col) in extras.items()] + print('\n'.join(to_print)) + self.prompt('\nChoose a parameter to edit (by index or name):') + + choice = input('>> ').strip().lower() + parsed, choice = self.parse_choice(choice, len(to_print) - len(extras)) + if parsed == 'continue': + continue + elif parsed == 'change': + self.last_choice = choice + self.change_parameter(choice) + elif parsed == 'live': + self.last_choice = None + self.live_tuning = not self.live_tuning + self.op_params.put('op_edit_live_mode', self.live_tuning) # for next opEdit startup + elif parsed == 'exit': + return + + def parse_choice(self, choice, opt_len): + if choice.isdigit(): + choice = int(choice) + choice -= 1 + if choice not in range(opt_len): # number of options to choose from + self.error('Not in range!') + return 'continue', choice + return 'change', choice + + if choice in ['l', 'live']: # live tuning mode + return 'live', choice + elif choice in ['exit', 'e', '']: + self.error('Exiting opEdit!', sleep_time=0) + return 'exit', choice + else: # find most similar param to user's input + param_sims = [(idx, self.str_sim(choice, param.lower())) for idx, param in enumerate(self.params)] + param_sims = [param for param in param_sims if param[1] > 0.33] + if len(param_sims) > 0: + chosen_param = sorted(param_sims, key=lambda param: param[1], reverse=True)[0] + return 'change', chosen_param[0] # return idx + + self.error('Invalid choice!') + return 'continue', choice + + def str_sim(self, a, b): + return difflib.SequenceMatcher(a=a, b=b).ratio() + + def change_parameter(self, choice): + while True: + chosen_key = list(self.params)[choice] + param_info = self.op_params.fork_params[chosen_key] + + old_value = self.params[chosen_key] + if not param_info.static: + self.info2('Chosen parameter: {}{} (live!)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0) + else: + self.info2('Chosen parameter: {}{} (static)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0) + + to_print = [] + if param_info.has_description: + to_print.append(COLORS.OKGREEN + '>> Description: {}'.format(param_info.description.replace('\n', '\n > ')) + COLORS.ENDC) + if param_info.static: + to_print.append(COLORS.WARNING + '>> A reboot is required for changes to this parameter!' + COLORS.ENDC) + if not param_info.static and not param_info.live: + to_print.append(COLORS.WARNING + '>> Changes take effect within 10 seconds for this parameter!' + COLORS.ENDC) + if param_info.has_allowed_types: + to_print.append(COLORS.RED + '>> Allowed types: {}'.format(', '.join([at.__name__ for at in param_info.allowed_types])) + COLORS.ENDC) + to_print.append(COLORS.WARNING + '>> Default value: {}'.format(self.color_from_type(param_info.default_value)) + COLORS.ENDC) + + if to_print: + print('\n{}\n'.format('\n'.join(to_print))) + + if param_info.is_list: + self.change_param_list(old_value, param_info, chosen_key) # TODO: need to merge the code in this function with the below to reduce redundant code + return + + self.info('Current value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.INFO, type(old_value).__name__), sleep_time=0) + + while True: + self.prompt('\nEnter your new value (enter to exit):') + new_value = input('>> ').strip() + if new_value == '': + self.info('Exiting this parameter...\n') + return + + new_value = self.str_eval(new_value) + if not param_info.is_valid(new_value): + self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__)) + continue + + if not param_info.static: # stay in live tuning interface + self.op_params.put(chosen_key, new_value) + self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__)) + else: # else ask to save and break + self.warning('\nOld value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.WARNING, type(old_value).__name__)) + self.success('New value: {}{} (type: {})'.format(self.color_from_type(new_value), COLORS.OKGREEN, type(new_value).__name__), sleep_time=0) + self.prompt('\nDo you want to save this?') + if self.input_with_options(['Y', 'N'], 'N')[0] == 0: + self.op_params.put(chosen_key, new_value) + self.success('Saved!') + else: + self.info('Not saved!') + return + + def change_param_list(self, old_value, param_info, chosen_key): + while True: + self.info('Current value: {} (type: {})'.format(old_value, type(old_value).__name__), sleep_time=0) + self.prompt('\nEnter index to edit (0 to {}):'.format(len(old_value) - 1)) + choice_idx = self.str_eval(input('>> ')) + if choice_idx == '': + self.info('Exiting this parameter...') + return + + if not isinstance(choice_idx, int) or choice_idx not in range(len(old_value)): + self.error('Must be an integar within list range!') + continue + + while True: + self.info('Chosen index: {}'.format(choice_idx), sleep_time=0) + self.info('Value: {} (type: {})'.format(old_value[choice_idx], type(old_value[choice_idx]).__name__), sleep_time=0) + self.prompt('\nEnter your new value:') + new_value = input('>> ').strip() + if new_value == '': + self.info('Exiting this list item...') + break + + new_value = self.str_eval(new_value) + if not param_info.is_valid(new_value): + self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__)) + continue + + old_value[choice_idx] = new_value + + self.op_params.put(chosen_key, old_value) + self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__), end='\n') + break + + def color_from_type(self, v): + v_color = '' + if type(v) in self.type_colors: + v_color = self.type_colors[type(v)] + if isinstance(v, bool): + v_color = v_color[v] + v = '{}{}{}'.format(v_color, v, COLORS.ENDC) + return v + + def cyan(self, msg, end=''): + msg = self.str_color(msg, style='cyan') + # print(msg, flush=True, end='\n' + end) + return msg + + def prompt(self, msg, end=''): + msg = self.str_color(msg, style='prompt') + print(msg, flush=True, end='\n' + end) + + def warning(self, msg, end=''): + msg = self.str_color(msg, style='warning') + print(msg, flush=True, end='\n' + end) + + def info(self, msg, sleep_time=None, end=''): + if sleep_time is None: + sleep_time = self.sleep_time + msg = self.str_color(msg, style='info') + + print(msg, flush=True, end='\n' + end) + time.sleep(sleep_time) + + def info2(self, msg, sleep_time=None, end=''): + if sleep_time is None: + sleep_time = self.sleep_time + msg = self.str_color(msg, style=86) + + print(msg, flush=True, end='\n' + end) + time.sleep(sleep_time) + + def error(self, msg, sleep_time=None, end='', surround=True): + if sleep_time is None: + sleep_time = self.sleep_time + msg = self.str_color(msg, style='fail', surround=surround) + + print(msg, flush=True, end='\n' + end) + time.sleep(sleep_time) + + def success(self, msg, sleep_time=None, end=''): + if sleep_time is None: + sleep_time = self.sleep_time + msg = self.str_color(msg, style='success') + + print(msg, flush=True, end='\n' + end) + time.sleep(sleep_time) + + @staticmethod + def str_color(msg, style, surround=False): + if style == 'success': + style = COLORS.SUCCESS + elif style == 'fail': + style = COLORS.FAIL + elif style == 'prompt': + style = COLORS.PROMPT + elif style == 'info': + style = COLORS.INFO + elif style == 'cyan': + style = COLORS.CYAN + elif style == 'warning': + style = COLORS.WARNING + elif isinstance(style, int): + style = COLORS.BASE(style) + + if surround: + msg = '{}--------\n{}\n{}--------{}'.format(style, msg, COLORS.ENDC + style, COLORS.ENDC) + else: + msg = '{}{}{}'.format(style, msg, COLORS.ENDC) + + return msg + + def input_with_options(self, options, default=None): + """ + Takes in a list of options and asks user to make a choice. + The most similar option list index is returned along with the similarity percentage from 0 to 1 + """ + user_input = input('[{}]: '.format('/'.join(options))).lower().strip() + if not user_input: + return default, 0.0 + sims = [self.str_sim(i.lower().strip(), user_input) for i in options] + argmax = sims.index(max(sims)) + return argmax, sims[argmax] + + def str_eval(self, dat): + dat = dat.strip() + try: + dat = ast.literal_eval(dat) + except: + if dat.lower() == 'none': + dat = None + elif dat.lower() == 'false': + dat = False + elif dat.lower() == 'true': # else, assume string + dat = True + return dat + + +opEdit() diff --git a/selfdrive/car/toyota/carcontroller.py b/selfdrive/car/toyota/carcontroller.py index 8840731ab3..56db35aadd 100644 --- a/selfdrive/car/toyota/carcontroller.py +++ b/selfdrive/car/toyota/carcontroller.py @@ -8,6 +8,7 @@ from selfdrive.car.toyota.values import CAR, STATIC_DSU_MSGS, NO_STOP_TIMER_CAR, MIN_ACC_SPEED, PEDAL_TRANSITION, CarControllerParams, \ UNSUPPORTED_DSU_CAR from opendbc.can.packer import CANPacker +from common.op_params import opParams VisualAlert = car.CarControl.HUDControl.VisualAlert @@ -21,6 +22,7 @@ MAX_USER_TORQUE = 500 class CarController: def __init__(self, dbc_name, CP, VM): + self.op_params = opParams() self.CP = CP self.torque_rate_limits = CarControllerParams(self.CP) self.frame = 0 @@ -112,7 +114,7 @@ class CarController: can_sends.append(create_lta_steer_command(self.packer, actuators.steeringAngleDeg + CS.out.steeringAngleOffsetDeg, CS.out.steeringAngleDeg + CS.out.steeringAngleOffsetDeg, CS.out.steeringTorque, - apply_steer_req)) + apply_steer_req, self.op_params)) # we can spam can to cancel the system even if we are using lat only control if (self.frame % 3 == 0 and self.CP.openpilotLongitudinalControl) or pcm_cancel_cmd: diff --git a/selfdrive/car/toyota/toyotacan.py b/selfdrive/car/toyota/toyotacan.py index 8d307159ae..a6a0076382 100644 --- a/selfdrive/car/toyota/toyotacan.py +++ b/selfdrive/car/toyota/toyotacan.py @@ -12,32 +12,32 @@ def create_steer_command(packer, steer, steer_req): return packer.make_can_msg("STEERING_LKA", 0, values) -def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, steer_req): +def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, steer_req, op_params): """Creates a CAN message for the Toyota LTA Steer Command.""" percentage = interp(abs(driver_torque), [50, 100], [100, 0]) apply_steer = interp(percentage, [-10, 100], [steer_angle, apply_steer]) values = { # seems to actually be 1. Even 1 on 2023 RAV4 2023 (TODO: check from data) - "SETME_X1": 1, + "SETME_X1": op_params.get("SETME_X1"), # On a RAV4 2023, it seems to be always 1 # But other cars it can change randomly? # TODO: figure that out - "SETME_X3": 1, + "SETME_X3": op_params.get("SETME_X3"), # 100 when driver not touching wheel, 0 when driver touching wheel. ramps smoothly between # TODO: find actual breakpoints and determine how this affects the control - "PERCENTAGE": 100, + "PERCENTAGE": op_params.get("PERCENTAGE"), # ramps to 0 smoothly then back on falling edge of STEER_REQUEST if BIT isn't 1 # but on 2023 RAV4, it was constant 100 on falling edge and BIT was 0 # TODO: figure out if important. torque wind down? maybe user torque blending? - "SETME_X64": 0x64, + "SETME_X64": op_params.get("SETME_X64"), # TODO: need to understand this better, it's always 1.5-2x higher than angle cmd # TODO: revisit on 2023 RAV4 - "ANGLE": 0, + "ANGLE": op_params.get("ANGLE"), # seems to just be desired angle cmd # TODO: does this have offset on cars where accurate steering angle signal has offset? @@ -49,7 +49,7 @@ def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, st # 1 when actively using LTA. 3 when LTA is activated for LKA. 0 when LTA_REQUEST is 0 # TODO: see if 3 gets us any more torque, or better blending, or SOMETHING. EPS_STATUS doesn't change based on this, so maybe it doesn't do anything - "LTA_REQUEST_TYPE": steer_req, + "LTA_REQUEST_TYPE": op_params.get("LTA_REQUEST_TYPE") if steer_req else 0, # 1 when STEER_REQUEST changes state (usually) # except not true on 2023 RAV4. TODO: revisit, could it be UI related?