openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

122 lines
3.6 KiB

#!/usr/bin/env python3
5 years ago
import os
import sys
import bz2
import urllib.parse
5 years ago
import capnp
from cereal import log as capnp_log
from tools.lib.filereader import FileReader
from tools.lib.route import Route, SegmentName
5 years ago
# this is an iterator itself, and uses private variables from LogReader
class MultiLogIterator:
def __init__(self, log_paths, sort_by_time=False):
5 years ago
self._log_paths = log_paths
self.sort_by_time = sort_by_time
5 years ago
self._first_log_idx = next(i for i in range(len(log_paths)) if log_paths[i] is not None)
self._current_log = self._first_log_idx
self._idx = 0
self._log_readers = [None]*len(log_paths)
self.start_time = self._log_reader(self._first_log_idx)._ts[0]
def _log_reader(self, i):
if self._log_readers[i] is None and self._log_paths[i] is not None:
log_path = self._log_paths[i]
self._log_readers[i] = LogReader(log_path, sort_by_time=self.sort_by_time)
5 years ago
return self._log_readers[i]
def __iter__(self):
return self
def _inc(self):
lr = self._log_reader(self._current_log)
if self._idx < len(lr._ents)-1:
self._idx += 1
else:
self._idx = 0
self._current_log = next(i for i in range(self._current_log + 1, len(self._log_readers) + 1)
if i == len(self._log_readers) or self._log_paths[i] is not None)
5 years ago
if self._current_log == len(self._log_readers):
raise StopIteration
5 years ago
def __next__(self):
while 1:
lr = self._log_reader(self._current_log)
ret = lr._ents[self._idx]
self._inc()
return ret
def tell(self):
# returns seconds from start of log
return (self._log_reader(self._current_log)._ts[self._idx] - self.start_time) * 1e-9
def seek(self, ts):
# seek to nearest minute
minute = int(ts/60)
if minute >= len(self._log_paths) or self._log_paths[minute] is None:
return False
self._current_log = minute
# HACK: O(n) seek afterward
self._idx = 0
while self.tell() < ts:
self._inc()
return True
class LogReader:
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):
5 years ago
data_version = None
_, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
5 years ago
with FileReader(fn) as f:
dat = f.read()
if ext == "":
# old rlogs weren't bz2 compressed
ents = capnp_log.Event.read_multiple_bytes(dat)
5 years ago
elif ext == ".bz2":
dat = bz2.decompress(dat)
ents = capnp_log.Event.read_multiple_bytes(dat)
5 years ago
else:
raise Exception(f"unknown extension {ext}")
5 years ago
self._ents = list(sorted(ents, key=lambda x: x.logMonoTime) if sort_by_time else ents)
self._ts = [x.logMonoTime for x in self._ents]
5 years ago
self.data_version = data_version
self._only_union_types = only_union_types
def __iter__(self):
for ent in self._ents:
if self._only_union_types:
5 years ago
try:
ent.which()
yield ent
except capnp.lib.capnp.KjException:
pass
else:
yield ent
def logreader_from_route_or_segment(r):
sn = SegmentName(r, allow_route_name=True)
route = Route(sn.route_name.canonical_name)
if sn.segment_num < 0:
return MultiLogIterator(route.log_paths())
else:
return LogReader(route.log_paths()[sn.segment_num])
5 years ago
if __name__ == "__main__":
import codecs
# capnproto <= 0.8.0 throws errors converting byte data to string
# below line catches those errors and replaces the bytes with \x__
codecs.register_error("strict", codecs.backslashreplace_errors)
5 years ago
log_path = sys.argv[1]
lr = LogReader(log_path, sort_by_time=True)
5 years ago
for msg in lr:
print(msg)