downsampling

jotpluggler
Quantizr (Jimmy) 5 days ago
parent 9645c979b8
commit 9414b4a9cb
  1. 8
      tools/jotpluggler/data.py
  2. 9
      tools/jotpluggler/layout.py
  3. 66
      tools/jotpluggler/pluggle.py
  4. 187
      tools/jotpluggler/views.py

@ -246,6 +246,11 @@ class DataManager:
with self._lock:
self._observers.append(callback)
def remove_observer(self, callback):
with self._lock:
if callback in self._observers:
self._observers.remove(callback)
def _reset(self):
with self._lock:
self.loading = True
@ -261,7 +266,8 @@ class DataManager:
cloudlog.warning(f"Warning: No log segments found for route: {route}")
return
with multiprocessing.Pool() as pool, tqdm.tqdm(total=len(lr.logreader_identifiers), desc="Processing Segments") as pbar:
num_processes = max(1, multiprocessing.cpu_count() // 2)
with multiprocessing.Pool(processes=num_processes) as pool, tqdm.tqdm(total=len(lr.logreader_identifiers), desc="Processing Segments") as pbar:
for segment_result, start_time, end_time in pool.imap(_process_segment, lr.logreader_identifiers):
pbar.update(1)
if segment_result:

@ -7,14 +7,15 @@ MIN_PANE_SIZE = 60
class PlotLayoutManager:
def __init__(self, data_manager: DataManager, playback_manager, scale: float = 1.0):
def __init__(self, data_manager: DataManager, playback_manager, worker_manager, scale: float = 1.0):
self.data_manager = data_manager
self.playback_manager = playback_manager
self.worker_manager = worker_manager
self.scale = scale
self.container_tag = "plot_layout_container"
self.active_panels: list = []
initial_panel = TimeSeriesPanel(data_manager, playback_manager)
initial_panel = TimeSeriesPanel(data_manager, playback_manager, worker_manager)
self.layout: dict = {"type": "panel", "panel": initial_panel}
def create_ui(self, parent_tag: str):
@ -82,7 +83,7 @@ class PlotLayoutManager:
old_panel = self.layout["panel"]
old_panel.destroy_ui()
self.active_panels.remove(old_panel)
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager)
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager)
self.layout = {"type": "panel", "panel": new_panel}
self._rebuild_ui_at_path([])
return
@ -112,7 +113,7 @@ class PlotLayoutManager:
def split_panel(self, panel_path: list[int], orientation: int):
current_layout = self._get_layout_at_path(panel_path)
existing_panel = current_layout["panel"]
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager)
new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager)
parent, child_index = self._get_parent_and_index(panel_path)
if parent is None: # Root split

@ -5,6 +5,9 @@ import pyautogui
import subprocess
import dearpygui.dearpygui as dpg
import threading
import multiprocessing
import uuid
import signal
import numpy as np
from openpilot.common.basedir import BASEDIR
from openpilot.tools.jotpluggler.data import DataManager
@ -14,6 +17,50 @@ from openpilot.tools.jotpluggler.layout import PlotLayoutManager
DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
class WorkerManager:
def __init__(self, max_workers=None):
self.pool = multiprocessing.Pool(max_workers or min(4, multiprocessing.cpu_count()), initializer=WorkerManager.worker_initializer)
self.active_tasks = {}
def submit_task(self, func, args_list, callback=None, task_id=None):
task_id = task_id or str(uuid.uuid4())
if task_id in self.active_tasks:
try:
self.active_tasks[task_id].terminate()
except Exception:
pass
def handle_success(result):
self.active_tasks.pop(task_id, None)
if callback:
try:
callback(result)
except Exception as e:
print(f"Callback for task {task_id} failed: {e}")
def handle_error(error):
self.active_tasks.pop(task_id, None)
print(f"Task {task_id} failed: {error}")
async_result = self.pool.starmap_async(func, args_list, callback=handle_success, error_callback=handle_error)
self.active_tasks[task_id] = async_result
return task_id
@staticmethod
def worker_initializer():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def shutdown(self):
for task in self.active_tasks.values():
try:
task.terminate()
except Exception:
pass
self.pool.terminate()
self.pool.join()
class PlaybackManager:
def __init__(self):
self.is_playing = False
@ -62,9 +109,10 @@ class MainController:
self.scale = scale
self.data_manager = DataManager()
self.playback_manager = PlaybackManager()
self.worker_manager = WorkerManager()
self._create_global_themes()
self.data_tree_view = DataTreeView(self.data_manager, self.ui_lock)
self.plot_layout_manager = PlotLayoutManager(self.data_manager, self.playback_manager, scale=self.scale)
self.plot_layout_manager = PlotLayoutManager(self.data_manager, self.playback_manager, self.worker_manager, scale=self.scale)
self.data_manager.add_observer(self.on_data_loaded)
self.avg_char_width = None
self.visible_paths: set[str] = set()
@ -209,6 +257,9 @@ class MainController:
def _update_timeline_indicators(self, current_time_s: float):
self.plot_layout_manager.update_all_panels()
def shutdown(self):
self.worker_manager.shutdown()
def main(route_to_load=None):
dpg.create_context()
@ -241,12 +292,13 @@ def main(route_to_load=None):
dpg.show_viewport()
# Main loop
while dpg.is_dearpygui_running():
controller.update_frame(default_font)
dpg.render_dearpygui_frame()
dpg.destroy_context()
try:
while dpg.is_dearpygui_running():
controller.update_frame(default_font)
dpg.render_dearpygui_frame()
finally:
controller.shutdown()
dpg.destroy_context()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A tool for visualizing openpilot logs.")

@ -1,4 +1,5 @@
import os
import queue
import re
import uuid
import threading
@ -38,25 +39,25 @@ class ViewPanel(ABC):
class TimeSeriesPanel(ViewPanel):
def __init__(self, data_manager: DataManager, playback_manager, panel_id: str | None = None):
def __init__(self, data_manager, playback_manager, worker_manager, panel_id: str | None = None):
super().__init__(panel_id)
self.data_manager = data_manager
self.playback_manager = playback_manager
self.worker_manager = worker_manager
self.title = "Time Series Plot"
self.plot_tag: str | None = None
self.x_axis_tag: str | None = None
self.y_axis_tag: str | None = None
self.timeline_indicator_tag: str | None = None
self._ui_created = False
self._series_data: dict[str, tuple] = {}
self.data_manager.add_observer(self.on_data_loaded)
def create_ui(self, parent_tag: str):
self.plot_tag = f"plot_{self.panel_id}"
self.x_axis_tag = f"{self.plot_tag}_x_axis"
self.y_axis_tag = f"{self.plot_tag}_y_axis"
self.timeline_indicator_tag = f"{self.plot_tag}_timeline"
self._ui_created = False
self._series_data = {}
self._last_plot_duration = 0
self._update_lock = threading.RLock()
self.results_queue = queue.Queue()
self._new_data = False
def create_ui(self, parent_tag: str):
self.data_manager.add_observer(self.on_data_loaded)
with dpg.plot(height=-1, width=-1, tag=self.plot_tag, parent=parent_tag, drop_callback=self._on_series_drop, payload_type="TIMESERIES_PAYLOAD"):
dpg.add_plot_legend()
dpg.add_plot_axis(dpg.mvXAxis, no_label=True, tag=self.x_axis_tag)
@ -66,80 +67,138 @@ class TimeSeriesPanel(ViewPanel):
for series_path in list(self._series_data.keys()):
self.add_series(series_path)
self._ui_created = True
def update(self):
if self._ui_created:
self.update_timeline_indicator(self.playback_manager.current_time_s)
def update_timeline_indicator(self, current_time_s: float):
if not self._ui_created or not dpg.does_item_exist(self.timeline_indicator_tag):
with self._update_lock:
if not self._ui_created:
return
if self._new_data:
self._new_data = False
for series_path in list(self._series_data.keys()):
self.add_series(series_path, update=True)
try: # check downsample result queue
results = self.results_queue.get_nowait()
for series_path, downsampled_time, downsampled_values in results:
series_tag = f"series_{self.panel_id}_{series_path}"
if dpg.does_item_exist(series_tag):
dpg.set_value(series_tag, [downsampled_time, downsampled_values])
except queue.Empty:
pass
current_time_s = self.playback_manager.current_time_s
dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]])
# update timeseries legend label
for series_path, (time_array, value_array) in self._series_data.items():
position = np.searchsorted(time_array, current_time_s, side='right') - 1
if position >= 0 and (current_time_s - time_array[position]) <= 1.0:
value = value_array[position]
formatted_value = f"{value:.5f}" if np.issubdtype(type(value), np.floating) else str(value)
series_tag = f"series_{self.panel_id}_{series_path}"
if dpg.does_item_exist(series_tag):
dpg.configure_item(series_tag, label=f"{series_path}: {formatted_value}")
# downsample if plot zoom changed significantly
plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0]
if plot_duration > self._last_plot_duration * 2 or plot_duration < self._last_plot_duration * 0.5:
self._downsample_all_series(plot_duration)
def _downsample_all_series(self, plot_duration):
plot_width = dpg.get_item_rect_size(self.plot_tag)[0]
if plot_width <= 0 or plot_duration <= 0:
return
dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]])
for series_path, (rel_time_array, value_array) in self._series_data.items():
position = np.searchsorted(rel_time_array, current_time_s, side='right') - 1
value = None
if position >= 0 and (current_time_s - rel_time_array[position]) <= 1.0:
value = value_array[position]
if value is not None:
if np.issubdtype(type(value), np.floating):
formatted_value = f"{value:.5f}"
else:
formatted_value = str(value)
series_tag = f"series_{self.panel_id}_{series_path}"
legend_label = f"{series_path}: {formatted_value}"
if dpg.does_item_exist(series_tag):
dpg.configure_item(series_tag, label=legend_label)
def add_series(self, series_path: str, update: bool = False) -> bool:
if update or series_path not in self._series_data:
self._series_data[series_path] = self.data_manager.get_timeseries(series_path)
rel_time_array, value_array = self._series_data[series_path]
series_tag = f"series_{self.panel_id}_{series_path}"
if dpg.does_item_exist(series_tag):
dpg.set_value(series_tag, [rel_time_array, value_array])
else:
line_series_tag = dpg.add_line_series(x=rel_time_array, y=value_array, label=series_path, parent=self.y_axis_tag, tag=series_tag)
dpg.bind_item_theme(line_series_tag, "global_line_theme")
dpg.fit_axis_data(self.x_axis_tag)
dpg.fit_axis_data(self.y_axis_tag)
return True
self._last_plot_duration = plot_duration
target_points_per_second = plot_width / plot_duration
work_items = []
for series_path, (time_array, value_array) in self._series_data.items():
if len(time_array) == 0:
continue
series_duration = time_array[-1] - time_array[0] if len(time_array) > 1 else 1
points_per_second = len(time_array) / series_duration
if points_per_second > target_points_per_second * 2:
target_points = max(int(target_points_per_second * series_duration), plot_width)
work_items.append((series_path, time_array, value_array, target_points))
elif dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
dpg.set_value(f"series_{self.panel_id}_{series_path}", [time_array, value_array])
if work_items:
self.worker_manager.submit_task(
TimeSeriesPanel._downsample_worker, work_items, callback=lambda results: self.results_queue.put(results), task_id=f"downsample_{self.panel_id}"
)
def add_series(self, series_path: str, update: bool = False):
with self._update_lock:
if update or series_path not in self._series_data:
self._series_data[series_path] = self.data_manager.get_timeseries(series_path)
time_array, value_array = self._series_data[series_path]
series_tag = f"series_{self.panel_id}_{series_path}"
if dpg.does_item_exist(series_tag):
dpg.set_value(series_tag, [time_array, value_array])
else:
line_series_tag = dpg.add_line_series(x=time_array, y=value_array, label=series_path, parent=self.y_axis_tag, tag=series_tag)
dpg.bind_item_theme(line_series_tag, "global_line_theme")
dpg.fit_axis_data(self.x_axis_tag)
dpg.fit_axis_data(self.y_axis_tag)
plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0]
self._downsample_all_series(plot_duration)
def destroy_ui(self):
if self.plot_tag and dpg.does_item_exist(self.plot_tag):
dpg.delete_item(self.plot_tag)
self._ui_created = False
with self._update_lock:
self.data_manager.remove_observer(self.on_data_loaded)
if dpg.does_item_exist(self.plot_tag):
dpg.delete_item(self.plot_tag)
self._ui_created = False
def get_panel_type(self) -> str:
return "timeseries"
def clear(self):
for series_path in list(self._series_data.keys()):
self.remove_series(series_path)
with self._update_lock:
for series_path in list(self._series_data.keys()):
self.remove_series(series_path)
def remove_series(self, series_path: str):
if series_path in self._series_data:
series_tag = f"series_{self.panel_id}_{series_path}"
if dpg.does_item_exist(series_tag):
dpg.delete_item(series_tag)
del self._series_data[series_path]
with self._update_lock:
if series_path in self._series_data:
if dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
dpg.delete_item(f"series_{self.panel_id}_{series_path}")
del self._series_data[series_path]
def on_data_loaded(self, data: dict):
for series_path in list(self._series_data.keys()):
self.add_series(series_path, update=True)
self._new_data = True
def _on_series_drop(self, sender, app_data, user_data):
self.add_series(app_data)
@staticmethod
def _downsample_worker(series_path, time_array, value_array, target_points):
if len(time_array) <= target_points:
return series_path, time_array, value_array
step = len(time_array) / target_points
indices = []
for i in range(target_points):
start_idx = int(i * step)
end_idx = int((i + 1) * step)
if start_idx == end_idx:
indices.append(start_idx)
else:
bucket_values = value_array[start_idx:end_idx]
min_idx = start_idx + np.argmin(bucket_values)
max_idx = start_idx + np.argmax(bucket_values)
if min_idx != max_idx:
indices.extend([min(min_idx, max_idx), max(min_idx, max_idx)])
else:
indices.append(min_idx)
indices = sorted(set(indices))
return series_path, time_array[indices], value_array[indices]
class DataTreeNode:
def __init__(self, name: str, full_path: str = ""):

Loading…
Cancel
Save