openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

85 lines
2.1 KiB

import numpy as np
def flatten_type_dict(d, sep="/", prefix=None):
res = {}
if isinstance(d, dict):
for key, val in d.items():
if prefix is None:
res.update(flatten_type_dict(val, prefix=key))
else:
res.update(flatten_type_dict(val, prefix=prefix + sep + key))
return res
elif isinstance(d, list):
return {prefix: np.array(d)}
else:
return {prefix: d}
def get_message_dict(message, typ):
valid = message.valid
message = message._get(typ)
if not hasattr(message, 'to_dict') or typ in ('qcomGnss', 'ubloxGnss'):
# TODO: support these
#print("skipping", typ)
return
msg_dict = message.to_dict(verbose=True)
msg_dict = flatten_type_dict(msg_dict)
msg_dict['_valid'] = valid
return msg_dict
def append_dict(path, t, d, values):
if path not in values:
group = {}
group["t"] = []
for k in d:
group[k] = []
values[path] = group
else:
group = values[path]
group["t"].append(t)
for k, v in d.items():
group[k].append(v)
def potentially_ragged_array(arr, dtype=None, **kwargs):
# TODO: is there a better way to detect inhomogeneous shapes?
try:
return np.array(arr, dtype=dtype, **kwargs)
except ValueError:
return np.array(arr, dtype=object, **kwargs)
def msgs_to_time_series(msgs):
"""
Convert an iterable of canonical capnp messages into a dictionary of time series.
Each time series has a value with key "t" which consists of monotonically increasing timestamps
in seconds.
"""
values = {}
for msg in msgs:
typ = msg.which()
tm = msg.logMonoTime / 1.0e9
msg_dict = get_message_dict(msg, typ)
if msg_dict is not None:
append_dict(typ, tm, msg_dict, values)
# Sort values by time.
for group in values.values():
order = np.argsort(group["t"])
for name, group_values in group.items():
group[name] = potentially_ragged_array(group_values)[order]
return values
if __name__ == "__main__":
import sys
from openpilot.tools.lib.logreader import LogReader
m = msgs_to_time_series(LogReader(sys.argv[1]))
print(m['driverCameraState']['t'])
print(np.diff(m['driverCameraState']['timestampSof']))