openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
9.0 KiB

2 years ago
#!/usr/bin/env python3
import os
import json
from atomicwrites import atomic_write
from common.colors import COLORS
from common.basedir import BASEDIR
from selfdrive.hardware import TICI
try:
from common.realtime import sec_since_boot
except ImportError:
import time
sec_since_boot = time.time
warning = lambda msg: print('{}opParams WARNING: {}{}'.format(COLORS.WARNING, msg, COLORS.ENDC))
error = lambda msg: print('{}opParams ERROR: {}{}'.format(COLORS.FAIL, msg, COLORS.ENDC))
NUMBER = [float, int] # value types
NONE_OR_NUMBER = [type(None), float, int]
BASEDIR = os.path.dirname(BASEDIR)
PARAMS_DIR = os.path.join(BASEDIR, 'community', 'params')
IMPORTED_PATH = os.path.join(PARAMS_DIR, '.imported')
OLD_PARAMS_FILE = os.path.join(BASEDIR, 'op_params.json')
class Param:
def __init__(self, default, allowed_types=[], description=None, *, static=False, live=False, hidden=False): # pylint: disable=dangerous-default-value
self.default_value = default # value first saved and returned if actual value isn't a valid type
if not isinstance(allowed_types, list):
allowed_types = [allowed_types]
self.allowed_types = allowed_types # allowed python value types for opEdit
self.description = description # description to be shown in opEdit
self.hidden = hidden # hide this param to user in opEdit
self.live = live # show under the live menu in opEdit
self.static = static # use cached value, never reads to update
self._create_attrs()
def is_valid(self, value):
if not self.has_allowed_types: # always valid if no allowed types, otherwise checks to make sure
return True
return type(value) in self.allowed_types
def _create_attrs(self): # Create attributes and check Param is valid
self.has_allowed_types = isinstance(self.allowed_types, list) and len(self.allowed_types) > 0
self.has_description = self.description is not None
self.is_list = list in self.allowed_types
self.read_frequency = None if self.static else (1 if self.live else 10) # how often to read param file (sec)
self.last_read = -1
if self.has_allowed_types:
assert type(self.default_value) in self.allowed_types, 'Default value type must be in specified allowed_types!'
if self.is_list:
self.allowed_types.remove(list)
def _read_param(key): # Returns None, False if a json error occurs
try:
with open(os.path.join(PARAMS_DIR, key), 'r') as f:
value = json.loads(f.read())
return value, True
except json.decoder.JSONDecodeError:
return None, False
def _write_param(key, value):
param_path = os.path.join(PARAMS_DIR, key)
with atomic_write(param_path, overwrite=True) as f:
f.write(json.dumps(value))
def _import_params():
if os.path.exists(OLD_PARAMS_FILE) and not os.path.exists(IMPORTED_PATH): # if opParams needs to import from old params file
try:
with open(OLD_PARAMS_FILE, 'r') as f:
old_params = json.loads(f.read())
for key in old_params:
_write_param(key, old_params[key])
open(IMPORTED_PATH, 'w').close()
except: # pylint: disable=bare-except
pass
class opParams:
def __init__(self):
"""
To add your own parameter to opParams in your fork, simply add a new entry in self.fork_params, instancing a new Param class with at minimum a default value.
The allowed_types and description args are not required but highly recommended to help users edit their parameters with opEdit safely.
- The description value will be shown to users when they use opEdit to change the value of the parameter.
- The allowed_types arg is used to restrict what kinds of values can be entered with opEdit so that users can't crash openpilot with unintended behavior.
(setting a param intended to be a number with a boolean, or viceversa for example)
Limiting the range of floats or integers is still recommended when `.get`ting the parameter.
When a None value is allowed, use `type(None)` instead of None, as opEdit checks the type against the values in the arg with `isinstance()`.
- If you want your param to update within a second, specify live=True. If your param is designed to be read once, specify static=True.
Specifying neither will have the param update every 10 seconds if constantly .get()
If the param is not static, call the .get() function on it in the update function of the file you're reading from to use live updating
Here's an example of a good fork_param entry:
self.fork_params = {'camera_offset': Param(0.06, allowed_types=NUMBER), live=True} # NUMBER allows both floats and ints
"""
self.fork_params = {
'SETME_X1': Param(1, NUMBER, 'Always 1', live=True),
'SETME_X3': Param(1, NUMBER, 'Sometimes 3, mostly 1?', live=True),
'PERCENTAGE': Param(100, NUMBER, '100 when not touching wheel, 0 when touching wheel', live=True),
'SETME_X64': Param(100, NUMBER, 'Unsure', live=True),
'ANGLE': Param(0, NUMBER, 'Rate limit? Lower is better?', live=True),
# 'LKA_REQUEST': Param(1, NUMBER, '1 when using LTA for LKA', live=True),
2 years ago
'BIT': Param(0, NUMBER, '1: LTA, 2: LTA for lane keeping', live=True),
}
self._to_delete = [] # a list of unused params you want to delete from users' params file
self._to_reset = [] # a list of params you want reset to their default values
self._run_init() # restores, reads, and updates params
def _run_init(self): # does first time initializing of default params
# Two required parameters for opEdit
self.fork_params['username'] = Param(None, [type(None), str, bool], 'Your identifier provided with any crash logs sent to Sentry.\nHelps the developer reach out to you if anything goes wrong')
self.fork_params['op_edit_live_mode'] = Param(False, bool, 'This parameter controls which mode opEdit starts in', hidden=True)
self.params = self._load_params(can_import=True)
self._add_default_params() # adds missing params and resets values with invalid types to self.params
self._delete_and_reset() # removes old params
def get(self, key=None, *, force_update=False): # key=None returns dict of all params
if key is None:
return self._get_all_params(to_update=force_update)
self._check_key_exists(key, 'get')
param_info = self.fork_params[key]
rate = param_info.read_frequency # will be None if param is static, so check below
if (not param_info.static and sec_since_boot() - self.fork_params[key].last_read >= rate) or force_update:
value, success = _read_param(key)
self.fork_params[key].last_read = sec_since_boot()
if not success: # in case of read error, use default and overwrite param
value = param_info.default_value
_write_param(key, value)
self.params[key] = value
if param_info.is_valid(value := self.params[key]):
return value # all good, returning user's value
print(warning('User\'s value type is not valid! Returning default')) # somehow... it should always be valid
return param_info.default_value # return default value because user's value of key is not in allowed_types to avoid crashing openpilot
def put(self, key, value):
self._check_key_exists(key, 'put')
if not self.fork_params[key].is_valid(value):
raise Exception('opParams: Tried to put a value of invalid type!')
self.params.update({key: value})
_write_param(key, value)
def _load_params(self, can_import=False):
if not os.path.exists(PARAMS_DIR):
os.makedirs(PARAMS_DIR)
if can_import:
_import_params() # just imports old params. below we read them in
params = {}
for key in os.listdir(PARAMS_DIR): # PARAMS_DIR is guaranteed to exist
if key.startswith('.') or key not in self.fork_params:
continue
value, success = _read_param(key)
if not success:
value = self.fork_params[key].default_value
_write_param(key, value)
params[key] = value
return params
def _get_all_params(self, to_update=False):
if to_update:
self.params = self._load_params()
return {k: self.params[k] for k, p in self.fork_params.items() if k in self.params and not p.hidden}
def _check_key_exists(self, key, met):
if key not in self.fork_params:
raise Exception('opParams: Tried to {} an unknown parameter! Key not in fork_params: {}'.format(met, key))
def _add_default_params(self):
for key, param in self.fork_params.items():
if key not in self.params:
self.params[key] = param.default_value
_write_param(key, self.params[key])
elif not param.is_valid(self.params[key]):
print(warning('Value type of user\'s {} param not in allowed types, replacing with default!'.format(key)))
self.params[key] = param.default_value
_write_param(key, self.params[key])
def _delete_and_reset(self):
for key in list(self.params):
if key in self._to_delete:
del self.params[key]
os.remove(os.path.join(PARAMS_DIR, key))
elif key in self._to_reset and key in self.fork_params:
self.params[key] = self.fork_params[key].default_value
_write_param(key, self.params[key])