diff --git a/common/colors.py b/common/colors.py deleted file mode 100644 index 95aab123a4..0000000000 --- a/common/colors.py +++ /dev/null @@ -1,36 +0,0 @@ -class COLORS: - def __init__(self): - self.HEADER = '\033[95m' - self.OKBLUE = '\033[94m' - self.CBLUE = '\33[44m' - self.BOLD = '\033[1m' - self.CITALIC = '\33[3m' - self.OKGREEN = '\033[92m' - self.CWHITE = '\33[37m' - self.ENDC = '\033[0m' + self.CWHITE - self.UNDERLINE = '\033[4m' - self.PINK = '\33[38;5;207m' - self.PRETTY_YELLOW = self.BASE(220) - - self.RED = '\033[91m' - self.PURPLE_BG = '\33[45m' - self.YELLOW = '\033[93m' - self.BLUE_GREEN = self.BASE(85) - - self.FAIL = self.RED - # self.INFO = self.PURPLE_BG - self.INFO = self.BASE(207) - self.SUCCESS = self.OKGREEN - self.PROMPT = self.YELLOW - self.DBLUE = '\033[36m' - self.CYAN = self.BASE(39) - self.WARNING = '\033[33m' - - def BASE(self, col): # seems to support more colors - return '\33[38;5;{}m'.format(col) - - def BASEBG(self, col): # seems to support more colors - return '\33[48;5;{}m'.format(col) - - -COLORS = COLORS() diff --git a/common/op_params.py b/common/op_params.py deleted file mode 100644 index 673296f0f5..0000000000 --- a/common/op_params.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python3 -import os -import json -from atomicwrites import atomic_write -from common.colors import COLORS -from common.basedir import BASEDIR -from selfdrive.hardware import TICI -try: - from common.realtime import sec_since_boot -except ImportError: - import time - sec_since_boot = time.time - -warning = lambda msg: print('{}opParams WARNING: {}{}'.format(COLORS.WARNING, msg, COLORS.ENDC)) -error = lambda msg: print('{}opParams ERROR: {}{}'.format(COLORS.FAIL, msg, COLORS.ENDC)) - -NUMBER = [float, int] # value types -NONE_OR_NUMBER = [type(None), float, int] - -BASEDIR = os.path.dirname(BASEDIR) -PARAMS_DIR = os.path.join(BASEDIR, 'community', 'params') -IMPORTED_PATH = os.path.join(PARAMS_DIR, '.imported') -OLD_PARAMS_FILE = os.path.join(BASEDIR, 'op_params.json') - - -class Param: - def __init__(self, default, allowed_types=[], description=None, *, static=False, live=False, hidden=False): # pylint: disable=dangerous-default-value - self.default_value = default # value first saved and returned if actual value isn't a valid type - if not isinstance(allowed_types, list): - allowed_types = [allowed_types] - self.allowed_types = allowed_types # allowed python value types for opEdit - self.description = description # description to be shown in opEdit - self.hidden = hidden # hide this param to user in opEdit - self.live = live # show under the live menu in opEdit - self.static = static # use cached value, never reads to update - self._create_attrs() - - def is_valid(self, value): - if not self.has_allowed_types: # always valid if no allowed types, otherwise checks to make sure - return True - return type(value) in self.allowed_types - - def _create_attrs(self): # Create attributes and check Param is valid - self.has_allowed_types = isinstance(self.allowed_types, list) and len(self.allowed_types) > 0 - self.has_description = self.description is not None - self.is_list = list in self.allowed_types - self.read_frequency = None if self.static else (1 if self.live else 10) # how often to read param file (sec) - self.last_read = -1 - if self.has_allowed_types: - assert type(self.default_value) in self.allowed_types, 'Default value type must be in specified allowed_types!' - if self.is_list: - self.allowed_types.remove(list) - - -def _read_param(key): # Returns None, False if a json error occurs - try: - with open(os.path.join(PARAMS_DIR, key), 'r') as f: - value = json.loads(f.read()) - return value, True - except json.decoder.JSONDecodeError: - return None, False - - -def _write_param(key, value): - param_path = os.path.join(PARAMS_DIR, key) - with atomic_write(param_path, overwrite=True) as f: - f.write(json.dumps(value)) - - -def _import_params(): - if os.path.exists(OLD_PARAMS_FILE) and not os.path.exists(IMPORTED_PATH): # if opParams needs to import from old params file - try: - with open(OLD_PARAMS_FILE, 'r') as f: - old_params = json.loads(f.read()) - for key in old_params: - _write_param(key, old_params[key]) - open(IMPORTED_PATH, 'w').close() - except: # pylint: disable=bare-except - pass - - -class opParams: - def __init__(self): - """ - To add your own parameter to opParams in your fork, simply add a new entry in self.fork_params, instancing a new Param class with at minimum a default value. - The allowed_types and description args are not required but highly recommended to help users edit their parameters with opEdit safely. - - The description value will be shown to users when they use opEdit to change the value of the parameter. - - The allowed_types arg is used to restrict what kinds of values can be entered with opEdit so that users can't crash openpilot with unintended behavior. - (setting a param intended to be a number with a boolean, or viceversa for example) - Limiting the range of floats or integers is still recommended when `.get`ting the parameter. - When a None value is allowed, use `type(None)` instead of None, as opEdit checks the type against the values in the arg with `isinstance()`. - - If you want your param to update within a second, specify live=True. If your param is designed to be read once, specify static=True. - Specifying neither will have the param update every 10 seconds if constantly .get() - If the param is not static, call the .get() function on it in the update function of the file you're reading from to use live updating - - Here's an example of a good fork_param entry: - self.fork_params = {'camera_offset': Param(0.06, allowed_types=NUMBER), live=True} # NUMBER allows both floats and ints - """ - - self.fork_params = { - 'SETME_X1': Param(1, NUMBER, 'Always 1', live=True), - 'SETME_X3': Param(1, NUMBER, 'Sometimes 3, mostly 1?', live=True), - 'PERCENTAGE': Param(100, NUMBER, '100 when not touching wheel, 0 when touching wheel', live=True), - 'SETME_X64': Param(100, NUMBER, 'Unsure', live=True), - 'ANGLE': Param(0, NUMBER, 'Rate limit? Lower is better?', live=True), - # 'LKA_REQUEST': Param(1, NUMBER, '1 when using LTA for LKA', live=True), - 'BIT': Param(0, NUMBER, '1: LTA, 2: LTA for lane keeping', live=True), - } - - self._to_delete = [] # a list of unused params you want to delete from users' params file - self._to_reset = [] # a list of params you want reset to their default values - self._run_init() # restores, reads, and updates params - - def _run_init(self): # does first time initializing of default params - # Two required parameters for opEdit - self.fork_params['username'] = Param(None, [type(None), str, bool], 'Your identifier provided with any crash logs sent to Sentry.\nHelps the developer reach out to you if anything goes wrong') - self.fork_params['op_edit_live_mode'] = Param(False, bool, 'This parameter controls which mode opEdit starts in', hidden=True) - - self.params = self._load_params(can_import=True) - self._add_default_params() # adds missing params and resets values with invalid types to self.params - self._delete_and_reset() # removes old params - - def get(self, key=None, *, force_update=False): # key=None returns dict of all params - if key is None: - return self._get_all_params(to_update=force_update) - self._check_key_exists(key, 'get') - param_info = self.fork_params[key] - rate = param_info.read_frequency # will be None if param is static, so check below - - if (not param_info.static and sec_since_boot() - self.fork_params[key].last_read >= rate) or force_update: - value, success = _read_param(key) - self.fork_params[key].last_read = sec_since_boot() - if not success: # in case of read error, use default and overwrite param - value = param_info.default_value - _write_param(key, value) - self.params[key] = value - - if param_info.is_valid(value := self.params[key]): - return value # all good, returning user's value - print(warning('User\'s value type is not valid! Returning default')) # somehow... it should always be valid - return param_info.default_value # return default value because user's value of key is not in allowed_types to avoid crashing openpilot - - def put(self, key, value): - self._check_key_exists(key, 'put') - if not self.fork_params[key].is_valid(value): - raise Exception('opParams: Tried to put a value of invalid type!') - self.params.update({key: value}) - _write_param(key, value) - - def _load_params(self, can_import=False): - if not os.path.exists(PARAMS_DIR): - os.makedirs(PARAMS_DIR) - if can_import: - _import_params() # just imports old params. below we read them in - - params = {} - for key in os.listdir(PARAMS_DIR): # PARAMS_DIR is guaranteed to exist - if key.startswith('.') or key not in self.fork_params: - continue - value, success = _read_param(key) - if not success: - value = self.fork_params[key].default_value - _write_param(key, value) - params[key] = value - return params - - def _get_all_params(self, to_update=False): - if to_update: - self.params = self._load_params() - return {k: self.params[k] for k, p in self.fork_params.items() if k in self.params and not p.hidden} - - def _check_key_exists(self, key, met): - if key not in self.fork_params: - raise Exception('opParams: Tried to {} an unknown parameter! Key not in fork_params: {}'.format(met, key)) - - def _add_default_params(self): - for key, param in self.fork_params.items(): - if key not in self.params: - self.params[key] = param.default_value - _write_param(key, self.params[key]) - elif not param.is_valid(self.params[key]): - print(warning('Value type of user\'s {} param not in allowed types, replacing with default!'.format(key))) - self.params[key] = param.default_value - _write_param(key, self.params[key]) - - def _delete_and_reset(self): - for key in list(self.params): - if key in self._to_delete: - del self.params[key] - os.remove(os.path.join(PARAMS_DIR, key)) - elif key in self._to_reset and key in self.fork_params: - self.params[key] = self.fork_params[key].default_value - _write_param(key, self.params[key]) diff --git a/op_edit.py b/op_edit.py deleted file mode 100755 index 212baad770..0000000000 --- a/op_edit.py +++ /dev/null @@ -1,325 +0,0 @@ -#!/usr/bin/env python3 -import time -from common.op_params import opParams -import ast -import difflib -from common.colors import COLORS - - -class opEdit: # use by running `python /data/openpilot/op_edit.py` - def __init__(self): - self.op_params = opParams() - self.params = None - self.sleep_time = 0.5 - self.live_tuning = self.op_params.get('op_edit_live_mode') - self.username = self.op_params.get('username') - self.type_colors = {int: COLORS.BASE(179), float: COLORS.BASE(179), - bool: {False: COLORS.RED, True: COLORS.OKGREEN}, - type(None): COLORS.BASE(177), - str: COLORS.BASE(77)} - - self.last_choice = None - - self.run_init() - - def run_init(self): - if self.username is None: - self.success('\nWelcome to the {}opParams{} command line editor!'.format(COLORS.CYAN, COLORS.SUCCESS), sleep_time=0) - self.prompt('Would you like to add your Discord username for easier crash debugging for the fork owner?') - self.prompt('Your username is only used for reaching out if a crash occurs.') - - username_choice = self.input_with_options(['Y', 'N', 'don\'t ask again'], default='n')[0] - if username_choice == 0: - self.prompt('Enter a unique identifer/Discord username:') - username = '' - while username == '': - username = input('>> ').strip() - self.op_params.put('username', username) - self.username = username - self.success('Thanks! Saved your username\n' - 'Edit the \'username\' parameter at any time to update', sleep_time=1.5) - elif username_choice == 2: - self.op_params.put('username', False) - self.info('Got it, bringing you into opEdit\n' - 'Edit the \'username\' parameter at any time to update', sleep_time=1.0) - else: - self.success('\nWelcome to the {}opParams{} command line editor, {}!'.format(COLORS.CYAN, COLORS.SUCCESS, self.username), sleep_time=0) - - self.run_loop() - - def run_loop(self): - while True: - if not self.live_tuning: - self.info('Here are all your parameters:', sleep_time=0) - self.info('(non-static params update while driving)', end='\n', sleep_time=0) - else: - self.info('Here are your live parameters:', sleep_time=0) - self.info('(changes take effect within a second)', end='\n', sleep_time=0) - self.params = self.op_params.get(force_update=True) - if self.live_tuning: # only display live tunable params - self.params = {k: v for k, v in self.params.items() if self.op_params.fork_params[k].live} - - values_list = [] - for k, v in self.params.items(): - if len(str(v)) < 20: - v = self.color_from_type(v) - else: - v = '{} ... {}'.format(str(v)[:30], str(v)[-15:]) - values_list.append(v) - - static = [COLORS.INFO + '(static)' + COLORS.ENDC if self.op_params.fork_params[k].static else '' for k in self.params] - - to_print = [] - blue_gradient = [33, 39, 45, 51, 87] - for idx, param in enumerate(self.params): - line = '{}. {}: {} {}'.format(idx + 1, param, values_list[idx], static[idx]) - if idx == self.last_choice and self.last_choice is not None: - line = COLORS.OKGREEN + line - else: - _color = blue_gradient[min(round(idx / len(self.params) * len(blue_gradient)), len(blue_gradient) - 1)] - line = COLORS.BASE(_color) + line - to_print.append(line) - - extras = {'l': ('Toggle live params', COLORS.WARNING), - 'e': ('Exit opEdit', COLORS.PINK)} - - to_print += ['---'] + ['{}. {}'.format(ext_col + e, ext_txt + COLORS.ENDC) for e, (ext_txt, ext_col) in extras.items()] - print('\n'.join(to_print)) - self.prompt('\nChoose a parameter to edit (by index or name):') - - choice = input('>> ').strip().lower() - parsed, choice = self.parse_choice(choice, len(to_print) - len(extras)) - if parsed == 'continue': - continue - elif parsed == 'change': - self.last_choice = choice - self.change_parameter(choice) - elif parsed == 'live': - self.last_choice = None - self.live_tuning = not self.live_tuning - self.op_params.put('op_edit_live_mode', self.live_tuning) # for next opEdit startup - elif parsed == 'exit': - return - - def parse_choice(self, choice, opt_len): - if choice.isdigit(): - choice = int(choice) - choice -= 1 - if choice not in range(opt_len): # number of options to choose from - self.error('Not in range!') - return 'continue', choice - return 'change', choice - - if choice in ['l', 'live']: # live tuning mode - return 'live', choice - elif choice in ['exit', 'e', '']: - self.error('Exiting opEdit!', sleep_time=0) - return 'exit', choice - else: # find most similar param to user's input - param_sims = [(idx, self.str_sim(choice, param.lower())) for idx, param in enumerate(self.params)] - param_sims = [param for param in param_sims if param[1] > 0.33] - if len(param_sims) > 0: - chosen_param = sorted(param_sims, key=lambda param: param[1], reverse=True)[0] - return 'change', chosen_param[0] # return idx - - self.error('Invalid choice!') - return 'continue', choice - - def str_sim(self, a, b): - return difflib.SequenceMatcher(a=a, b=b).ratio() - - def change_parameter(self, choice): - while True: - chosen_key = list(self.params)[choice] - param_info = self.op_params.fork_params[chosen_key] - - old_value = self.params[chosen_key] - if not param_info.static: - self.info2('Chosen parameter: {}{} (live!)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0) - else: - self.info2('Chosen parameter: {}{} (static)'.format(chosen_key, COLORS.BASE(207)), sleep_time=0) - - to_print = [] - if param_info.has_description: - to_print.append(COLORS.OKGREEN + '>> Description: {}'.format(param_info.description.replace('\n', '\n > ')) + COLORS.ENDC) - if param_info.static: - to_print.append(COLORS.WARNING + '>> A reboot is required for changes to this parameter!' + COLORS.ENDC) - if not param_info.static and not param_info.live: - to_print.append(COLORS.WARNING + '>> Changes take effect within 10 seconds for this parameter!' + COLORS.ENDC) - if param_info.has_allowed_types: - to_print.append(COLORS.RED + '>> Allowed types: {}'.format(', '.join([at.__name__ for at in param_info.allowed_types])) + COLORS.ENDC) - to_print.append(COLORS.WARNING + '>> Default value: {}'.format(self.color_from_type(param_info.default_value)) + COLORS.ENDC) - - if to_print: - print('\n{}\n'.format('\n'.join(to_print))) - - if param_info.is_list: - self.change_param_list(old_value, param_info, chosen_key) # TODO: need to merge the code in this function with the below to reduce redundant code - return - - self.info('Current value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.INFO, type(old_value).__name__), sleep_time=0) - - while True: - self.prompt('\nEnter your new value (enter to exit):') - new_value = input('>> ').strip() - if new_value == '': - self.info('Exiting this parameter...\n') - return - - new_value = self.str_eval(new_value) - if not param_info.is_valid(new_value): - self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__)) - continue - - if not param_info.static: # stay in live tuning interface - self.op_params.put(chosen_key, new_value) - self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__)) - else: # else ask to save and break - self.warning('\nOld value: {}{} (type: {})'.format(self.color_from_type(old_value), COLORS.WARNING, type(old_value).__name__)) - self.success('New value: {}{} (type: {})'.format(self.color_from_type(new_value), COLORS.OKGREEN, type(new_value).__name__), sleep_time=0) - self.prompt('\nDo you want to save this?') - if self.input_with_options(['Y', 'N'], 'N')[0] == 0: - self.op_params.put(chosen_key, new_value) - self.success('Saved!') - else: - self.info('Not saved!') - return - - def change_param_list(self, old_value, param_info, chosen_key): - while True: - self.info('Current value: {} (type: {})'.format(old_value, type(old_value).__name__), sleep_time=0) - self.prompt('\nEnter index to edit (0 to {}):'.format(len(old_value) - 1)) - choice_idx = self.str_eval(input('>> ')) - if choice_idx == '': - self.info('Exiting this parameter...') - return - - if not isinstance(choice_idx, int) or choice_idx not in range(len(old_value)): - self.error('Must be an integar within list range!') - continue - - while True: - self.info('Chosen index: {}'.format(choice_idx), sleep_time=0) - self.info('Value: {} (type: {})'.format(old_value[choice_idx], type(old_value[choice_idx]).__name__), sleep_time=0) - self.prompt('\nEnter your new value:') - new_value = input('>> ').strip() - if new_value == '': - self.info('Exiting this list item...') - break - - new_value = self.str_eval(new_value) - if not param_info.is_valid(new_value): - self.error('The type of data you entered ({}) is not allowed with this parameter!'.format(type(new_value).__name__)) - continue - - old_value[choice_idx] = new_value - - self.op_params.put(chosen_key, old_value) - self.success('Saved {} with value: {}{}! (type: {})'.format(chosen_key, self.color_from_type(new_value), COLORS.SUCCESS, type(new_value).__name__), end='\n') - break - - def color_from_type(self, v): - v_color = '' - if type(v) in self.type_colors: - v_color = self.type_colors[type(v)] - if isinstance(v, bool): - v_color = v_color[v] - v = '{}{}{}'.format(v_color, v, COLORS.ENDC) - return v - - def cyan(self, msg, end=''): - msg = self.str_color(msg, style='cyan') - # print(msg, flush=True, end='\n' + end) - return msg - - def prompt(self, msg, end=''): - msg = self.str_color(msg, style='prompt') - print(msg, flush=True, end='\n' + end) - - def warning(self, msg, end=''): - msg = self.str_color(msg, style='warning') - print(msg, flush=True, end='\n' + end) - - def info(self, msg, sleep_time=None, end=''): - if sleep_time is None: - sleep_time = self.sleep_time - msg = self.str_color(msg, style='info') - - print(msg, flush=True, end='\n' + end) - time.sleep(sleep_time) - - def info2(self, msg, sleep_time=None, end=''): - if sleep_time is None: - sleep_time = self.sleep_time - msg = self.str_color(msg, style=86) - - print(msg, flush=True, end='\n' + end) - time.sleep(sleep_time) - - def error(self, msg, sleep_time=None, end='', surround=True): - if sleep_time is None: - sleep_time = self.sleep_time - msg = self.str_color(msg, style='fail', surround=surround) - - print(msg, flush=True, end='\n' + end) - time.sleep(sleep_time) - - def success(self, msg, sleep_time=None, end=''): - if sleep_time is None: - sleep_time = self.sleep_time - msg = self.str_color(msg, style='success') - - print(msg, flush=True, end='\n' + end) - time.sleep(sleep_time) - - @staticmethod - def str_color(msg, style, surround=False): - if style == 'success': - style = COLORS.SUCCESS - elif style == 'fail': - style = COLORS.FAIL - elif style == 'prompt': - style = COLORS.PROMPT - elif style == 'info': - style = COLORS.INFO - elif style == 'cyan': - style = COLORS.CYAN - elif style == 'warning': - style = COLORS.WARNING - elif isinstance(style, int): - style = COLORS.BASE(style) - - if surround: - msg = '{}--------\n{}\n{}--------{}'.format(style, msg, COLORS.ENDC + style, COLORS.ENDC) - else: - msg = '{}{}{}'.format(style, msg, COLORS.ENDC) - - return msg - - def input_with_options(self, options, default=None): - """ - Takes in a list of options and asks user to make a choice. - The most similar option list index is returned along with the similarity percentage from 0 to 1 - """ - user_input = input('[{}]: '.format('/'.join(options))).lower().strip() - if not user_input: - return default, 0.0 - sims = [self.str_sim(i.lower().strip(), user_input) for i in options] - argmax = sims.index(max(sims)) - return argmax, sims[argmax] - - def str_eval(self, dat): - dat = dat.strip() - try: - dat = ast.literal_eval(dat) - except: - if dat.lower() == 'none': - dat = None - elif dat.lower() == 'false': - dat = False - elif dat.lower() == 'true': # else, assume string - dat = True - return dat - - -opEdit() diff --git a/release/files_common b/release/files_common index 7f746959c8..5b310d5c80 100644 --- a/release/files_common +++ b/release/files_common @@ -17,9 +17,6 @@ site_scons/site_tools/cython.py common/.gitignore common/__init__.py -common/op_params.py -common/colors.py -op_edit.py common/conversions.py common/gpio.py common/realtime.py