#!/usr/bin/env python3 import datetime import os import subprocess import time from typing import NoReturn from timezonefinder import TimezoneFinder import cereal.messaging as messaging from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog from openpilot.system.hardware import AGNOS def set_timezone(timezone): valid_timezones = subprocess.check_output('timedatectl list-timezones', shell=True, encoding='utf8').strip().split('\n') if timezone not in valid_timezones: cloudlog.error(f"Timezone not supported {timezone}") return cloudlog.debug(f"Setting timezone to {timezone}") try: if AGNOS: tzpath = os.path.join("/usr/share/zoneinfo/", timezone) subprocess.check_call(f'sudo su -c "ln -snf {tzpath} /data/etc/tmptime && \ mv /data/etc/tmptime /data/etc/localtime"', shell=True) subprocess.check_call(f'sudo su -c "echo \"{timezone}\" > /data/etc/timezone"', shell=True) else: subprocess.check_call(f'sudo timedatectl set-timezone {timezone}', shell=True) except subprocess.CalledProcessError: cloudlog.exception(f"Error setting timezone to {timezone}") def set_time(new_time): diff = datetime.datetime.now() - new_time if diff < datetime.timedelta(seconds=10): cloudlog.debug(f"Time diff too small: {diff}") return cloudlog.debug(f"Setting time to {new_time}") try: subprocess.run(f"TZ=UTC date -s '{new_time}'", shell=True, check=True) except subprocess.CalledProcessError: cloudlog.exception("timed.failed_setting_time") def main() -> NoReturn: """ timed has two responsibilities: - getting the current time - getting the current timezone GPS directly gives time, and timezone is looked up from GPS position. AGNOS will also use NTP to update the time. """ params = Params() tf = TimezoneFinder() # Restore timezone from param tz = params.get("Timezone", encoding='utf8') tf = TimezoneFinder() if tz is not None: cloudlog.debug("Restoring timezone from param") set_timezone(tz) pm = messaging.PubMaster(['clocks']) sm = messaging.SubMaster(['liveLocationKalman']) while True: sm.update(1000) msg = messaging.new_message('clocks', valid=True) msg.clocks.wallTimeNanos = time.time_ns() pm.send('clocks', msg) llk = sm['liveLocationKalman'] if not llk.gpsOK or (time.monotonic() - sm.logMonoTime['liveLocationKalman']/1e9) > 0.2: continue # set time # TODO: account for unixTimesatmpMillis being a (usually short) time in the past gps_time = datetime.datetime.fromtimestamp(llk.unixTimestampMillis / 1000.) set_time(gps_time) # set timezone pos = llk.positionGeodetic.value if len(pos) == 3: gps_timezone = tf.timezone_at(lat=pos[0], lng=pos[1]) if gps_timezone is None: cloudlog.critical(f"No timezone found based on {pos=}") else: set_timezone(gps_timezone) params.put_nonblocking("Timezone", gps_timezone) time.sleep(10) if __name__ == "__main__": main()