You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							87 lines
						
					
					
						
							2.9 KiB
						
					
					
				
			
		
		
	
	
							87 lines
						
					
					
						
							2.9 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import json
 | 
						|
import os
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
from Crypto.PublicKey import RSA
 | 
						|
from pathlib import Path
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
from common.params import Params
 | 
						|
from selfdrive.athena.registration import register, UNREGISTERED_DONGLE_ID
 | 
						|
from selfdrive.athena.tests.helpers import MockResponse
 | 
						|
 | 
						|
 | 
						|
class TestRegistration(unittest.TestCase):
 | 
						|
 | 
						|
  def setUp(self):
 | 
						|
    # clear params and setup key paths
 | 
						|
    self.params = Params()
 | 
						|
    self.params.clear_all()
 | 
						|
 | 
						|
    self.persist = tempfile.TemporaryDirectory()
 | 
						|
    os.mkdir(os.path.join(self.persist.name, "comma"))
 | 
						|
    self.priv_key = Path(os.path.join(self.persist.name, "comma/id_rsa"))
 | 
						|
    self.pub_key = Path(os.path.join(self.persist.name, "comma/id_rsa.pub"))
 | 
						|
    self.persist_patcher = mock.patch("selfdrive.athena.registration.PERSIST", self.persist.name)
 | 
						|
    self.persist_patcher.start()
 | 
						|
 | 
						|
  def tearDown(self):
 | 
						|
    self.persist_patcher.stop()
 | 
						|
    self.persist.cleanup()
 | 
						|
 | 
						|
  def _generate_keys(self):
 | 
						|
    self.pub_key.touch()
 | 
						|
    k = RSA.generate(2048)
 | 
						|
    with open(self.priv_key, "wb") as f:
 | 
						|
      f.write(k.export_key())
 | 
						|
    with open(self.pub_key, "wb") as f:
 | 
						|
      f.write(k.publickey().export_key())
 | 
						|
 | 
						|
  def test_valid_cache(self):
 | 
						|
    # if all params are written, return the cached dongle id
 | 
						|
    self.params.put("IMEI", "imei")
 | 
						|
    self.params.put("HardwareSerial", "serial")
 | 
						|
    self._generate_keys()
 | 
						|
 | 
						|
    with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m:
 | 
						|
      dongle = "DONGLE_ID_123"
 | 
						|
      self.params.put("DongleId", dongle)
 | 
						|
      self.assertEqual(register(), dongle)
 | 
						|
      self.assertFalse(m.called)
 | 
						|
 | 
						|
  def test_no_keys(self):
 | 
						|
    # missing pubkey
 | 
						|
    with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m:
 | 
						|
      dongle = register()
 | 
						|
      self.assertEqual(m.call_count, 0)
 | 
						|
      self.assertEqual(dongle, UNREGISTERED_DONGLE_ID)
 | 
						|
    self.assertEqual(self.params.get("DongleId", encoding='utf-8'), dongle)
 | 
						|
 | 
						|
  def test_missing_cache(self):
 | 
						|
    # keys exist but no dongle id
 | 
						|
    self._generate_keys()
 | 
						|
    with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m:
 | 
						|
      dongle = "DONGLE_ID_123"
 | 
						|
      m.return_value = MockResponse(json.dumps({'dongle_id': dongle}), 200)
 | 
						|
      self.assertEqual(register(), dongle)
 | 
						|
      self.assertEqual(m.call_count, 1)
 | 
						|
 | 
						|
      # call again, shouldn't hit the API this time
 | 
						|
      self.assertEqual(register(), dongle)
 | 
						|
      self.assertEqual(m.call_count, 1)
 | 
						|
    self.assertEqual(self.params.get("DongleId", encoding='utf-8'), dongle)
 | 
						|
 | 
						|
  def test_unregistered(self):
 | 
						|
    # keys exist, but unregistered
 | 
						|
    self._generate_keys()
 | 
						|
    with mock.patch("selfdrive.athena.registration.api_get", autospec=True) as m:
 | 
						|
      m.return_value = MockResponse(None, 402)
 | 
						|
      dongle = register()
 | 
						|
      self.assertEqual(m.call_count, 1)
 | 
						|
      self.assertEqual(dongle, UNREGISTERED_DONGLE_ID)
 | 
						|
    self.assertEqual(self.params.get("DongleId", encoding='utf-8'), dongle)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  unittest.main()
 | 
						|
 |