openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

95 lines
2.9 KiB

5 years ago
import threading
import time
import tempfile
import shutil
import unittest
from common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking
5 years ago
class TestParams(unittest.TestCase):
    """Tests for the ``Params`` key/value store, run against a scratch directory."""

    def setUp(self):
        """Create a fresh temporary directory and a ``Params`` constructed with it."""
        self.tmpdir = tempfile.mkdtemp()
        print("using", self.tmpdir)
        self.params = Params(self.tmpdir)

    def tearDown(self):
        """Remove the temporary directory created in ``setUp``."""
        shutil.rmtree(self.tmpdir)

    def test_params_put_and_get(self):
        """A value written as ``str`` is read back as the equivalent ``bytes``."""
        self.params.put("DongleId", "cb38263377b873ee")
        self.assertEqual(self.params.get("DongleId"), b"cb38263377b873ee")

    def test_params_non_ascii(self):
        """Raw bytes that are not valid UTF-8 round-trip unchanged."""
        st = b"\xe1\x90\xff"
        self.params.put("CarParams", st)
        self.assertEqual(self.params.get("CarParams"), st)

    def test_params_get_cleared_manager_start(self):
        """``clear_all(CLEAR_ON_MANAGER_START)`` drops CarParams but keeps DongleId."""
        self.params.put("CarParams", "test")
        self.params.put("DongleId", "cb38263377b873ee")
        self.assertEqual(self.params.get("CarParams"), b"test")

        self.params.clear_all(ParamKeyType.CLEAR_ON_MANAGER_START)

        self.assertIsNone(self.params.get("CarParams"))
        self.assertIsNotNone(self.params.get("DongleId"))

    def test_params_two_things(self):
        """Two different keys hold independent values."""
        self.params.put("DongleId", "bob")
        self.params.put("AthenadPid", "123")

        self.assertEqual(self.params.get("DongleId"), b"bob")
        self.assertEqual(self.params.get("AthenadPid"), b"123")

    def test_params_get_block(self):
        """A plain ``get`` sees nothing yet; ``get(key, True)`` returns the late write."""
        def _delayed_writer():
            time.sleep(0.1)
            self.params.put("CarParams", "test")

        writer = threading.Thread(target=_delayed_writer)
        writer.start()
        try:
            self.assertIsNone(self.params.get("CarParams"))
            # NOTE(review): the second positional argument is presumably the
            # "block until present" flag -- confirm against Params.get.
            self.assertEqual(self.params.get("CarParams", True), b"test")
        finally:
            # Join so the writer cannot outlive the test and race with
            # tearDown() removing the directory.
            writer.join()

    def test_params_unknown_key_fails(self):
        """Every accessor raises ``UnknownKeyName`` for an unregistered key."""
        with self.assertRaises(UnknownKeyName):
            self.params.get("swag")

        with self.assertRaises(UnknownKeyName):
            self.params.get_bool("swag")

        with self.assertRaises(UnknownKeyName):
            self.params.put("swag", "abc")

        with self.assertRaises(UnknownKeyName):
            self.params.put_bool("swag", True)

    def test_delete_not_there(self):
        """Deleting a key that was never written does not raise and leaves it unset."""
        self.assertIsNone(self.params.get("CarParams"))
        self.params.delete("CarParams")
        self.assertIsNone(self.params.get("CarParams"))

    def test_get_bool(self):
        """``get_bool`` is False when unset and follows ``put_bool`` and "1"/"0" writes."""
        self.params.delete("IsMetric")
        self.assertFalse(self.params.get_bool("IsMetric"))

        self.params.put_bool("IsMetric", True)
        self.assertTrue(self.params.get_bool("IsMetric"))

        self.params.put_bool("IsMetric", False)
        self.assertFalse(self.params.get_bool("IsMetric"))

        self.params.put("IsMetric", "1")
        self.assertTrue(self.params.get_bool("IsMetric"))

        self.params.put("IsMetric", "0")
        self.assertFalse(self.params.get_bool("IsMetric"))

    def test_put_non_blocking_with_get_block(self):
        """A late ``put_nonblocking`` is seen by ``get(key, True)`` on a second ``Params``."""
        q = Params(self.tmpdir)

        def _delayed_writer():
            time.sleep(0.1)
            put_nonblocking("CarParams", "test", self.tmpdir)

        writer = threading.Thread(target=_delayed_writer)
        writer.start()
        try:
            self.assertIsNone(q.get("CarParams"))
            self.assertEqual(q.get("CarParams", True), b"test")
        finally:
            # Do not leave the helper thread running past the end of the test.
            writer.join()
5 years ago
if __name__ == "__main__":
    # Run this module's tests when the file is executed directly.
    unittest.main()