openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

144 lines
5.1 KiB

import os
import zmq
from cereal import car
from common.params import Params
from common.basedir import BASEDIR
from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_known_cars
from selfdrive.car.vin import get_vin, VIN_UNKNOWN
from selfdrive.swaglog import cloudlog
import selfdrive.messaging as messaging
from selfdrive.car import gen_empty_fingerprint
def get_one_can(logcan):
    """Return the next packet from *logcan* that carries at least one CAN message.

    Packets with an empty ``can`` list are discarded, and a ``zmq.error.Again``
    raised by the receive call simply triggers another attempt, so this call
    only returns once a non-empty packet has been received.
    """
    while True:
        try:
            packet = messaging.recv_one(logcan)
        except zmq.error.Again:
            # nothing available yet, try again
            continue
        else:
            if len(packet.can) > 0:
                return packet
def get_startup_alert(car_recognized, controller_available):
    """Pick the name of the startup alert to show.

    Returns 'startupNoCar' when the car was not recognized,
    'startupNoControl' when it was recognized but no controller is
    available, and 'startup' otherwise.
    """
    if not car_recognized:
        return 'startupNoCar'
    if not controller_available:
        return 'startupNoControl'
    return 'startup'
def load_interfaces(brand_names):
    """Import the interface classes of every brand and index them by model.

    *brand_names* maps a brand name to the list of its model names. For each
    brand, ``CarInterface`` is imported from ``selfdrive.car.<brand>.interface``;
    ``CarController`` is imported from ``selfdrive.car.<brand>.carcontroller``
    only when that file exists under BASEDIR, otherwise it is None.

    Returns a dict mapping each model name to its
    ``(CarInterface, CarController)`` pair.
    """
    interfaces_by_model = {}
    for brand in brand_names:
        package = ('selfdrive.car.%s' % brand)

        interface_module = __import__(package + '.interface', fromlist=['CarInterface'])
        interface_cls = interface_module.CarInterface

        # not every brand ships a car controller
        controller_cls = None
        controller_file = BASEDIR + '/' + package.replace('.', '/') + '/carcontroller.py'
        if os.path.exists(controller_file):
            controller_module = __import__(package + '.carcontroller', fromlist=['CarController'])
            controller_cls = controller_module.CarController

        for model in brand_names[brand]:
            interfaces_by_model[model] = (interface_cls, controller_cls)
    return interfaces_by_model
def _get_interface_names():
    """Discover the car brands and models that have a ``values`` module.

    Walks every folder below ``BASEDIR/selfdrive/car`` and returns a dict where:
    - keys are the names of the cars which we have an interface for
    - values are lists of the specific car models for a given car

    Folders whose ``selfdrive.car.<name>.values`` module cannot be imported
    (ImportError or IOError) are silently skipped.
    """
    # collect the folder list up front, before any import is attempted
    car_folders = [dirpath for dirpath, _dirnames, _filenames in os.walk(BASEDIR + '/selfdrive/car')]

    found = {}
    for folder in car_folders:
        try:
            brand = folder.split('/')[-1]
            values_module = __import__('selfdrive.car.%s.values' % brand, fromlist=['CAR'])
            car_enum = values_module.CAR
            models = []
            for attr in vars(car_enum).keys():
                if not attr.startswith("__"):
                    models.append(getattr(car_enum, attr))
            found[brand] = models
        except (ImportError, IOError):
            pass
    return found
# Built once when this module is imported: maps every car model name to its
# (CarInterface, CarController) pair, imported from directory selfdrive/car/<name>/
interfaces = load_interfaces(_get_interface_names())
def only_toyota_left(candidate_cars):
    """Return True when *candidate_cars* is non-empty and every name in it
    contains "TOYOTA" or "LEXUS"."""
    for name in candidate_cars:
        if "TOYOTA" not in name and "LEXUS" not in name:
            return False
    # an empty candidate list does not count as "only Toyota left"
    return len(candidate_cars) > 0
# BOUNTY: every added fingerprint in selfdrive/car/*/values.py is a $100 coupon code on shop.comma.ai
# **** for use live only ****
def fingerprint(logcan, sendcan, has_relay):
    """Identify the car from live CAN traffic.

    The VIN is resolved first: taken from the stored "CarParams" when present,
    otherwise queried with get_vin() when *has_relay* is set, otherwise
    VIN_UNKNOWN. It is logged and written to the "CarVin" param.

    CAN packets are then read from *logcan*; every message narrows the list of
    candidate cars (tracked separately for bus 0 and bus 1) until a single
    candidate has been left for long enough, no candidates remain, or the
    frame limit is exceeded.

    Returns a ``(car_fingerprint, finger, vin)`` tuple, where car_fingerprint
    is None when no car was identified and finger holds the observed
    ``address -> message length`` entries per bus.
    """
    stored_car_params = Params().get("CarParams")

    if stored_car_params is None:
        # Vin query only reliably works through OBDII
        vin = get_vin(logcan, sendcan, 1) if has_relay else VIN_UNKNOWN
    else:
        # use already stored VIN: a new VIN query cannot be done, since panda isn't in ELM327 mode
        stored_vin = car.CarParams.from_bytes(stored_car_params).carVin
        vin = stored_vin if stored_vin != "" else VIN_UNKNOWN

    cloudlog.warning("VIN %s", vin)
    Params().put("CarVin", vin)

    finger = gen_empty_fingerprint()
    # attempt fingerprint on both bus 0 and 1
    candidate_cars = {bus: all_known_cars() for bus in (0, 1)}
    # addresses that never take part in the elimination
    skipped_addresses = (0x7df, 0x7e0, 0x7e8)
    frame = 0
    frame_fingerprint = 10  # 0.1s
    car_fingerprint = None

    while True:
        packet = get_one_can(logcan)

        for msg in packet.can:
            # need to independently try to fingerprint both bus 0 and 1 to work
            # for the combo black_panda and honda_bosch. Ignore extended messages
            # and VIN query response.
            # Include bus 2 for toyotas to disambiguate cars using camera messages
            # (ideally should be done for all cars but we can't for Honda Bosch)
            if msg.src in range(4):
                finger[msg.src][msg.address] = len(msg.dat)

            for bus in candidate_cars:
                relevant_bus = msg.src == bus or \
                    (only_toyota_left(candidate_cars[bus]) and msg.src == 2)
                if not relevant_bus:
                    continue
                if msg.address >= 0x800 or msg.address in skipped_addresses:
                    continue
                candidate_cars[bus] = eliminate_incompatible_cars(msg, candidate_cars[bus])

        # if we only have one car choice and the time since we got our first
        # message has elapsed, exit
        for remaining in candidate_cars.values():
            # Toyota needs higher time to fingerprint, since DSU does not broadcast immediately
            if only_toyota_left(remaining):
                frame_fingerprint = 100  # 1s
            if len(remaining) == 1 and frame > frame_fingerprint:
                # fingerprint done
                car_fingerprint = remaining[0]

        # bail if no cars left or we've been waiting for more than 2s
        no_cars_left = not any(len(cc) != 0 for cc in candidate_cars.values())
        waited_too_long = frame > 200
        if no_cars_left or waited_too_long or car_fingerprint is not None:
            break

        frame += 1

    cloudlog.warning("fingerprinted %s", car_fingerprint)
    return car_fingerprint, finger, vin
def get_car(logcan, sendcan, has_relay=False):
    """Fingerprint the car and build its interface.

    Runs fingerprint() and looks the resulting model up in the module-level
    ``interfaces`` table; when no model was identified, the "mock" entry is
    used instead and the collected fingerprints are logged.

    Returns a ``(car_interface_instance, car_params)`` tuple.
    """
    model, seen_fingerprints, vin = fingerprint(logcan, sendcan, has_relay)

    if model is None:
        cloudlog.warning("car doesn't match any fingerprints: %r", seen_fingerprints)
        model = "mock"

    interface_cls, controller_cls = interfaces[model]
    params_for_car = interface_cls.get_params(model, seen_fingerprints, vin, has_relay)
    car_interface = interface_cls(params_for_car, controller_cls)
    return car_interface, params_for_car