You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							294 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							294 lines
						
					
					
						
							11 KiB
						
					
					
				import uuid
 | 
						|
import threading
 | 
						|
import numpy as np
 | 
						|
from collections import deque
 | 
						|
import dearpygui.dearpygui as dpg
 | 
						|
from abc import ABC, abstractmethod
 | 
						|
 | 
						|
 | 
						|
class ViewPanel(ABC):
 | 
						|
  """Abstract base class for all view panels that can be displayed in a plot container"""
 | 
						|
 | 
						|
  def __init__(self, panel_id: str = None):
 | 
						|
    self.panel_id = panel_id or str(uuid.uuid4())
 | 
						|
    self.title = "Untitled Panel"
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def clear(self):
 | 
						|
    pass
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def create_ui(self, parent_tag: str):
 | 
						|
    pass
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def destroy_ui(self):
 | 
						|
    pass
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def get_panel_type(self) -> str:
 | 
						|
    pass
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def update(self):
 | 
						|
    pass
 | 
						|
 | 
						|
  @abstractmethod
 | 
						|
  def to_dict(self) -> dict:
 | 
						|
    pass
 | 
						|
 | 
						|
  @classmethod
 | 
						|
  @abstractmethod
 | 
						|
  def load_from_dict(cls, data: dict, data_manager, playback_manager, worker_manager):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class TimeSeriesPanel(ViewPanel):
 | 
						|
  def __init__(self, data_manager, playback_manager, worker_manager, panel_id: str | None = None):
 | 
						|
    super().__init__(panel_id)
 | 
						|
    self.data_manager = data_manager
 | 
						|
    self.playback_manager = playback_manager
 | 
						|
    self.worker_manager = worker_manager
 | 
						|
    self.title = "Time Series Plot"
 | 
						|
    self.plot_tag = f"plot_{self.panel_id}"
 | 
						|
    self.x_axis_tag = f"{self.plot_tag}_x_axis"
 | 
						|
    self.y_axis_tag = f"{self.plot_tag}_y_axis"
 | 
						|
    self.timeline_indicator_tag = f"{self.plot_tag}_timeline"
 | 
						|
    self._ui_created = False
 | 
						|
    self._series_data: dict[str, tuple[np.ndarray, np.ndarray]] = {}
 | 
						|
    self._last_plot_duration = 0
 | 
						|
    self._update_lock = threading.RLock()
 | 
						|
    self._results_deque: deque[tuple[str, list, list]] = deque()
 | 
						|
    self._new_data = False
 | 
						|
    self._last_x_limits = (0.0, 0.0)
 | 
						|
    self._queued_x_sync: tuple | None = None
 | 
						|
    self._queued_reallow_x_zoom = False
 | 
						|
    self._total_segments = self.playback_manager.num_segments
 | 
						|
 | 
						|
  def to_dict(self) -> dict:
 | 
						|
    return {
 | 
						|
      "type": "timeseries",
 | 
						|
      "title": self.title,
 | 
						|
      "series_paths": list(self._series_data.keys())
 | 
						|
    }
 | 
						|
 | 
						|
  @classmethod
 | 
						|
  def load_from_dict(cls, data: dict, data_manager, playback_manager, worker_manager):
 | 
						|
    panel = cls(data_manager, playback_manager, worker_manager)
 | 
						|
    panel.title = data.get("title", "Time Series Plot")
 | 
						|
    panel._series_data = {path: (np.array([]), np.array([])) for path in data.get("series_paths", [])}
 | 
						|
    return panel
 | 
						|
 | 
						|
  def create_ui(self, parent_tag: str):
 | 
						|
    self.data_manager.add_observer(self.on_data_loaded)
 | 
						|
    self.playback_manager.add_x_axis_observer(self._on_x_axis_sync)
 | 
						|
    with dpg.plot(height=-1, width=-1, tag=self.plot_tag, parent=parent_tag, drop_callback=self._on_series_drop, payload_type="TIMESERIES_PAYLOAD"):
 | 
						|
      dpg.add_plot_legend()
 | 
						|
      dpg.add_plot_axis(dpg.mvXAxis, no_label=True, tag=self.x_axis_tag)
 | 
						|
      dpg.add_plot_axis(dpg.mvYAxis, no_label=True, tag=self.y_axis_tag)
 | 
						|
      timeline_series_tag = dpg.add_inf_line_series(x=[0], label="Timeline", parent=self.y_axis_tag, tag=self.timeline_indicator_tag)
 | 
						|
      dpg.bind_item_theme(timeline_series_tag, "timeline_theme")
 | 
						|
 | 
						|
    self._new_data = True
 | 
						|
    self._queued_x_sync = self.playback_manager.x_axis_bounds
 | 
						|
    self._ui_created = True
 | 
						|
 | 
						|
  def update(self):
 | 
						|
    with self._update_lock:
 | 
						|
      if not self._ui_created:
 | 
						|
        return
 | 
						|
 | 
						|
      if self._queued_x_sync:
 | 
						|
        min_time, max_time = self._queued_x_sync
 | 
						|
        self._queued_x_sync = None
 | 
						|
        dpg.set_axis_limits(self.x_axis_tag, min_time, max_time)
 | 
						|
        self._last_x_limits = (min_time, max_time)
 | 
						|
        self._fit_y_axis(min_time, max_time)
 | 
						|
        self._queued_reallow_x_zoom = True # must wait a frame before allowing user changes so that axis limits take effect
 | 
						|
        return
 | 
						|
 | 
						|
      if self._queued_reallow_x_zoom:
 | 
						|
        self._queued_reallow_x_zoom = False
 | 
						|
        if tuple(dpg.get_axis_limits(self.x_axis_tag)) == self._last_x_limits:
 | 
						|
          dpg.set_axis_limits_auto(self.x_axis_tag)
 | 
						|
        else:
 | 
						|
          self._queued_x_sync = self._last_x_limits # retry, likely too early
 | 
						|
          return
 | 
						|
 | 
						|
      if self._new_data:  # handle new data in main thread
 | 
						|
        self._new_data = False
 | 
						|
        if self._total_segments > 0:
 | 
						|
          dpg.set_axis_limits_constraints(self.x_axis_tag, -10, self._total_segments * 60 + 10)
 | 
						|
        self._fit_y_axis(*dpg.get_axis_limits(self.x_axis_tag))
 | 
						|
        for series_path in list(self._series_data.keys()):
 | 
						|
          self.add_series(series_path, update=True)
 | 
						|
 | 
						|
      current_limits = dpg.get_axis_limits(self.x_axis_tag)
 | 
						|
      # downsample if plot zoom changed significantly
 | 
						|
      plot_duration = current_limits[1] - current_limits[0]
 | 
						|
      if plot_duration > self._last_plot_duration * 2 or plot_duration < self._last_plot_duration * 0.5:
 | 
						|
        self._downsample_all_series(plot_duration)
 | 
						|
      # sync x-axis if changed by user
 | 
						|
      if self._last_x_limits != current_limits:
 | 
						|
        self.playback_manager.set_x_axis_bounds(current_limits[0], current_limits[1], source_panel=self)
 | 
						|
        self._last_x_limits = current_limits
 | 
						|
        self._fit_y_axis(current_limits[0], current_limits[1])
 | 
						|
 | 
						|
      while self._results_deque:  # handle downsampled results in main thread
 | 
						|
        results = self._results_deque.popleft()
 | 
						|
        for series_path, downsampled_time, downsampled_values in results:
 | 
						|
          series_tag = f"series_{self.panel_id}_{series_path}"
 | 
						|
          if dpg.does_item_exist(series_tag):
 | 
						|
            dpg.set_value(series_tag, (downsampled_time, downsampled_values.astype(float)))
 | 
						|
 | 
						|
      # update timeline
 | 
						|
      current_time_s = self.playback_manager.current_time_s
 | 
						|
      dpg.set_value(self.timeline_indicator_tag, [[current_time_s], [0]])
 | 
						|
 | 
						|
      # update timeseries legend label
 | 
						|
      for series_path, (time_array, value_array) in self._series_data.items():
 | 
						|
        position = np.searchsorted(time_array, current_time_s, side='right') - 1
 | 
						|
        if position >= 0 and (current_time_s - time_array[position]) <= 1.0:
 | 
						|
          value = value_array[position]
 | 
						|
          formatted_value = f"{value:.5f}" if np.issubdtype(type(value), np.floating) else str(value)
 | 
						|
          series_tag = f"series_{self.panel_id}_{series_path}"
 | 
						|
          if dpg.does_item_exist(series_tag):
 | 
						|
            dpg.configure_item(series_tag, label=f"{series_path}: {formatted_value}")
 | 
						|
 | 
						|
  def _on_x_axis_sync(self, min_time: float, max_time: float, source_panel):
 | 
						|
    with self._update_lock:
 | 
						|
      if source_panel != self:
 | 
						|
        self._queued_x_sync = (min_time, max_time)
 | 
						|
 | 
						|
  def _fit_y_axis(self, x_min: float, x_max: float):
 | 
						|
    if not self._series_data:
 | 
						|
      dpg.set_axis_limits(self.y_axis_tag, -1, 1)
 | 
						|
      return
 | 
						|
 | 
						|
    global_min = float('inf')
 | 
						|
    global_max = float('-inf')
 | 
						|
    found_data = False
 | 
						|
 | 
						|
    for time_array, value_array in self._series_data.values():
 | 
						|
      if len(time_array) == 0:
 | 
						|
        continue
 | 
						|
      start_idx, end_idx = np.searchsorted(time_array, [x_min, x_max])
 | 
						|
      end_idx = min(end_idx, len(time_array) - 1)
 | 
						|
      if start_idx <= end_idx:
 | 
						|
        y_slice = value_array[start_idx:end_idx + 1]
 | 
						|
        series_min, series_max = np.min(y_slice), np.max(y_slice)
 | 
						|
        global_min = min(global_min, series_min)
 | 
						|
        global_max = max(global_max, series_max)
 | 
						|
        found_data = True
 | 
						|
 | 
						|
    if not found_data:
 | 
						|
      dpg.set_axis_limits(self.y_axis_tag, -1, 1)
 | 
						|
      return
 | 
						|
 | 
						|
    if global_min == global_max:
 | 
						|
      padding = max(abs(global_min) * 0.1, 1.0)
 | 
						|
      y_min, y_max = global_min - padding, global_max + padding
 | 
						|
    else:
 | 
						|
      range_size = global_max - global_min
 | 
						|
      padding = range_size * 0.1
 | 
						|
      y_min, y_max = global_min - padding, global_max + padding
 | 
						|
 | 
						|
    dpg.set_axis_limits(self.y_axis_tag, y_min, y_max)
 | 
						|
 | 
						|
  def _downsample_all_series(self, plot_duration):
 | 
						|
    plot_width = dpg.get_item_rect_size(self.plot_tag)[0]
 | 
						|
    if plot_width <= 0 or plot_duration <= 0:
 | 
						|
      return
 | 
						|
 | 
						|
    self._last_plot_duration = plot_duration
 | 
						|
    target_points_per_second = plot_width / plot_duration
 | 
						|
    work_items = []
 | 
						|
    for series_path, (time_array, value_array) in self._series_data.items():
 | 
						|
      if len(time_array) == 0:
 | 
						|
        continue
 | 
						|
      series_duration = time_array[-1] - time_array[0] if len(time_array) > 1 else 1
 | 
						|
      points_per_second = len(time_array) / series_duration
 | 
						|
      if points_per_second > target_points_per_second * 2:
 | 
						|
        target_points = max(int(target_points_per_second * series_duration), plot_width)
 | 
						|
        work_items.append((series_path, time_array, value_array, target_points))
 | 
						|
      elif dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
 | 
						|
        dpg.set_value(f"series_{self.panel_id}_{series_path}", (time_array, value_array.astype(float)))
 | 
						|
 | 
						|
    if work_items:
 | 
						|
      self.worker_manager.submit_task(
 | 
						|
        TimeSeriesPanel._downsample_worker, work_items, callback=lambda results: self._results_deque.append(results), task_id=f"downsample_{self.panel_id}"
 | 
						|
      )
 | 
						|
 | 
						|
  def add_series(self, series_path: str, update: bool = False):
 | 
						|
    with self._update_lock:
 | 
						|
      if update or series_path not in self._series_data:
 | 
						|
        self._series_data[series_path] = self.data_manager.get_timeseries(series_path)
 | 
						|
 | 
						|
      time_array, value_array = self._series_data[series_path]
 | 
						|
      series_tag = f"series_{self.panel_id}_{series_path}"
 | 
						|
      if dpg.does_item_exist(series_tag):
 | 
						|
        dpg.set_value(series_tag, (time_array, value_array.astype(float)))
 | 
						|
      else:
 | 
						|
        line_series_tag = dpg.add_line_series(x=time_array, y=value_array.astype(float), label=series_path, parent=self.y_axis_tag, tag=series_tag)
 | 
						|
        dpg.bind_item_theme(line_series_tag, "line_theme")
 | 
						|
      self._fit_y_axis(*dpg.get_axis_limits(self.x_axis_tag))
 | 
						|
      plot_duration = dpg.get_axis_limits(self.x_axis_tag)[1] - dpg.get_axis_limits(self.x_axis_tag)[0]
 | 
						|
      self._downsample_all_series(plot_duration)
 | 
						|
 | 
						|
  def destroy_ui(self):
 | 
						|
    with self._update_lock:
 | 
						|
      self.data_manager.remove_observer(self.on_data_loaded)
 | 
						|
      self.playback_manager.remove_x_axis_observer(self._on_x_axis_sync)
 | 
						|
      if dpg.does_item_exist(self.plot_tag):
 | 
						|
        dpg.delete_item(self.plot_tag)
 | 
						|
      self._ui_created = False
 | 
						|
 | 
						|
  def get_panel_type(self) -> str:
 | 
						|
    return "timeseries"
 | 
						|
 | 
						|
  def clear(self):
 | 
						|
    with self._update_lock:
 | 
						|
      for series_path in list(self._series_data.keys()):
 | 
						|
        self.remove_series(series_path)
 | 
						|
 | 
						|
  def remove_series(self, series_path: str):
 | 
						|
    with self._update_lock:
 | 
						|
      if series_path in self._series_data:
 | 
						|
        if dpg.does_item_exist(f"series_{self.panel_id}_{series_path}"):
 | 
						|
          dpg.delete_item(f"series_{self.panel_id}_{series_path}")
 | 
						|
        del self._series_data[series_path]
 | 
						|
 | 
						|
  def on_data_loaded(self, data: dict):
 | 
						|
    with self._update_lock:
 | 
						|
      self._new_data = True
 | 
						|
      if data.get('metadata_loaded'):
 | 
						|
        self._total_segments = data.get('total_segments', 0)
 | 
						|
        limits = (-10, self._total_segments * 60 + 10)
 | 
						|
        self._queued_x_sync = limits
 | 
						|
 | 
						|
  def _on_series_drop(self, sender, app_data, user_data):
 | 
						|
    self.add_series(app_data)
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def _downsample_worker(series_path, time_array, value_array, target_points):
 | 
						|
    if len(time_array) <= target_points:
 | 
						|
      return series_path, time_array, value_array
 | 
						|
 | 
						|
    step = len(time_array) / target_points
 | 
						|
    indices = []
 | 
						|
 | 
						|
    for i in range(target_points):
 | 
						|
      start_idx = int(i * step)
 | 
						|
      end_idx = int((i + 1) * step)
 | 
						|
      if start_idx == end_idx:
 | 
						|
        indices.append(start_idx)
 | 
						|
      else:
 | 
						|
        bucket_values = value_array[start_idx:end_idx]
 | 
						|
        min_idx = start_idx + np.argmin(bucket_values)
 | 
						|
        max_idx = start_idx + np.argmax(bucket_values)
 | 
						|
        if min_idx != max_idx:
 | 
						|
          indices.extend([min(min_idx, max_idx), max(min_idx, max_idx)])
 | 
						|
        else:
 | 
						|
          indices.append(min_idx)
 | 
						|
    indices = sorted(set(indices))
 | 
						|
    return series_path, time_array[indices], value_array[indices]
 | 
						|
 |