diff --git a/tools/jotpluggler/data.py b/tools/jotpluggler/data.py index 0cbc62bf13..fa91b1d7ae 100644 --- a/tools/jotpluggler/data.py +++ b/tools/jotpluggler/data.py @@ -246,6 +246,11 @@ class DataManager: with self._lock: self._observers.append(callback) + def remove_observer(self, callback): + with self._lock: + if callback in self._observers: + self._observers.remove(callback) + def _reset(self): with self._lock: self.loading = True @@ -261,7 +266,8 @@ class DataManager: cloudlog.warning(f"Warning: No log segments found for route: {route}") return - with multiprocessing.Pool() as pool, tqdm.tqdm(total=len(lr.logreader_identifiers), desc="Processing Segments") as pbar: + num_processes = max(1, multiprocessing.cpu_count() // 2) + with multiprocessing.Pool(processes=num_processes) as pool, tqdm.tqdm(total=len(lr.logreader_identifiers), desc="Processing Segments") as pbar: for segment_result, start_time, end_time in pool.imap(_process_segment, lr.logreader_identifiers): pbar.update(1) if segment_result: diff --git a/tools/jotpluggler/layout.py b/tools/jotpluggler/layout.py index a04de407a6..342b5b81e2 100644 --- a/tools/jotpluggler/layout.py +++ b/tools/jotpluggler/layout.py @@ -7,14 +7,15 @@ MIN_PANE_SIZE = 60 class PlotLayoutManager: - def __init__(self, data_manager: DataManager, playback_manager, scale: float = 1.0): + def __init__(self, data_manager: DataManager, playback_manager, worker_manager, scale: float = 1.0): self.data_manager = data_manager self.playback_manager = playback_manager + self.worker_manager = worker_manager self.scale = scale self.container_tag = "plot_layout_container" self.active_panels: list = [] - initial_panel = TimeSeriesPanel(data_manager, playback_manager) + initial_panel = TimeSeriesPanel(data_manager, playback_manager, worker_manager) self.layout: dict = {"type": "panel", "panel": initial_panel} def create_ui(self, parent_tag: str): @@ -82,7 +83,7 @@ class PlotLayoutManager: old_panel = self.layout["panel"] old_panel.destroy_ui() self.active_panels.remove(old_panel) - new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager) + new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager) self.layout = {"type": "panel", "panel": new_panel} self._rebuild_ui_at_path([]) return @@ -112,7 +113,7 @@ class PlotLayoutManager: def split_panel(self, panel_path: list[int], orientation: int): current_layout = self._get_layout_at_path(panel_path) existing_panel = current_layout["panel"] - new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager) + new_panel = TimeSeriesPanel(self.data_manager, self.playback_manager, self.worker_manager) parent, child_index = self._get_parent_and_index(panel_path) if parent is None: # Root split diff --git a/tools/jotpluggler/pluggle.py b/tools/jotpluggler/pluggle.py index 616905a5d7..de8f3dd2b2 100755 --- a/tools/jotpluggler/pluggle.py +++ b/tools/jotpluggler/pluggle.py @@ -5,6 +5,9 @@ import pyautogui import subprocess import dearpygui.dearpygui as dpg import threading +import multiprocessing +import uuid +import signal import numpy as np from openpilot.common.basedir import BASEDIR from openpilot.tools.jotpluggler.data import DataManager @@ -14,6 +17,50 @@ from openpilot.tools.jotpluggler.layout import PlotLayoutManager DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" +class WorkerManager: + def __init__(self, max_workers=None): + self.pool = multiprocessing.Pool(max_workers or min(4, multiprocessing.cpu_count()), initializer=WorkerManager.worker_initializer) + self.active_tasks = {} + + def submit_task(self, func, args_list, callback=None, task_id=None): + task_id = task_id or str(uuid.uuid4()) + + if task_id in self.active_tasks: + try: + self.active_tasks[task_id].terminate() + except Exception: + pass + + def handle_success(result): + self.active_tasks.pop(task_id, None) + if callback: + try: + callback(result) + except Exception as e: + print(f"Callback for task {task_id} failed: {e}") + + def handle_error(error): + self.active_tasks.pop(task_id, None) + print(f"Task {task_id} failed: {error}") + + async_result = self.pool.starmap_async(func, args_list, callback=handle_success, error_callback=handle_error) + self.active_tasks[task_id] = async_result + return task_id + + @staticmethod + def worker_initializer(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def shutdown(self): + for task in self.active_tasks.values(): + try: + task.terminate() + except Exception: + pass + self.pool.terminate() + self.pool.join() + + class PlaybackManager: def __init__(self): self.is_playing = False @@ -62,9 +109,10 @@ class MainController: self.scale = scale self.data_manager = DataManager() self.playback_manager = PlaybackManager() + self.worker_manager = WorkerManager() self._create_global_themes() self.data_tree_view = DataTreeView(self.data_manager, self.ui_lock) - self.plot_layout_manager = PlotLayoutManager(self.data_manager, self.playback_manager, scale=self.scale) + self.plot_layout_manager = PlotLayoutManager(self.data_manager, self.playback_manager, self.worker_manager, scale=self.scale) self.data_manager.add_observer(self.on_data_loaded) self.avg_char_width = None self.visible_paths: set[str] = set() @@ -209,6 +257,9 @@ class MainController: def _update_timeline_indicators(self, current_time_s: float): self.plot_layout_manager.update_all_panels() + def shutdown(self): + self.worker_manager.shutdown() + def main(route_to_load=None): dpg.create_context() @@ -241,12 +292,13 @@ def main(route_to_load=None): dpg.show_viewport() # Main loop - while dpg.is_dearpygui_running(): - controller.update_frame(default_font) - dpg.render_dearpygui_frame() - - dpg.destroy_context() - + try: + while dpg.is_dearpygui_running(): + controller.update_frame(default_font) + dpg.render_dearpygui_frame() + finally: + controller.shutdown() + dpg.destroy_context() if __name__ == "__main__": parser = argparse.ArgumentParser(description="A tool for visualizing openpilot logs.") diff --git a/tools/jotpluggler/views.py b/tools/jotpluggler/views.py index 4309ada88e..dbc61ead67 100644 --- a/tools/jotpluggler/views.py +++ b/tools/jotpluggler/views.py @@ -1,4 +1,5 @@ import os +import queue import re import uuid import threading @@ -38,25 +39,25 @@ class ViewPanel(ABC): class TimeSeriesPanel(ViewPanel): - def __init__(self, data_manager: DataManager, playback_manager, panel_id: str | None = None): + def __init__(self, data_manager, playback_manager, worker_manager, panel_id: str | None = None): super().__init__(panel_id) self.data_manager = data_manager self.playback_manager = playback_manager + self.worker_manager = worker_manager self.title = "Time Series Plot" - self.plot_tag: str | None = None - self.x_axis_tag: str | None = None - self.y_axis_tag: str | None = None - self.timeline_indicator_tag: str | None = None - self._ui_created = False - self._series_data: dict[str, tuple] = {} - self.data_manager.add_observer(self.on_data_loaded) - - def create_ui(self, parent_tag: str): self.plot_tag = f"plot_{self.panel_id}" self.x_axis_tag = f"{self.plot_tag}_x_axis" self.y_axis_tag = f"{self.plot_tag}_y_axis" self.timeline_indicator_tag = f"{self.plot_tag}_timeline" + self._ui_created = False + self._series_data = {} + self._last_plot_duration = 0 + self._update_lock = threading.RLock() + self.results_queue = queue.Queue() + self._new_data = False + def create_ui(self, parent_tag: str): + self.data_manager.add_observer(self.on_data_loaded) with dpg.plot(height=-1, width=-1, tag=self.plot_tag, parent=parent_tag, drop_callback=self._on_series_drop, payload_type="TIMESERIES_PAYLOAD"): dpg.add_plot_legend() dpg.add_plot_axis(dpg.mvXAxis, no_label=True, tag=self.x_axis_tag) @@ -66,80 +67,138 @@ class TimeSeriesPanel(ViewPanel): for series_path in list(self._series_data.keys()): self.add_series(series_path) - self._ui_created = True def update(self): - if self._ui_created: - self.update_timeline_indicator(self.playback_manager.current_time_s) - - def update_timeline_indicator(self, current_time_s: float): - if not self._ui_created or not dpg.does_item_exist(self.timeline_indicator_tag): + with self._update_lock: + if not self._ui_created: + return + + if self._new_data: + self._new_data = False + for series_path in list(self._series_data.keys()): + self.add_series(series_path, update=True) + + try: # check downsample result queue + results = self.results_queue.get_nowait() + for series_path, downsampled_time, downsampled_values in results: + series_tag = f"series_{self.panel_id}_{series_path}" + if dpg.does_item_exist(series_tag): + dpg.set_value(series_tag, [downsampled_time, downsampled_values]) + except queue.Empty: + pass + + current_time_s = self.playback_manager.current_time_s + dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]]) + + # update timeseries legend label + for series_path, (time_array, value_array) in self._series_data.items(): + position = np.searchsorted(time_array, current_time_s, side='right') - 1 + if position >= 0 and (current_time_s - time_array[position]) <= 1.0: + value = value_array[position] + formatted_value = f"{value:.5f}" if np.issubdtype(type(value), np.floating) else str(value) + series_tag = f"series_{self.panel_id}_{series_path}" + if dpg.does_item_exist(series_tag): + dpg.configure_item(series_tag, label=f"{series_path}: {formatted_value}") + + # downsample if plot zoom changed significantly + plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0] + if plot_duration > self._last_plot_duration * 2 or plot_duration < self._last_plot_duration * 0.5: + self._downsample_all_series(plot_duration) + + def _downsample_all_series(self, plot_duration): + plot_width = dpg.get_item_rect_size(self.plot_tag)[0] + if plot_width <= 0 or plot_duration <= 0: return - dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]]) - - for series_path, (rel_time_array, value_array) in self._series_data.items(): - position = np.searchsorted(rel_time_array, current_time_s, side='right') - 1 - value = None - - if position >= 0 and (current_time_s - rel_time_array[position]) <= 1.0: - value = value_array[position] - - if value is not None: - if np.issubdtype(type(value), np.floating): - formatted_value = f"{value:.5f}" - else: - formatted_value = str(value) - - series_tag = f"series_{self.panel_id}_{series_path}" - legend_label = f"{series_path}: {formatted_value}" - - if dpg.does_item_exist(series_tag): - dpg.configure_item(series_tag, label=legend_label) - - def add_series(self, series_path: str, update: bool = False) -> bool: - if update or series_path not in self._series_data: - self._series_data[series_path] = self.data_manager.get_timeseries(series_path) - - rel_time_array, value_array = self._series_data[series_path] - series_tag = f"series_{self.panel_id}_{series_path}" - - if dpg.does_item_exist(series_tag): - dpg.set_value(series_tag, [rel_time_array, value_array]) - else: - line_series_tag = dpg.add_line_series(x=rel_time_array, y=value_array, label=series_path, parent=self.y_axis_tag, tag=series_tag) - dpg.bind_item_theme(line_series_tag, "global_line_theme") - dpg.fit_axis_data(self.x_axis_tag) - dpg.fit_axis_data(self.y_axis_tag) - return True + self._last_plot_duration = plot_duration + target_points_per_second = plot_width / plot_duration + work_items = [] + for series_path, (time_array, value_array) in self._series_data.items(): + if len(time_array) == 0: + continue + series_duration = time_array[-1] - time_array[0] if len(time_array) > 1 else 1 + points_per_second = len(time_array) / series_duration + if points_per_second > target_points_per_second * 2: + target_points = max(int(target_points_per_second * series_duration), plot_width) + work_items.append((series_path, time_array, value_array, target_points)) + elif dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"): + dpg.set_value(f"series_{self.panel_id}_{series_path}", [time_array, value_array]) + + if work_items: + self.worker_manager.submit_task( + TimeSeriesPanel._downsample_worker, work_items, callback=lambda results: self.results_queue.put(results), task_id=f"downsample_{self.panel_id}" + ) + + def add_series(self, series_path: str, update: bool = False): + with self._update_lock: + if update or series_path not in self._series_data: + self._series_data[series_path] = self.data_manager.get_timeseries(series_path) + + time_array, value_array = self._series_data[series_path] + series_tag = f"series_{self.panel_id}_{series_path}" + if dpg.does_item_exist(series_tag): + dpg.set_value(series_tag, [time_array, value_array]) + else: + line_series_tag = dpg.add_line_series(x=time_array, y=value_array, label=series_path, parent=self.y_axis_tag, tag=series_tag) + dpg.bind_item_theme(line_series_tag, "global_line_theme") + dpg.fit_axis_data(self.x_axis_tag) + dpg.fit_axis_data(self.y_axis_tag) + plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0] + self._downsample_all_series(plot_duration) def destroy_ui(self): - if self.plot_tag and dpg.does_item_exist(self.plot_tag): - dpg.delete_item(self.plot_tag) - self._ui_created = False + with self._update_lock: + self.data_manager.remove_observer(self.on_data_loaded) + if dpg.does_item_exist(self.plot_tag): + dpg.delete_item(self.plot_tag) + self._ui_created = False def get_panel_type(self) -> str: return "timeseries" def clear(self): - for series_path in list(self._series_data.keys()): - self.remove_series(series_path) + with self._update_lock: + for series_path in list(self._series_data.keys()): + self.remove_series(series_path) def remove_series(self, series_path: str): - if series_path in self._series_data: - series_tag = f"series_{self.panel_id}_{series_path}" - if dpg.does_item_exist(series_tag): - dpg.delete_item(series_tag) - del self._series_data[series_path] + with self._update_lock: + if series_path in self._series_data: + if dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"): + dpg.delete_item(f"series_{self.panel_id}_{series_path}") + del self._series_data[series_path] def on_data_loaded(self, data: dict): - for series_path in list(self._series_data.keys()): - self.add_series(series_path, update=True) + self._new_data = True def _on_series_drop(self, sender, app_data, user_data): self.add_series(app_data) + @staticmethod + def _downsample_worker(series_path, time_array, value_array, target_points): + if len(time_array) <= target_points: + return series_path, time_array, value_array + + step = len(time_array) / target_points + indices = [] + + for i in range(target_points): + start_idx = int(i * step) + end_idx = int((i + 1) * step) + if start_idx == end_idx: + indices.append(start_idx) + else: + bucket_values = value_array[start_idx:end_idx] + min_idx = start_idx + np.argmin(bucket_values) + max_idx = start_idx + np.argmax(bucket_values) + if min_idx != max_idx: + indices.extend([min(min_idx, max_idx), max(min_idx, max_idx)]) + else: + indices.append(min_idx) + indices = sorted(set(indices)) + return series_path, time_array[indices], value_array[indices] + class DataTreeNode: def __init__(self, name: str, full_path: str = ""):