process replay: prefix setup in helper (#24636)

* prefix setup in helper

* context manager

* name change + none default

* skip checking prefix

* type hints + unset env
old-commit-hash: d6fb78b9a3
taco
Lukas Petersson 3 years ago committed by GitHub
parent 97bb8e2702
commit bcc6ca0cf5
  1. 26
      selfdrive/test/process_replay/helpers.py
  2. 27
      selfdrive/test/process_replay/process_replay.py

@ -0,0 +1,26 @@
import os
import shutil
import uuid
from common.params import Params
class OpenpilotPrefix(object):
def __init__(self, prefix: str = None) -> None:
self.prefix = prefix if prefix else str(uuid.uuid4())
self.msgq_path = os.path.join('/dev/shm', self.prefix)
def __enter__(self):
os.environ['OPENPILOT_PREFIX'] = self.prefix
try:
os.mkdir(self.msgq_path)
except FileExistsError:
pass
def __exit__(self, exc_type, exc_obj, exc_tb):
symlink_path = Params().get_param_path()
if os.path.exists(symlink_path):
shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
os.remove(symlink_path)
shutil.rmtree(self.msgq_path, ignore_errors=True)
del os.environ['OPENPILOT_PREFIX']
return True

@ -4,9 +4,7 @@ import os
import sys
import threading
import time
import shutil
import signal
import uuid
from collections import namedtuple
import capnp
@ -18,6 +16,7 @@ from common.params import Params
from common.timeout import Timeout
from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.car_helpers import get_car, interfaces
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
from selfdrive.manager.process import PythonProcess
from selfdrive.manager.process_config import managed_processes
@ -337,35 +336,13 @@ CONFIGS = [
),
]
def setup_prefix():
os.environ['OPENPILOT_PREFIX'] = str(uuid.uuid4())
msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
try:
os.mkdir(msgq_path)
except FileExistsError:
pass
def teardown_prefix():
if not os.environ.get("OPENPILOT_PREFIX", 0):
return
symlink_path = Params().get_param_path()
if os.path.exists(symlink_path):
shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
os.remove(symlink_path)
msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
shutil.rmtree(msgq_path, ignore_errors=True)
def replay_process(cfg, lr, fingerprint=None):
setup_prefix()
try:
with OpenpilotPrefix():
if cfg.fake_pubsubmaster:
return python_replay_process(cfg, lr, fingerprint)
else:
return cpp_replay_process(cfg, lr, fingerprint)
finally:
teardown_prefix()
def setup_env(simulation=False):
params = Params()

Loading…
Cancel
Save