|
|
|
@ -2,6 +2,7 @@ |
|
|
|
|
import capnp |
|
|
|
|
import os |
|
|
|
|
import importlib |
|
|
|
|
import time |
|
|
|
|
import pytest |
|
|
|
|
import random |
|
|
|
|
import unittest |
|
|
|
@ -10,6 +11,8 @@ from typing import List, Optional, Tuple |
|
|
|
|
from parameterized import parameterized_class |
|
|
|
|
import hypothesis.strategies as st |
|
|
|
|
from hypothesis import HealthCheck, Phase, assume, given, settings, seed |
|
|
|
|
from pympler.tracker import SummaryTracker |
|
|
|
|
import gc |
|
|
|
|
|
|
|
|
|
from cereal import messaging, log, car |
|
|
|
|
from openpilot.common.basedir import BASEDIR |
|
|
|
@ -171,6 +174,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
@classmethod |
|
|
|
|
def tearDownClass(cls): |
|
|
|
|
del cls.can_msgs |
|
|
|
|
gc.collect() |
|
|
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
|
# print('SETUP HEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHERE') |
|
|
|
@ -187,7 +191,18 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") |
|
|
|
|
self.safety.init_tests() |
|
|
|
|
|
|
|
|
|
@settings(max_examples=1000, deadline=None, |
|
|
|
|
# self.tracker = SummaryTracker() |
|
|
|
|
# for _ in range(5): |
|
|
|
|
# self.tracker.print_diff() |
|
|
|
|
|
|
|
|
|
# def tearDown(self): |
|
|
|
|
# self.tracker.print_diff() |
|
|
|
|
# # for _type, num_objects, total_size in self.tracker.diff(): |
|
|
|
|
# # print(_type, num_objects, total_size) |
|
|
|
|
# # # with self.subTest(_type=_type): |
|
|
|
|
# # # self.assertLess(total_size / 1024, 10, f'Object {_type} ({num_objects=}) grew larger than 10 kB while uploading file') |
|
|
|
|
|
|
|
|
|
@settings(max_examples=3000, deadline=None, |
|
|
|
|
phases=(Phase.reuse, Phase.generate, ), |
|
|
|
|
suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow, HealthCheck.large_base_example], |
|
|
|
|
) |
|
|
|
@ -201,6 +216,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
# self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") |
|
|
|
|
# self.safety.init_tests() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# bus = 0 # random.randint(0, 3) |
|
|
|
|
bus_offset = CanBusBase(None, fingerprint=self.fingerprint).offset |
|
|
|
|
bus = bus_offset |
|
|
|
@ -225,6 +241,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
msg_strategy = st.binary(min_size=size, max_size=size) |
|
|
|
|
msgs = data.draw(st.lists(msg_strategy, min_size=20)) |
|
|
|
|
# time.sleep(8) |
|
|
|
|
# print(len(msgs)) |
|
|
|
|
|
|
|
|
|
prev_panda_gas = self.safety.get_gas_pressed_prev() |
|
|
|
@ -246,6 +263,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
# self.safety.set_gas_interceptor_detected(False) |
|
|
|
|
print() |
|
|
|
|
|
|
|
|
|
# for i in range(100): |
|
|
|
|
to_send = libpanda_py.make_CANPacket(address, bus, dat) |
|
|
|
|
did_rx = self.safety.safety_rx_hook(to_send) |
|
|
|
|
|
|
|
|
@ -255,6 +273,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
CC = car.CarControl.new_message() |
|
|
|
|
CS = self.CI.update(CC, (can.to_bytes(),)) |
|
|
|
|
# continue |
|
|
|
|
|
|
|
|
|
# test multiple CAN packets as well as multiple messages per CAN packet |
|
|
|
|
# for (_dat1, _dat2) in ((dat1, dat2), (dat3, dat4)): |
|
|
|
|