You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							165 lines
						
					
					
						
							6.4 KiB
						
					
					
				
			
		
		
	
	
							165 lines
						
					
					
						
							6.4 KiB
						
					
					
				| import os
 | |
| import time
 | |
| import datetime
 | |
| from openpilot.common.time_helpers import system_time_valid
 | |
| from openpilot.selfdrive.ui.ui_state import ui_state
 | |
| from openpilot.system.ui.lib.application import gui_app
 | |
| from openpilot.system.ui.widgets import Widget, DialogResult
 | |
| from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog
 | |
| from openpilot.system.ui.widgets.list_view import button_item, text_item, ListItem
 | |
| from openpilot.system.ui.widgets.scroller import Scroller
 | |
| 
 | |
| # TODO: remove this. The updater fails to respond on startup if the system time is not correct.
 | |
| UPDATED_TIMEOUT = 10  # seconds to wait for updated to leave the "idle" state before the download button is re-enabled
 | |
| 
 | |
| 
 | |
| def time_ago(date: datetime.datetime | None) -> str:
 | |
|   if not date:
 | |
|     return "never"
 | |
| 
 | |
|   if not system_time_valid():
 | |
|     return date.strftime("%a %b %d %Y")
 | |
| 
 | |
|   now = datetime.datetime.now(datetime.UTC)
 | |
|   if date.tzinfo is None:
 | |
|     date = date.replace(tzinfo=datetime.UTC)
 | |
| 
 | |
|   diff_seconds = int((now - date).total_seconds())
 | |
|   if diff_seconds < 60:
 | |
|     return "now"
 | |
|   if diff_seconds < 3600:
 | |
|     m = diff_seconds // 60
 | |
|     return f"{m} minute{'s' if m != 1 else ''} ago"
 | |
|   if diff_seconds < 86400:
 | |
|     h = diff_seconds // 3600
 | |
|     return f"{h} hour{'s' if h != 1 else ''} ago"
 | |
|   if diff_seconds < 604800:
 | |
|     d = diff_seconds // 86400
 | |
|     return f"{d} day{'s' if d != 1 else ''} ago"
 | |
|   return date.strftime("%a %b %d %Y")
 | |
| 
 | |
| 
 | |
class SoftwareLayout(Widget):
  """Settings panel for software updates.

  Shows the current version, a check/download button, an install button that
  is only visible when an update is available, and an uninstall button. The
  displayed state is rebuilt from `ui_state.params` in `_update_state`.
  """

  def __init__(self):
    super().__init__()

    self._onroad_label = ListItem(title="Updates are only downloaded while the car is off.")
    self._version_item = text_item("Current Version", ui_state.params.get("UpdaterCurrentDescription") or "")
    self._download_btn = button_item("Download", "CHECK", callback=self._on_download_update)

    # Install button is initially hidden
    self._install_btn = button_item("Install Update", "INSTALL", callback=self._on_install_update)
    self._install_btn.set_visible(False)

    # Track waiting-for-updater transition to avoid brief re-enable while still idle
    self._waiting_for_updater = False
    self._waiting_start_ts: float = 0.0

    self._scroller = Scroller(self._init_items(), line_separator=True, spacing=0)

  def _init_items(self):
    """Return the list items shown in the scroller, in display order."""
    return [
      self._onroad_label,
      self._version_item,
      self._download_btn,
      self._install_btn,
      # TODO: implement branch switching
      # button_item("Target Branch", "SELECT", callback=self._on_select_branch),
      button_item("Uninstall", "UNINSTALL", callback=self._on_uninstall),
    ]

  def show_event(self):
    """Forward the show event to the scroller."""
    self._scroller.show_event()

  def _render(self, rect):
    """Draw the panel by rendering the scroller into `rect`."""
    self._scroller.render(rect)

  def _update_state(self):
    """Refresh every item from the current params and onroad/offroad state."""
    # Show/hide onroad warning
    self._onroad_label.set_visible(ui_state.is_onroad())

    self._refresh_version_item()
    self._refresh_download_item()
    self._refresh_install_item()

  def _refresh_version_item(self):
    """Update the current version text and its release notes."""
    current_desc = ui_state.params.get("UpdaterCurrentDescription") or ""
    current_release_notes = (ui_state.params.get("UpdaterCurrentReleaseNotes") or b"").decode("utf-8", "replace")
    self._version_item.action_item.set_text(current_desc)
    self._version_item.set_description(current_release_notes)

  def _refresh_download_item(self):
    """Update visibility, label, status text and enabled state of the download button."""
    self._download_btn.set_visible(ui_state.is_offroad())
    action = self._download_btn.action_item

    updater_state = ui_state.params.get("UpdaterState") or "idle"
    if updater_state != "idle":
      # Updater responded: show its state and keep the button disabled while it works
      self._waiting_for_updater = False
      action.set_enabled(False)
      action.set_value(updater_state)
      return

    # Updater is idle; only read the params this branch actually needs
    if (ui_state.params.get("UpdateFailedCount") or 0) > 0:
      action.set_value("failed to check for update")
      action.set_text("CHECK")
    elif ui_state.params.get_bool("UpdaterFetchAvailable"):
      action.set_value("update available")
      action.set_text("DOWNLOAD")
    else:
      # time_ago() returns "never" when no check time has been recorded
      last_checked = time_ago(ui_state.params.get("LastUpdateTime"))
      action.set_value(f"up to date, last checked {last_checked}")
      action.set_text("CHECK")

    # If we've been waiting too long without a state change, reset state
    if self._waiting_for_updater and (time.monotonic() - self._waiting_start_ts > UPDATED_TIMEOUT):
      self._waiting_for_updater = False

    # Only enable if we're not waiting for updater to flip out of idle
    action.set_enabled(not self._waiting_for_updater)

  def _refresh_install_item(self):
    """Show the install button with the new version info when an update is available."""
    update_available = ui_state.params.get_bool("UpdateAvailable")
    self._install_btn.set_visible(ui_state.is_offroad() and update_available)
    if not update_available:
      return

    new_desc = ui_state.params.get("UpdaterNewDescription") or ""
    new_release_notes = (ui_state.params.get("UpdaterNewReleaseNotes") or b"").decode("utf-8", "replace")
    action = self._install_btn.action_item
    action.set_text("INSTALL")
    action.set_value(new_desc)
    self._install_btn.set_description(new_release_notes)
    # Enable install button for testing (like Qt showEvent)
    # NOTE(review): this re-enables the button whenever _update_state runs, which
    # undoes the set_enabled(False) done in _on_install_update - confirm intended.
    action.set_enabled(True)

  def _on_download_update(self):
    """Signal the updated process to check for or download an update.

    The button label decides the action: "CHECK" sends SIGUSR1 (check for
    updates), any other label sends SIGHUP (start downloading). The button is
    disabled and the waiting timer started so `_update_state` keeps it disabled
    until the updater leaves "idle" or UPDATED_TIMEOUT elapses.
    """
    action = self._download_btn.action_item
    action.set_enabled(False)

    if action.text == "CHECK":
      # Start checking for updates
      command = "pkill -SIGUSR1 -f system.updated.updated"
    else:
      # Start downloading
      command = "pkill -SIGHUP -f system.updated.updated"

    self._waiting_for_updater = True
    self._waiting_start_ts = time.monotonic()
    os.system(command)

  def _on_uninstall(self):
    """Ask for confirmation, then set the DoUninstall param if confirmed."""
    def handle_uninstall_confirmation(result):
      if result == DialogResult.CONFIRM:
        ui_state.params.put_bool("DoUninstall", True)

    dialog = ConfirmDialog("Are you sure you want to uninstall?", "Uninstall")
    gui_app.set_modal_overlay(dialog, callback=handle_uninstall_confirmation)

  def _on_install_update(self):
    """Disable the install button and request a reboot to install the update."""
    # Trigger reboot to install update
    self._install_btn.action_item.set_enabled(False)
    ui_state.params.put_bool("DoReboot", True)

  def _on_select_branch(self):
    """Placeholder for branch switching; not implemented yet."""
 | |
| 
 |