add opParams

pull/27494/head
Shane Smiskol 3 years ago
parent 03cfc4c80a
commit 876c8b53cc
  1. 36
      common/colors.py
  2. 193
      common/op_params.py
  3. 325
      op_edit.py
  4. 4
      selfdrive/car/toyota/carcontroller.py
  5. 14
      selfdrive/car/toyota/toyotacan.py

@ -0,0 +1,36 @@
class COLORS:
def __init__(self):
self.HEADER = '\033[95m'
self.OKBLUE = '\033[94m'
self.CBLUE = '\33[44m'
self.BOLD = '\033[1m'
self.CITALIC = '\33[3m'
self.OKGREEN = '\033[92m'
self.CWHITE = '\33[37m'
self.ENDC = '\033[0m' + self.CWHITE
self.UNDERLINE = '\033[4m'
self.PINK = '\33[38;5;207m'
self.PRETTY_YELLOW = self.BASE(220)
self.RED = '\033[91m'
self.PURPLE_BG = '\33[45m'
self.YELLOW = '\033[93m'
self.BLUE_GREEN = self.BASE(85)
self.FAIL = self.RED
# self.INFO = self.PURPLE_BG
self.INFO = self.BASE(207)
self.SUCCESS = self.OKGREEN
self.PROMPT = self.YELLOW
self.DBLUE = '\033[36m'
self.CYAN = self.BASE(39)
self.WARNING = '\033[33m'
def BASE(self, col): # seems to support more colors
return '\33[38;5;{}m'.format(col)
def BASEBG(self, col): # seems to support more colors
return '\33[48;5;{}m'.format(col)
COLORS = COLORS()

@ -0,0 +1,193 @@
#!/usr/bin/env python3
import os
import json
from atomicwrites import atomic_write
from common.colors import COLORS
from common.basedir import BASEDIR
from selfdrive.hardware import TICI
try:
from common.realtime import sec_since_boot
except ImportError:
import time
sec_since_boot = time.time
warning = lambda msg: print('{}opParams WARNING: {}{}'.format(COLORS.WARNING, msg, COLORS.ENDC))
error = lambda msg: print('{}opParams ERROR: {}{}'.format(COLORS.FAIL, msg, COLORS.ENDC))
NUMBER = [float, int] # value types
NONE_OR_NUMBER = [type(None), float, int]
BASEDIR = os.path.dirname(BASEDIR)
PARAMS_DIR = os.path.join(BASEDIR, 'community', 'params')
IMPORTED_PATH = os.path.join(PARAMS_DIR, '.imported')
OLD_PARAMS_FILE = os.path.join(BASEDIR, 'op_params.json')
class Param:
def __init__(self, default, allowed_types=[], description=None, *, static=False, live=False, hidden=False): # pylint: disable=dangerous-default-value
self.default_value = default # value first saved and returned if actual value isn't a valid type
if not isinstance(allowed_types, list):
allowed_types = [allowed_types]
self.allowed_types = allowed_types # allowed python value types for opEdit
self.description = description # description to be shown in opEdit
self.hidden = hidden # hide this param to user in opEdit
self.live = live # show under the live menu in opEdit
self.static = static # use cached value, never reads to update
self._create_attrs()
def is_valid(self, value):
if not self.has_allowed_types: # always valid if no allowed types, otherwise checks to make sure
return True
return type(value) in self.allowed_types
def _create_attrs(self): # Create attributes and check Param is valid
self.has_allowed_types = isinstance(self.allowed_types, list) and len(self.allowed_types) > 0
self.has_description = self.description is not None
self.is_list = list in self.allowed_types
self.read_frequency = None if self.static else (1 if self.live else 10) # how often to read param file (sec)
self.last_read = -1
if self.has_allowed_types:
assert type(self.default_value) in self.allowed_types, 'Default value type must be in specified allowed_types!'
if self.is_list:
self.allowed_types.remove(list)
def _read_param(key): # Returns None, False if a json error occurs
try:
with open(os.path.join(PARAMS_DIR, key), 'r') as f:
value = json.loads(f.read())
return value, True
except json.decoder.JSONDecodeError:
return None, False
def _write_param(key, value):
param_path = os.path.join(PARAMS_DIR, key)
with atomic_write(param_path, overwrite=True) as f:
f.write(json.dumps(value))
def _import_params():
if os.path.exists(OLD_PARAMS_FILE) and not os.path.exists(IMPORTED_PATH): # if opParams needs to import from old params file
try:
with open(OLD_PARAMS_FILE, 'r') as f:
old_params = json.loads(f.read())
for key in old_params:
_write_param(key, old_params[key])
open(IMPORTED_PATH, 'w').close()
except: # pylint: disable=bare-except
pass
class opParams:
def __init__(self):
"""
To add your own parameter to opParams in your fork, simply add a new entry in self.fork_params, instancing a new Param class with at minimum a default value.
The allowed_types and description args are not required but highly recommended to help users edit their parameters with opEdit safely.
- The description value will be shown to users when they use opEdit to change the value of the parameter.
- The allowed_types arg is used to restrict what kinds of values can be entered with opEdit so that users can't crash openpilot with unintended behavior.
(setting a param intended to be a number with a boolean, or viceversa for example)
Limiting the range of floats or integers is still recommended when `.get`ting the parameter.
When a None value is allowed, use `type(None)` instead of None, as opEdit checks the type against the values in the arg with `isinstance()`.
- If you want your param to update within a second, specify live=True. If your param is designed to be read once, specify static=True.
Specifying neither will have the param update every 10 seconds if constantly .get()
If the param is not static, call the .get() function on it in the update function of the file you're reading from to use live updating
Here's an example of a good fork_param entry:
self.fork_params = {'camera_offset': Param(0.06, allowed_types=NUMBER), live=True} # NUMBER allows both floats and ints
"""
self.fork_params = {
'SETME_X1': Param(1, NUMBER, 'Always 1', live=True),
'SETME_X3': Param(1, NUMBER, 'Sometimes 3, mostly 1?', live=True),
'PERCENTAGE': Param(100, NUMBER, '100 when not touching wheel, 0 when touching wheel', live=True),
'SETME_X64': Param(100, NUMBER, 'Unsure', live=True),
'ANGLE': Param(0, NUMBER, 'Rate limit? Lower is better?', live=True),
'LTA_REQUEST_TYPE': Param(1, NUMBER, '1: LTA, 3: LTA for lane keeping', live=True),
'BIT': Param(0, NUMBER, '1: LTA, 2: LTA for lane keeping', live=True),
}
self._to_delete = [] # a list of unused params you want to delete from users' params file
self._to_reset = [] # a list of params you want reset to their default values
self._run_init() # restores, reads, and updates params
def _run_init(self): # does first time initializing of default params
# Two required parameters for opEdit
self.fork_params['username'] = Param(None, [type(None), str, bool], 'Your identifier provided with any crash logs sent to Sentry.\nHelps the developer reach out to you if anything goes wrong')
self.fork_params['op_edit_live_mode'] = Param(False, bool, 'This parameter controls which mode opEdit starts in', hidden=True)
self.params = self._load_params(can_import=True)
self._add_default_params() # adds missing params and resets values with invalid types to self.params
self._delete_and_reset() # removes old params
def get(self, key=None, *, force_update=False): # key=None returns dict of all params
if key is None:
return self._get_all_params(to_update=force_update)
self._check_key_exists(key, 'get')
param_info = self.fork_params[key]
rate = param_info.read_frequency # will be None if param is static, so check below
if (not param_info.static and sec_since_boot() - self.fork_params[key].last_read >= rate) or force_update:
value, success = _read_param(key)
self.fork_params[key].last_read = sec_since_boot()
if not success: # in case of read error, use default and overwrite param
value = param_info.default_value
_write_param(key, value)
self.params[key] = value
if param_info.is_valid(value := self.params[key]):
return value # all good, returning user's value
print(warning('User\'s value type is not valid! Returning default')) # somehow... it should always be valid
return param_info.default_value # return default value because user's value of key is not in allowed_types to avoid crashing openpilot
def put(self, key, value):
self._check_key_exists(key, 'put')
if not self.fork_params[key].is_valid(value):
raise Exception('opParams: Tried to put a value of invalid type!')
self.params.update({key: value})
_write_param(key, value)
def _load_params(self, can_import=False):
if not os.path.exists(PARAMS_DIR):
os.makedirs(PARAMS_DIR)
if can_import:
_import_params() # just imports old params. below we read them in
params = {}
for key in os.listdir(PARAMS_DIR): # PARAMS_DIR is guaranteed to exist
if key.startswith('.') or key not in self.fork_params:
continue
value, success = _read_param(key)
if not success:
value = self.fork_params[key].default_value
_write_param(key, value)
params[key] = value
return params
def _get_all_params(self, to_update=False):
if to_update:
self.params = self._load_params()
return {k: self.params[k] for k, p in self.fork_params.items() if k in self.params and not p.hidden}
def _check_key_exists(self, key, met):
if key not in self.fork_params:
raise Exception('opParams: Tried to {} an unknown parameter! Key not in fork_params: {}'.format(met, key))
def _add_default_params(self):
for key, param in self.fork_params.items():
if key not in self.params:
self.params[key] = param.default_value
_write_param(key, self.params[key])
elif not param.is_valid(self.params[key]):
print(warning('Value type of user\'s {} param not in allowed types, replacing with default!'.format(key)))
self.params[key] = param.default_value
_write_param(key, self.params[key])
def _delete_and_reset(self):
for key in list(self.params):
if key in self._to_delete:
del self.params[key]
os.remove(os.path.join(PARAMS_DIR, key))
elif key in self._to_reset and key in self.fork_params:
self.params[key] = self.fork_params[key].default_value
_write_param(key, self.params[key])

@ -0,0 +1,325 @@
#!/usr/bin/env python3
import time
from common.op_params import opParams
import ast
import difflib
from common.colors import COLORS
class opEdit: # use by running `python /data/openpilot/op_edit.py`
def __init__(self):
self.op_params = opParams()
self.params = None
self.sleep_time = 0.5
self.live_tuning = self.op_params.get('op_edit_live_mode')
self.username = self.op_params.get('username')
self.type_colors = {int: COLORS.BASE(179), float: COLORS.BASE(179),
bool: {False: COLORS.RED, True: COLORS.OKGREEN},
type(None): COLORS.BASE(177),
str: COLORS.BASE(77)}
self.last_choice = None
self.run_init()
def run_init(self):
if self.username is None:
self.success('\nWelcome to the {}opParams{} command line editor!'.format(COLORS.CYAN, COLORS.SUCCESS), sleep_time=0)
self.prompt('Would you like to add your Discord username for easier crash debugging for the fork owner?')
self.prompt('Your username is only used for reaching out if a crash occurs.')
username_choice = self.input_with_options(['Y', 'N', 'don\'t ask again'], default='n')[0]
if username_choice == 0:
self.prompt('Enter a unique identifer/Discord username:')
username = ''
while username == '':
username = input('>> ').strip()
self.op_params.put('username', username)
self.username = username
self.success('Thanks! Saved your username\n'
'Edit the \'username\' parameter at any time to update', sleep_time=1.5)
elif username_choice == 2:
self.op_params.put('username', False)
self.info('Got it, bringing you into opEdit\n'
'Edit the \'username\' parameter at any time to update', sleep_time=1.0)
else:
self.success('\nWelcome to the {}opParams{} command line editor, {}!'.format(COLORS.CYAN, COLORS.SUCCESS, self.username), sleep_time=0)
self.run_loop()
def run_loop(self):
while True:
if not self.live_tuning:
self.info('Here are all your parameters:', sleep_time=0)
self.info('(non-static params update while driving)', end='\n', sleep_time=0)
else:
self.info('Here are your live parameters:', sleep_time=0)
self.info('(changes take effect within a second)', end='\n', sleep_time=0)
self.params = self.op_params.get(force_update=True)
if self.live_tuning: # only display live tunable params
self.params = {k: v for k, v in self.params.items() if self.op_params.fork_params[k].live}
values_list = []
for k, v in self.params.items():
if len(str(v)) < 20:
v = self.color_from_type(v)
else:
v = '{} ... {}'.format(str(v)[:30], str(v)[-15:])
values_list.append(v)
static = [COLORS.INFO + '(static)' + COLORS.ENDC if self.op_params.fork_params[k].static else '' for k in self.params]
to_print = []
blue_gradient = [33, 39, 45, 51, 87]
for idx, param in enumerate(self.params):
line = '{}. {}: {} {}'.format(idx + 1, param, values_list[idx], static[idx])
if idx == self.last_choice and self.last_choice is not None:
line = COLORS.OKGREEN + line
else:
_color = blue_gradient[min(round(idx / len(self.params) * len(blue_gradient)), len(blue_gradient) - 1)]
line = COLORS.BASE(_color) + line
to_print.append(line)
extras = {'l': ('Toggle live params', COLORS.WARNING),
'e': ('Exit opEdit', COLORS.PINK)}
to_print += ['---'] + ['{}. {}'.format(ext_col + e, ext_txt + COLORS.ENDC) for e, (ext_txt, ext_col) in extras.items()]
print('\n'.join(to_print))
self.prompt('\nChoose a parameter to edit (by index or name):')
choice = input('>> ').strip().lower()
parsed, choice = self.parse_choice(choice, len(to_print) - len(extras))
if parsed == 'continue':
continue
elif parsed == 'change':
self.last_choice = choice
self.change_parameter(choice)
elif parsed == 'live':
self.last_choice = None
self.live_tuning = not self.live_tuning
self.op_params.put('op_edit_live_mode', self.live_tuning) # for next opEdit startup
elif parsed == 'exit':
return
def parse_choice(self, choice, opt_len):
if choice.isdigit():
choice = int(choice)
choice -= 1
if choice not in range(opt_len): # number of options to choose from
self.error('Not in range!')
return 'continue', choice
return 'change', choice
if choice in ['l', 'live']: # live tuning mode
return 'live', choice
elif choice in ['exit', 'e', '']:
self.error('Exiting opEdit!', sleep_time=0)
return 'exit', choice
else: # find most similar param to user's input
param_sims = [(idx, self.str_sim(choice, param.lower())) for idx, param in enumerate(self.params)]
param_sims = [param for param in param_sims if param[1] > 0.33]
if len(param_sims) > 0:
chosen_param = sorted(param_sims, key=lambda param: param[1], reverse=True)[0]
return 'change', chosen_param[0] # return idx
self.error('Invalid choice!')
return 'continue', choice
def str_sim(self, a, b):
return difflib.SequenceMatcher(a=a, b=b).ratio()
def change_parameter(self, choice):
while True:
chosen_key = list(self.params)[choice]
param_info = self.op_params.fork_params[chosen_key]
old_value = self.params[chosen_key]
if not param_info.static:
self.info2('Chosen parameter: {}{} (live!)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0)
else:
self.info2('Chosen parameter: {}{} (static)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0)
to_print = []
if param_info.has_description:
to_print.append(COLORS.OKGREEN + '>> Description: {}'.format(param_info.description.replace('\n', '\n > ')) + COLORS.ENDC)
if param_info.static:
to_print.append(COLORS.WARNING + '>> A reboot is required for changes to this parameter!' + COLORS.ENDC)
if not param_info.static and not param_info.live:
to_print.append(COLORS.WARNING + '>> Changes take effect within 10 seconds for this parameter!' + COLORS.ENDC)
if param_info.has_allowed_types:
to_print.append(COLORS.RED + '>> Allowed types: {}'.format(', '.join([at.__name__ for at in param_info.allowed_types])) + COLORS.ENDC)
to_print.append(COLORS.WARNING + '>> Default value: {}'.format(self.color_from_type(param_info.default_value)) + COLORS.ENDC)
if to_print:
print('\n{}\n'.format('\n'.join(to_print)))
if param_info.is_list:
self.change_param_list(old_value, param_info, chosen_key) # TODO: need to merge the code in this function with the below to reduce redundant code
return
self.info('Current value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.INFO, type(old_value).__name__), sleep_time=0)
while True:
self.prompt('\nEnter your new value (enter to exit):')
new_value = input('>> ').strip()
if new_value == '':
self.info('Exiting this parameter...\n')
return
new_value = self.str_eval(new_value)
if not param_info.is_valid(new_value):
self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__))
continue
if not param_info.static: # stay in live tuning interface
self.op_params.put(chosen_key, new_value)
self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__))
else: # else ask to save and break
self.warning('\nOld value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.WARNING, type(old_value).__name__))
self.success('New value: {}{} (type: {})'.format(self.color_from_type(new_value), COLORS.OKGREEN, type(new_value).__name__), sleep_time=0)
self.prompt('\nDo you want to save this?')
if self.input_with_options(['Y', 'N'], 'N')[0] == 0:
self.op_params.put(chosen_key, new_value)
self.success('Saved!')
else:
self.info('Not saved!')
return
def change_param_list(self, old_value, param_info, chosen_key):
while True:
self.info('Current value: {} (type: {})'.format(old_value, type(old_value).__name__), sleep_time=0)
self.prompt('\nEnter index to edit (0 to {}):'.format(len(old_value) - 1))
choice_idx = self.str_eval(input('>> '))
if choice_idx == '':
self.info('Exiting this parameter...')
return
if not isinstance(choice_idx, int) or choice_idx not in range(len(old_value)):
self.error('Must be an integar within list range!')
continue
while True:
self.info('Chosen index: {}'.format(choice_idx), sleep_time=0)
self.info('Value: {} (type: {})'.format(old_value[choice_idx], type(old_value[choice_idx]).__name__), sleep_time=0)
self.prompt('\nEnter your new value:')
new_value = input('>> ').strip()
if new_value == '':
self.info('Exiting this list item...')
break
new_value = self.str_eval(new_value)
if not param_info.is_valid(new_value):
self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__))
continue
old_value[choice_idx] = new_value
self.op_params.put(chosen_key, old_value)
self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__), end='\n')
break
def color_from_type(self, v):
v_color = ''
if type(v) in self.type_colors:
v_color = self.type_colors[type(v)]
if isinstance(v, bool):
v_color = v_color[v]
v = '{}{}{}'.format(v_color, v, COLORS.ENDC)
return v
def cyan(self, msg, end=''):
msg = self.str_color(msg, style='cyan')
# print(msg, flush=True, end='\n' + end)
return msg
def prompt(self, msg, end=''):
msg = self.str_color(msg, style='prompt')
print(msg, flush=True, end='\n' + end)
def warning(self, msg, end=''):
msg = self.str_color(msg, style='warning')
print(msg, flush=True, end='\n' + end)
def info(self, msg, sleep_time=None, end=''):
if sleep_time is None:
sleep_time = self.sleep_time
msg = self.str_color(msg, style='info')
print(msg, flush=True, end='\n' + end)
time.sleep(sleep_time)
def info2(self, msg, sleep_time=None, end=''):
if sleep_time is None:
sleep_time = self.sleep_time
msg = self.str_color(msg, style=86)
print(msg, flush=True, end='\n' + end)
time.sleep(sleep_time)
def error(self, msg, sleep_time=None, end='', surround=True):
if sleep_time is None:
sleep_time = self.sleep_time
msg = self.str_color(msg, style='fail', surround=surround)
print(msg, flush=True, end='\n' + end)
time.sleep(sleep_time)
def success(self, msg, sleep_time=None, end=''):
if sleep_time is None:
sleep_time = self.sleep_time
msg = self.str_color(msg, style='success')
print(msg, flush=True, end='\n' + end)
time.sleep(sleep_time)
@staticmethod
def str_color(msg, style, surround=False):
if style == 'success':
style = COLORS.SUCCESS
elif style == 'fail':
style = COLORS.FAIL
elif style == 'prompt':
style = COLORS.PROMPT
elif style == 'info':
style = COLORS.INFO
elif style == 'cyan':
style = COLORS.CYAN
elif style == 'warning':
style = COLORS.WARNING
elif isinstance(style, int):
style = COLORS.BASE(style)
if surround:
msg = '{}--------\n{}\n{}--------{}'.format(style, msg, COLORS.ENDC + style, COLORS.ENDC)
else:
msg = '{}{}{}'.format(style, msg, COLORS.ENDC)
return msg
def input_with_options(self, options, default=None):
"""
Takes in a list of options and asks user to make a choice.
The most similar option list index is returned along with the similarity percentage from 0 to 1
"""
user_input = input('[{}]: '.format('/'.join(options))).lower().strip()
if not user_input:
return default, 0.0
sims = [self.str_sim(i.lower().strip(), user_input) for i in options]
argmax = sims.index(max(sims))
return argmax, sims[argmax]
def str_eval(self, dat):
dat = dat.strip()
try:
dat = ast.literal_eval(dat)
except:
if dat.lower() == 'none':
dat = None
elif dat.lower() == 'false':
dat = False
elif dat.lower() == 'true': # else, assume string
dat = True
return dat
opEdit()

@ -8,6 +8,7 @@ from selfdrive.car.toyota.values import CAR, STATIC_DSU_MSGS, NO_STOP_TIMER_CAR,
MIN_ACC_SPEED, PEDAL_TRANSITION, CarControllerParams, \ MIN_ACC_SPEED, PEDAL_TRANSITION, CarControllerParams, \
UNSUPPORTED_DSU_CAR UNSUPPORTED_DSU_CAR
from opendbc.can.packer import CANPacker from opendbc.can.packer import CANPacker
from common.op_params import opParams
VisualAlert = car.CarControl.HUDControl.VisualAlert VisualAlert = car.CarControl.HUDControl.VisualAlert
@ -21,6 +22,7 @@ MAX_USER_TORQUE = 500
class CarController: class CarController:
def __init__(self, dbc_name, CP, VM): def __init__(self, dbc_name, CP, VM):
self.op_params = opParams()
self.CP = CP self.CP = CP
self.torque_rate_limits = CarControllerParams(self.CP) self.torque_rate_limits = CarControllerParams(self.CP)
self.frame = 0 self.frame = 0
@ -112,7 +114,7 @@ class CarController:
can_sends.append(create_lta_steer_command(self.packer, actuators.steeringAngleDeg + CS.out.steeringAngleOffsetDeg, can_sends.append(create_lta_steer_command(self.packer, actuators.steeringAngleDeg + CS.out.steeringAngleOffsetDeg,
CS.out.steeringAngleDeg + CS.out.steeringAngleOffsetDeg, CS.out.steeringAngleDeg + CS.out.steeringAngleOffsetDeg,
CS.out.steeringTorque, CS.out.steeringTorque,
apply_steer_req)) apply_steer_req, self.op_params))
# we can spam can to cancel the system even if we are using lat only control # we can spam can to cancel the system even if we are using lat only control
if (self.frame % 3 == 0 and self.CP.openpilotLongitudinalControl) or pcm_cancel_cmd: if (self.frame % 3 == 0 and self.CP.openpilotLongitudinalControl) or pcm_cancel_cmd:

@ -12,32 +12,32 @@ def create_steer_command(packer, steer, steer_req):
return packer.make_can_msg("STEERING_LKA", 0, values) return packer.make_can_msg("STEERING_LKA", 0, values)
def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, steer_req): def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, steer_req, op_params):
"""Creates a CAN message for the Toyota LTA Steer Command.""" """Creates a CAN message for the Toyota LTA Steer Command."""
percentage = interp(abs(driver_torque), [50, 100], [100, 0]) percentage = interp(abs(driver_torque), [50, 100], [100, 0])
apply_steer = interp(percentage, [-10, 100], [steer_angle, apply_steer]) apply_steer = interp(percentage, [-10, 100], [steer_angle, apply_steer])
values = { values = {
# seems to actually be 1. Even 1 on 2023 RAV4 2023 (TODO: check from data) # seems to actually be 1. Even 1 on 2023 RAV4 2023 (TODO: check from data)
"SETME_X1": 1, "SETME_X1": op_params.get("SETME_X1"),
# On a RAV4 2023, it seems to be always 1 # On a RAV4 2023, it seems to be always 1
# But other cars it can change randomly? # But other cars it can change randomly?
# TODO: figure that out # TODO: figure that out
"SETME_X3": 1, "SETME_X3": op_params.get("SETME_X3"),
# 100 when driver not touching wheel, 0 when driver touching wheel. ramps smoothly between # 100 when driver not touching wheel, 0 when driver touching wheel. ramps smoothly between
# TODO: find actual breakpoints and determine how this affects the control # TODO: find actual breakpoints and determine how this affects the control
"PERCENTAGE": 100, "PERCENTAGE": op_params.get("PERCENTAGE"),
# ramps to 0 smoothly then back on falling edge of STEER_REQUEST if BIT isn't 1 # ramps to 0 smoothly then back on falling edge of STEER_REQUEST if BIT isn't 1
# but on 2023 RAV4, it was constant 100 on falling edge and BIT was 0 # but on 2023 RAV4, it was constant 100 on falling edge and BIT was 0
# TODO: figure out if important. torque wind down? maybe user torque blending? # TODO: figure out if important. torque wind down? maybe user torque blending?
"SETME_X64": 0x64, "SETME_X64": op_params.get("SETME_X64"),
# TODO: need to understand this better, it's always 1.5-2x higher than angle cmd # TODO: need to understand this better, it's always 1.5-2x higher than angle cmd
# TODO: revisit on 2023 RAV4 # TODO: revisit on 2023 RAV4
"ANGLE": 0, "ANGLE": op_params.get("ANGLE"),
# seems to just be desired angle cmd # seems to just be desired angle cmd
# TODO: does this have offset on cars where accurate steering angle signal has offset? # TODO: does this have offset on cars where accurate steering angle signal has offset?
@ -49,7 +49,7 @@ def create_lta_steer_command(packer, apply_steer, steer_angle, driver_torque, st
# 1 when actively using LTA. 3 when LTA is activated for LKA. 0 when LTA_REQUEST is 0 # 1 when actively using LTA. 3 when LTA is activated for LKA. 0 when LTA_REQUEST is 0
# TODO: see if 3 gets us any more torque, or better blending, or SOMETHING. EPS_STATUS doesn't change based on this, so maybe it doesn't do anything # TODO: see if 3 gets us any more torque, or better blending, or SOMETHING. EPS_STATUS doesn't change based on this, so maybe it doesn't do anything
"LTA_REQUEST_TYPE": steer_req, "LTA_REQUEST_TYPE": op_params.get("LTA_REQUEST_TYPE") if steer_req else 0,
# 1 when STEER_REQUEST changes state (usually) # 1 when STEER_REQUEST changes state (usually)
# except not true on 2023 RAV4. TODO: revisit, could it be UI related? # except not true on 2023 RAV4. TODO: revisit, could it be UI related?

Loading…
Cancel
Save