You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							49 lines
						
					
					
						
							1.2 KiB
						
					
					
				
			
		
		
	
	
							49 lines
						
					
					
						
							1.2 KiB
						
					
					
				import os
 | 
						|
import shutil
 | 
						|
import uuid
 | 
						|
 | 
						|
from typing import List, Optional
 | 
						|
 | 
						|
from common.params import Params
 | 
						|
 | 
						|
class OpenpilotPrefix(object):
  """Context manager that scopes the OPENPILOT_PREFIX environment variable.

  On entry it exports OPENPILOT_PREFIX=<prefix> and makes sure the directory
  /dev/shm/<prefix> exists. On exit it optionally removes the directories
  associated with the prefix and always unsets OPENPILOT_PREFIX.

  Args:
    prefix: prefix name to use; a random UUID4 string is generated when it is
      None or empty.
    clean_dirs_on_exit: when True, clean_dirs() is called on exit.
  """

  def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True):
    self.prefix = prefix if prefix else str(uuid.uuid4())
    self.msgq_path = os.path.join('/dev/shm', self.prefix)
    self.clean_dirs_on_exit = clean_dirs_on_exit

  def __enter__(self):
    """Export the prefix and create its /dev/shm directory if it is missing."""
    os.environ['OPENPILOT_PREFIX'] = self.prefix
    try:
      os.mkdir(self.msgq_path)
    except FileExistsError:
      # Re-using an existing prefix is fine; keep whatever is already there.
      pass

    return self

  def __exit__(self, exc_type, exc_obj, exc_tb):
    """Clean up (if requested) and unset OPENPILOT_PREFIX.

    Returns False so that an exception raised inside the with-body propagates.
    """
    try:
      # Must run while OPENPILOT_PREFIX is still set: clean_dirs() builds a
      # Params() object to look up the param path.
      if self.clean_dirs_on_exit:
        self.clean_dirs()
    finally:
      # pop() instead of del: do not raise KeyError when the variable has
      # already been removed, and always unset it even if the cleanup failed.
      os.environ.pop('OPENPILOT_PREFIX', None)
    return False

  def clean_dirs(self):
    """Best-effort removal of the param directory, its link, and msgq_path."""
    symlink_path = Params().get_param_path()
    # lexists() also reports a dangling symlink, so a link whose target was
    # already deleted is still removed below.
    if os.path.lexists(symlink_path):
      shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
      try:
        os.remove(symlink_path)
      except FileNotFoundError:
        # symlink_path was a real directory that rmtree() just deleted.
        pass
    shutil.rmtree(self.msgq_path, ignore_errors=True)
 | 
						|
 | 
						|
 | 
						|
class DummySocket:
  """In-memory stand-in for a socket, backed by a plain list of messages."""

  def __init__(self):
    # Messages handed to send(), oldest first.
    self.data: List[bytes] = list()

  def receive(self, non_blocking: bool = False) -> Optional[bytes]:
    """Return the most recently sent message, or None for a non-blocking call.

    A non-blocking call never returns data and leaves the buffer untouched.
    A blocking call removes and returns the last stored message; it raises
    IndexError when nothing is stored.
    """
    return None if non_blocking else self.data.pop()

  def send(self, data: bytes):
    """Store `data` at the end of the message buffer."""
    self.data.append(data)
 | 
						|
 |