LogReader: add time series helper (#34608)

* simple time series

* example
pull/34610/head
Adeeb Shihadeh 2 months ago committed by GitHub
parent 6f5f8e7746
commit bab1254a02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 89
      tools/lib/log_time_series.py

@ -0,0 +1,89 @@
import numpy as np
def flatten_type_dict(d, sep="/", prefix=None):
res = {}
if isinstance(d, dict):
for key, val in d.items():
if prefix is None:
res.update(flatten_type_dict(val, prefix=key))
else:
res.update(flatten_type_dict(val, prefix=prefix + sep + key))
return res
elif isinstance(d, list):
return {prefix: np.array(d)}
else:
return {prefix: d}
def get_message_dict(message, typ):
valid = message.valid
message = message._get(typ)
if not hasattr(message, 'to_dict'):
# TODO: support these
#print("skipping", typ)
return
msg_dict = message.to_dict(verbose=True)
msg_dict = flatten_type_dict(msg_dict)
msg_dict['_valid'] = valid
return msg_dict
def append_dict(path, t, d, values, arrays=False):
if path not in values:
group = {}
group["t"] = []
for k in d:
group[k] = []
values[path] = group
else:
group = values[path]
if arrays:
group["t"] += t.tolist()
for k, v in d.items():
group[k] += v.tolist()
else:
group["t"].append(t)
for k, v in d.items():
group[k].append(v)
def potentially_ragged_array(arr, dtype=None, **kwargs):
# TODO: is there a better way to detect inhomogeneous shapes?
try:
return np.array(arr, dtype=dtype, **kwargs)
except ValueError:
return np.array(arr, dtype=object, **kwargs)
def msgs_to_time_series(msgs):
"""
Convert an iterable of canonical capnp messages into a dictionary of time series.
Each time series has a value with key "t" which consists of monotonically increasing timestamps
in seconds.
"""
values = {}
for msg in msgs:
typ = msg.which()
tm = msg.logMonoTime / 1.0e9
msg_dict = get_message_dict(msg, typ)
if msg_dict is not None:
append_dict(typ, tm, msg_dict, values)
# Sort values by time.
for group in values.values():
order = np.argsort(group["t"])
for name, group_values in group.items():
group[name] = potentially_ragged_array(group_values)[order]
return values
if __name__ == "__main__":
import sys
from openpilot.tools.lib.logreader import LogReader
m = msgs_to_time_series(LogReader(sys.argv[1]))
print(m['driverCameraState']['t'])
print(np.diff(m['driverCameraState']['timestampSof']))
Loading…
Cancel
Save