openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

103 lines
2.7 KiB

import os
import shutil
import tempfile
from atomicwrites import AtomicWriter
def mkdirs_exists_ok(path):
try:
os.makedirs(path)
except OSError:
if not os.path.isdir(path):
raise
def rm_not_exists_ok(path):
try:
os.remove(path)
except OSError:
if os.path.exists(path):
raise
def rm_tree_or_link(path):
if os.path.islink(path):
os.unlink(path)
elif os.path.isdir(path):
shutil.rmtree(path)
def get_tmpdir_on_same_filesystem(path):
# TODO(mgraczyk): HACK, we should actually check for which filesystem.
normpath = os.path.normpath(path)
parts = normpath.split("/")
if len(parts) > 1:
if parts[1].startswith("raid"):
if len(parts) > 2 and parts[2] == "runner":
return "/{}/runner/tmp".format(parts[1])
elif len(parts) > 2 and parts[2] == "aws":
return "/{}/aws/tmp".format(parts[1])
else:
return "/{}/tmp".format(parts[1])
elif parts[1] == "aws":
return "/aws/tmp"
elif parts[1] == "scratch":
return "/scratch/tmp"
return "/tmp"
class AutoMoveTempdir(object):
def __init__(self, target_path, temp_dir=None):
self._target_path = target_path
self._path = tempfile.mkdtemp(dir=temp_dir)
@property
def name(self):
return self._path
def close(self):
os.rename(self._path, self._target_path)
def __enter__(self): return self
def __exit__(self, type, value, traceback):
if type is None:
self.close()
else:
shutil.rmtree(self._path)
class NamedTemporaryDir(object):
def __init__(self, temp_dir=None):
self._path = tempfile.mkdtemp(dir=temp_dir)
@property
def name(self):
return self._path
def close(self):
shutil.rmtree(self._path)
def __enter__(self): return self
def __exit__(self, type, value, traceback):
self.close()
def _get_fileobject_func(writer, temp_dir):
def _get_fileobject():
file_obj = writer.get_fileobject(dir=temp_dir)
os.chmod(file_obj.name, 0o644)
return file_obj
return _get_fileobject
def atomic_write_on_fs_tmp(path, **kwargs):
"""Creates an atomic writer using a temporary file in a temporary directory
on the same filesystem as path.
"""
# TODO(mgraczyk): This use of AtomicWriter relies on implementation details to set the temp
# directory.
writer = AtomicWriter(path, **kwargs)
return writer._open(_get_fileobject_func(writer, get_tmpdir_on_same_filesystem(path)))
def atomic_write_in_dir(path, **kwargs):
"""Creates an atomic writer using a temporary file in the same directory
as the destination file.
"""
writer = AtomicWriter(path, **kwargs)
return writer._open(_get_fileobject_func(writer, os.path.dirname(path)))