tizi: retry amp comms (#27735)

* tizi: retry amp comms

* ensure all config is written together

* simple test

* check errors

* test shutdown

* a tici exclusive

---------

Co-authored-by: Comma Device <device@comma.ai>
pull/27828/head
Adeeb Shihadeh 2 years ago committed by GitHub
parent 6381546075
commit 767ed4295f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      Jenkinsfile
  2. 43
      system/hardware/tici/amplifier.py
  3. 70
      system/hardware/tici/tests/test_amplifier.py

1
Jenkinsfile vendored

@ -135,6 +135,7 @@ pipeline {
["test sensord", "cd system/sensord/tests && python -m unittest test_sensord.py"],
["test camerad", "python system/camerad/test/test_camerad.py"],
["test exposure", "python system/camerad/test/test_exposure.py"],
["test amp", "python system/hardware/tici/tests/test_amplifier.py"],
])
}
}

@ -1,5 +1,7 @@
import time
from smbus2 import SMBus
from collections import namedtuple
from typing import List
# https://datasheets.maximintegrated.com/en/ds/MAX98089.pdf
@ -104,8 +106,12 @@ class Amplifier:
def __init__(self, debug=False):
self.debug = debug
def set_config(self, config):
def _get_shutdown_config(self, amp_disabled: bool) -> AmpConfig:
return AmpConfig("Global shutdown", 0b0 if amp_disabled else 0b1, 0x51, 7, 0b10000000)
def _set_configs(self, configs: List[AmpConfig]) -> None:
with SMBus(self.AMP_I2C_BUS) as bus:
for config in configs:
if self.debug:
print(f"Setting \"{config.name}\" to {config.value}:")
@ -116,19 +122,28 @@ class Amplifier:
if self.debug:
print(f" Changed {hex(config.register)}: {hex(old_value)} -> {hex(new_value)}")
def set_global_shutdown(self, amp_disabled):
self.set_config(AmpConfig("Global shutdown", 0b0 if amp_disabled else 0b1, 0x51, 7, 0b10000000))
def initialize_configuration(self, model):
self.set_global_shutdown(amp_disabled=True)
for config in BASE_CONFIG:
self.set_config(config)
for config in CONFIGS[model]:
self.set_config(config)
self.set_global_shutdown(amp_disabled=False)
def set_configs(self, configs: List[AmpConfig]) -> bool:
# retry in case panda is using the amp
for _ in range(10):
try:
self._set_configs(configs)
return True
except OSError:
print("Failed to set amp config, retrying...")
time.sleep(0.02)
return False
def set_global_shutdown(self, amp_disabled: bool) -> bool:
return self.set_configs([self._get_shutdown_config(amp_disabled), ])
def initialize_configuration(self, model: str) -> bool:
cfgs = [
self._get_shutdown_config(True),
*BASE_CONFIG,
*CONFIGS[model],
self._get_shutdown_config(False),
]
return self.set_configs(cfgs)
if __name__ == "__main__":

@ -0,0 +1,70 @@
#!/usr/bin/env python3
import time
import random
import unittest
import subprocess
from panda import Panda
from system.hardware import TICI
from system.hardware.tici.hardware import Tici
from system.hardware.tici.amplifier import Amplifier
class TestAmplifier(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
def setUp(self):
# clear dmesg
subprocess.check_call("sudo dmesg -C", shell=True)
self.panda = Panda()
self.panda.reset()
def tearDown(self):
self.panda.reset(reconnect=False)
def _check_for_i2c_errors(self, expected):
dmesg = subprocess.check_output("dmesg", shell=True, encoding='utf8')
i2c_lines = [l for l in dmesg.strip().splitlines() if 'i2c_geni a88000.i2c' in l]
i2c_str = '\n'.join(i2c_lines)
if not expected:
assert len(i2c_lines) == 0
else:
assert "i2c error :-107" in i2c_str or "Bus arbitration lost" in i2c_str
def test_init(self):
amp = Amplifier(debug=True)
r = amp.initialize_configuration(Tici().model)
assert r
self._check_for_i2c_errors(False)
def test_shutdown(self):
amp = Amplifier(debug=True)
for _ in range(10):
r = amp.set_global_shutdown(True)
r = amp.set_global_shutdown(False)
assert r
self._check_for_i2c_errors(False)
def test_init_while_siren_play(self):
for _ in range(5):
self.panda.set_siren(False)
time.sleep(0.1)
self.panda.set_siren(True)
time.sleep(random.randint(0, 5))
amp = Amplifier(debug=True)
r = amp.initialize_configuration(Tici().model)
assert r
# make sure we're a good test
self._check_for_i2c_errors(True)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save