# cython: language_level=3 # cython: profile=False # distutils: language = c # -*- coding: future_fstrings -*- # # Copyright 2019 Gianluca Frison, Dimitris Kouzoupis, Robin Verschueren, # Andrea Zanelli, Niels van Duijkeren, Jonathan Frey, Tommaso Sartor, # Branimir Novoselnik, Rien Quirynen, Rezart Qelibari, Dang Doan, # Jonas Koenemann, Yutao Chen, Tobias Schöls, Jonas Schlagenhauf, Moritz Diehl # # This file is part of acados. # # The 2-Clause BSD License # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE.; # cimport cython from libc cimport string from libc cimport bool cimport numpy as np from cpython cimport array cimport acados_solver_common cimport acados_solver import os import json import numpy as np import array from datetime import datetime # from .utils import np_array_to_list cdef class AcadosOcpSolverFast: """ Class to interact with the acados ocp solver C object. :param acados_ocp: type AcadosOcp - description of the OCP for acados :param json_file: name for the json file used to render the templated code - default: acados_ocp_nlp.json :param simulink_opts: Options to configure Simulink S-function blocks, mainly to activate possible Inputs and Outputs """ cdef acados_solver.nlp_solver_capsule *capsule cdef void *nlp_opts cdef acados_solver_common.ocp_nlp_dims *nlp_dims cdef acados_solver_common.ocp_nlp_config *nlp_config cdef acados_solver_common.ocp_nlp_out *nlp_out cdef acados_solver_common.ocp_nlp_in *nlp_in cdef acados_solver_common.ocp_nlp_solver *nlp_solver cdef str model_name cdef int N cdef bint solver_created def __cinit__(self, str model_name, int N, str code_export_dir): self.model_name = model_name self.N = N self.solver_created = False # create capsule self.capsule = acados_solver.acados_create_capsule() # create solver assert acados_solver.acados_create(self.capsule) == 0 self.solver_created = True # get pointers solver self.nlp_opts = acados_solver.acados_get_nlp_opts(self.capsule) self.nlp_dims = acados_solver.acados_get_nlp_dims(self.capsule) self.nlp_config = acados_solver.acados_get_nlp_config(self.capsule) self.nlp_out = acados_solver.acados_get_nlp_out(self.capsule) self.nlp_in = acados_solver.acados_get_nlp_in(self.capsule) self.nlp_solver = acados_solver.acados_get_nlp_solver(self.capsule) def solve(self): """ Solve the ocp with current input. """ status = acados_solver.acados_solve(self.capsule) return status def get_slice(self, int start_stage_, int end_stage_, str field_): field = field_.encode('utf-8') dims = acados_solver_common.ocp_nlp_dims_get_from_attr(self.nlp_config, self.nlp_dims, self.nlp_out, start_stage_, field) out = np.ascontiguousarray(np.zeros((end_stage_ - start_stage_, dims)), dtype=np.float64) self.fill_in_slice(start_stage_, end_stage_, field_, out) return out def fill_in_slice(self, int start_stage_, int end_stage_, str field_, np.ndarray[np.float64_t, ndim=2, mode='c'] arr_): out_fields = ['x', 'u', 'z', 'pi', 'lam', 't'] mem_fields = ['sl', 'su'] field = field_.encode('utf-8') if (field_ not in out_fields + mem_fields): raise Exception('AcadosOcpSolver.get_slice(): {} is an invalid argument.\ \n Possible values are {}. Exiting.'.format(field_, out_fields)) if start_stage_ >= end_stage_: raise Exception('AcadosOcpSolver.get_slice(): end stage index must be larger than start stage index') if start_stage_ < 0 or end_stage_ > self.N + 1: raise Exception('AcadosOcpSolver.get_slice(): stage index must be in [0, N], got: {}.'.format(self.N)) cdef np.ndarray[np.float64_t, ndim=2, mode='c'] arr = np.ascontiguousarray(arr_, dtype=np.double) if (field_ in out_fields): acados_solver_common.ocp_nlp_out_get_slice(self.nlp_config, self.nlp_dims, self.nlp_out, start_stage_, end_stage_, field, arr.data) elif field_ in mem_fields: raise NotImplementedError() # acados_solver_common.ocp_nlp_get_at_stage(self.nlp_config, self.nlp_dims, self.nlp_solver, start_stage_, end_stage_, # field, arr.data) def get(self, stage_, field_): return self.get_slice(stage_, stage_ + 1, field_)[0] def print_statistics(self): """ prints statistics of previous solver run as a table: - iter: iteration number - res_stat: stationarity residual - res_eq: residual wrt equality constraints (dynamics) - res_ineq: residual wrt inequality constraints (constraints) - res_comp: residual wrt complementarity conditions - qp_stat: status of QP solver - qp_iter: number of QP iterations - qp_res_stat: stationarity residual of the last QP solution - qp_res_eq: residual wrt equality constraints (dynamics) of the last QP solution - qp_res_ineq: residual wrt inequality constraints (constraints) of the last QP solution - qp_res_comp: residual wrt complementarity conditions of the last QP solution """ stat = self.get_stats("statistics") if self.acados_ocp.solver_options.nlp_solver_type == 'SQP': print('\niter\tres_stat\tres_eq\t\tres_ineq\tres_comp\tqp_stat\tqp_iter') if stat.shape[0]>7: print('\tqp_res_stat\tqp_res_eq\tqp_res_ineq\tqp_res_comp') for jj in range(stat.shape[1]): print('{:d}\t{:e}\t{:e}\t{:e}\t{:e}\t{:d}\t{:d}'.format( \ int(stat[0][jj]), stat[1][jj], stat[2][jj], \ stat[3][jj], stat[4][jj], int(stat[5][jj]), int(stat[6][jj]))) if stat.shape[0]>7: print('\t{:e}\t{:e}\t{:e}\t{:e}'.format( \ stat[7][jj], stat[8][jj], stat[9][jj], stat[10][jj])) print('\n') elif self.acados_ocp.solver_options.nlp_solver_type == 'SQP_RTI': print('\niter\tqp_stat\tqp_iter') if stat.shape[0]>3: print('\tqp_res_stat\tqp_res_eq\tqp_res_ineq\tqp_res_comp') for jj in range(stat.shape[1]): print('{:d}\t{:d}\t{:d}'.format( int(stat[0][jj]), int(stat[1][jj]), int(stat[2][jj]))) if stat.shape[0]>3: print('\t{:e}\t{:e}\t{:e}\t{:e}'.format( \ stat[3][jj], stat[4][jj], stat[5][jj], stat[6][jj])) print('\n') return def store_iterate(self, filename='', overwrite=False): """ Stores the current iterate of the ocp solver in a json file. :param filename: if not set, use model_name + timestamp + '.json' :param overwrite: if false and filename exists add timestamp to filename """ if filename == '': filename += self.model_name + '_' + 'iterate' + '.json' if not overwrite: # append timestamp if os.path.isfile(filename): filename = filename[:-5] filename += datetime.utcnow().strftime('%Y-%m-%d-%H:%M:%S.%f') + '.json' # get iterate: solution = dict() for i in range(self.N+1): solution['x_'+str(i)] = self.get(i,'x') solution['u_'+str(i)] = self.get(i,'u') solution['z_'+str(i)] = self.get(i,'z') solution['lam_'+str(i)] = self.get(i,'lam') solution['t_'+str(i)] = self.get(i, 't') solution['sl_'+str(i)] = self.get(i, 'sl') solution['su_'+str(i)] = self.get(i, 'su') for i in range(self.N): solution['pi_'+str(i)] = self.get(i,'pi') # save raise NotImplementedError() # with open(filename, 'w') as f: # json.dump(solution, f, default=np_array_to_list, indent=4, sort_keys=True) # print("stored current iterate in ", os.path.join(os.getcwd(), filename)) def load_iterate(self, filename): """ Loads the iterate stored in json file with filename into the ocp solver. """ if not os.path.isfile(filename): raise Exception('load_iterate: failed, file does not exist: ' + os.path.join(os.getcwd(), filename)) with open(filename, 'r') as f: solution = json.load(f) for key in solution.keys(): (field, stage) = key.split('_') self.set(int(stage), field, np.array(solution[key])) def get_stats(self, field_): """ Get the information of the last solver call. :param field: string in ['statistics', 'time_tot', 'time_lin', 'time_sim', 'time_sim_ad', 'time_sim_la', 'time_qp', 'time_qp_solver_call', 'time_reg', 'sqp_iter'] """ fields = ['time_tot', # total cpu time previous call 'time_lin', # cpu time for linearization 'time_sim', # cpu time for integrator 'time_sim_ad', # cpu time for integrator contribution of external function calls 'time_sim_la', # cpu time for integrator contribution of linear algebra 'time_qp', # cpu time qp solution 'time_qp_solver_call', # cpu time inside qp solver (without converting the QP) 'time_qp_xcond', 'time_glob', # cpu time globalization 'time_reg', # cpu time regularization 'sqp_iter', # number of SQP iterations 'qp_iter', # vector of QP iterations for last SQP call 'statistics', # table with info about last iteration 'stat_m', 'stat_n',] field = field_ field = field.encode('utf-8') if (field_ not in fields): raise Exception('AcadosOcpSolver.get_stats(): {} is not a valid argument.\ \n Possible values are {}. Exiting.'.format(fields, fields)) if field_ in ['sqp_iter', 'stat_m', 'stat_n']: out = np.ascontiguousarray(np.zeros((1,)), dtype=np.int64) out_data = out.data elif field_ == 'statistics': sqp_iter = self.get_stats("sqp_iter") stat_m = self.get_stats("stat_m") stat_n = self.get_stats("stat_n") min_size = min([stat_m, sqp_iter+1]) out = np.ascontiguousarray(np.zeros((stat_n[0]+1, min_size[0])), dtype=np.float64) out_data = out.data elif field_ == 'qp_iter': full_stats = self.get_stats('statistics') if self.acados_ocp.solver_options.nlp_solver_type == 'SQP': out = full_stats[6, :] elif self.acados_ocp.solver_options.nlp_solver_type == 'SQP_RTI': out = full_stats[2, :] else: out = np.ascontiguousarray(np.zeros((1,)), dtype=np.float64) out_data = out.data if not field_ == 'qp_iter': acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out_data) return out def get_cost(self): """ Returns the cost value of the current solution. """ # compute cost internally acados_solver_common.ocp_nlp_eval_cost(self.nlp_solver, self.nlp_in, self.nlp_out) # create output array out = np.ascontiguousarray(np.zeros((1,)), dtype=np.float64) # call getter field = "cost_value".encode('utf-8') acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out.data) return out[0] def get_residuals(self): """ Returns an array of the form [res_stat, res_eq, res_ineq, res_comp]. """ # compute residuals if RTI if self.acados_ocp.solver_options.nlp_solver_type == 'SQP_RTI': acados_solver_common.ocp_nlp_eval_residuals(self.nlp_solver, self.nlp_in, self.nlp_out) # create output array out = np.ascontiguousarray(np.zeros((4, 1)), dtype=np.float64) # call getters field = "res_stat".encode('utf-8') acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out.data) field = "res_eq".encode('utf-8') acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out[1].data) field = "res_ineq".encode('utf-8') acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out[2].data) field = "res_comp".encode('utf-8') acados_solver_common.ocp_nlp_get(self.nlp_config, self.nlp_solver, field, out[3].data) return out.flatten() # Note: this function should not be used anymore, better use cost_set, constraints_set def set(self, int stage, str field_, value_): """ Set numerical data inside the solver. :param stage: integer corresponding to shooting node :param field: string in ['x', 'u', 'pi', 'lam', 't', 'p'] .. note:: regarding lam, t: \n the inequalities are internally organized in the following order: \n [ lbu lbx lg lh lphi ubu ubx ug uh uphi; \n lsbu lsbx lsg lsh lsphi usbu usbx usg ush usphi] .. note:: pi: multipliers for dynamics equality constraints \n lam: multipliers for inequalities \n t: slack variables corresponding to evaluation of all inequalities (at the solution) \n sl: slack variables of soft lower inequality constraints \n su: slack variables of soft upper inequality constraints \n """ cost_fields = ['y_ref', 'yref'] constraints_fields = ['lbx', 'ubx', 'lbu', 'ubu'] out_fields = ['x', 'u', 'pi', 'lam', 't', 'z'] mem_fields = ['sl', 'su'] # cast value_ to avoid conversion issues if isinstance(value_, (float, int)): value_ = np.array([value_]) value_ = value_.astype(np.double) field = field_ field = field.encode('utf-8') if field_ not in constraints_fields + cost_fields + out_fields + mem_fields: raise Exception("AcadosOcpSolver.set(): {} is not a valid argument.\ \nPossible values are {}. Exiting.".format(field, \ constraints_fields + cost_fields + out_fields + ['p'])) dims = acados_solver_common.ocp_nlp_dims_get_from_attr(self.nlp_config, self.nlp_dims, self.nlp_out, stage, field) if value_.shape[0] != dims: msg = 'AcadosOcpSolver.set(): mismatching dimension for field "{}" '.format(field_) msg += 'with dimension {} (you have {})'.format(dims, value_.shape) raise Exception(msg) cdef np.ndarray[np.float64_t, ndim=1, mode='c'] value = np.ascontiguousarray(value_, dtype=np.double) value_data_p = value.data if field_ in constraints_fields: acados_solver_common.ocp_nlp_constraints_model_set(self.nlp_config, self.nlp_dims, self.nlp_in, stage, field, value_data_p) elif field_ in cost_fields: acados_solver_common.ocp_nlp_cost_model_set(self.nlp_config, self.nlp_dims, self.nlp_in, stage, field, value_data_p) elif field_ in out_fields: acados_solver_common.ocp_nlp_out_set(self.nlp_config, self.nlp_dims, self.nlp_out, stage, field, value_data_p) elif field_ in mem_fields: acados_solver_common.ocp_nlp_set(self.nlp_config, self.nlp_solver, stage, field, value_data_p) return def set_param(self, stage_, np.ndarray[np.float64_t, ndim=1] value_): cdef np.ndarray[np.float64_t, ndim=1, mode='c'] value = np.ascontiguousarray(value_, dtype=np.double) acados_solver.acados_update_params(self.capsule, stage_, value_.data, value_.shape[0]) def cost_set(self, start_stage_, field_, value_, api='warn'): self.cost_set_slice(start_stage_, start_stage_+1, field_, value_[None], api='warn') def cost_set_slice(self, start_stage_, end_stage_, field_, value_, api='warn'): """ Set numerical data in the cost module of the solver. :param stage: integer corresponding to shooting node :param field: string, e.g. 'yref', 'W', 'ext_cost_num_hess' :param value: of appropriate size """ # cast value_ to avoid conversion issues field = field_.encode('utf-8') if len(value_.shape) > 2: dim = value_.shape[1]*value_.shape[2] else: dim = value_.shape[1] value_ = value_[None,:,:] cdef np.ndarray[np.float64_t, ndim=3, mode='c'] value = np.ascontiguousarray(value_, dtype=np.double) acados_solver_common.ocp_nlp_cost_model_set_slice(self.nlp_config, self.nlp_dims, self.nlp_in, start_stage_, end_stage_, field, value.data, dim) def constraints_set(self, start_stage_, field_, value_, api='warn'): self.constraints_set_slice(start_stage_, start_stage_+1, field_, value_[None], api='warn') def constraints_set_slice(self, start_stage_, end_stage_, field_, value_, api='warn'): """ Set numerical data in the constraint module of the solver. :param stage: integer corresponding to shooting node :param field: string in ['lbx', 'ubx', 'lbu', 'ubu', 'lg', 'ug', 'lh', 'uh', 'uphi'] :param value: of appropriate size """ field = field_.encode('utf-8') if len(value_.shape) > 2: dim = value_.shape[1]*value_.shape[2] else: dim = value_.shape[1] value_ = value_[None,:,:] cdef np.ndarray[np.float64_t, ndim=3, mode='c'] value = np.ascontiguousarray(value_, dtype=np.double) acados_solver_common.ocp_nlp_constraints_model_set_slice(self.nlp_config, self.nlp_dims, self.nlp_in, start_stage_, end_stage_, field, value.data, dim) def dynamics_get(self, int stage, field_): """ Get numerical data from the dynamics module of the solver: :param stage: integer corresponding to shooting node :param field: string, e.g. 'A' """ field = field_ field = field.encode('utf-8') # get dims cdef int[2] dims acados_solver_common.ocp_nlp_dynamics_dims_get_from_attr(self.nlp_config, self.nlp_dims, self.nlp_out, stage, field, &dims[0]) # create output data out = np.ascontiguousarray(np.zeros((dims[0]*dims[1],)), dtype=np.float64) out = out.reshape(dims[0], dims[1], order='F') # call getter acados_solver_common.ocp_nlp_get_at_stage(self.nlp_config, self.nlp_dims, self.nlp_solver, stage, field, out.data) return out def options_set(self, bytes field_, value_): """ Set options of the solver. :param field: string, e.g. 'print_level', 'rti_phase', 'initialize_t_slacks', 'step_length', 'alpha_min', 'alpha_reduction' :param value: of type int, float """ int_fields = ['print_level', 'rti_phase', 'initialize_t_slacks'] double_fields = ['step_length', 'tol_eq', 'tol_stat', 'tol_ineq', 'tol_comp', 'alpha_min', 'alpha_reduction'] string_fields = ['globalization'] # encode field = field_ field = field.encode('utf-8') cdef int int_value cdef double double_value cdef unsigned char[:] string_value # check field availability and type if field_ in int_fields: if not isinstance(value_, int): raise Exception('solver option {} must be of type int. You have {}.'.format(field_, type(value_))) if field_ == 'rti_phase': if value_ < 0 or value_ > 2: raise Exception('AcadosOcpSolver.solve(): argument \'rti_phase\' can ' 'take only values 0, 1, 2 for SQP-RTI-type solvers') if self.acados_ocp.solver_options.nlp_solver_type != 'SQP_RTI' and value_ > 0: raise Exception('AcadosOcpSolver.solve(): argument \'rti_phase\' can ' 'take only value 0 for SQP-type solvers') int_value = value_ acados_solver_common.ocp_nlp_solver_opts_set(self.nlp_config, self.nlp_opts, field, &int_value) elif field_ in double_fields: if not isinstance(value_, float): raise Exception('solver option {} must be of type float. You have {}.'.format(field_, type(value_))) double_value = value_ acados_solver_common.ocp_nlp_solver_opts_set(self.nlp_config, self.nlp_opts, field, &double_value) elif field_ in string_fields: if not isinstance(value_, bytes): raise Exception('solver option {} must be of type str. You have {}.'.format(field_, type(value_))) string_value = value_.encode('utf-8') acados_solver_common.ocp_nlp_solver_opts_set(self.nlp_config, self.nlp_opts, field, &string_value[0]) raise Exception('AcadosOcpSolver.options_set() does not support field {}.'\ '\n Possible values are {}.'.format(field_, ', '.join(int_fields + double_fields + string_fields))) def __del__(self): if self.solver_created: acados_solver.acados_free(self.capsule) acados_solver.acados_free_capsule(self.capsule) # try: # self.dlclose(self.shared_lib._handle) # except: # pass