|
|
@ -157,7 +157,7 @@ def init_baudrate(pigeon: TTYPigeon): |
|
|
|
pigeon.set_baud(460800) |
|
|
|
pigeon.set_baud(460800) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_pigeon(pigeon: TTYPigeon) -> bool: |
|
|
|
def init_pigeon(pigeon: TTYPigeon) -> bool: |
|
|
|
# try initializing a few times |
|
|
|
# try initializing a few times |
|
|
|
for _ in range(10): |
|
|
|
for _ in range(10): |
|
|
|
try: |
|
|
|
try: |
|
|
@ -250,8 +250,6 @@ def initialize_pigeon(pigeon: TTYPigeon) -> bool: |
|
|
|
return True |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
def deinitialize_and_exit(pigeon: TTYPigeon | None): |
|
|
|
def deinitialize_and_exit(pigeon: TTYPigeon | None): |
|
|
|
cloudlog.warning("Storing almanac in ublox flash") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if pigeon is not None: |
|
|
|
if pigeon is not None: |
|
|
|
# controlled GNSS stop |
|
|
|
# controlled GNSS stop |
|
|
|
pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74") |
|
|
|
pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74") |
|
|
@ -260,12 +258,9 @@ def deinitialize_and_exit(pigeon: TTYPigeon | None): |
|
|
|
set_power(False) |
|
|
|
set_power(False) |
|
|
|
sys.exit(0) |
|
|
|
sys.exit(0) |
|
|
|
|
|
|
|
|
|
|
|
def create_pigeon() -> tuple[TTYPigeon, messaging.PubMaster]: |
|
|
|
def init(pigeon: TTYPigeon) -> None: |
|
|
|
pigeon = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# register exit handler |
|
|
|
# register exit handler |
|
|
|
signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon)) |
|
|
|
signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon)) |
|
|
|
pm = messaging.PubMaster(['ubloxRaw']) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# power cycle ublox |
|
|
|
# power cycle ublox |
|
|
|
set_power(False) |
|
|
|
set_power(False) |
|
|
@ -273,23 +268,23 @@ def create_pigeon() -> tuple[TTYPigeon, messaging.PubMaster]: |
|
|
|
set_power(True) |
|
|
|
set_power(True) |
|
|
|
time.sleep(0.5) |
|
|
|
time.sleep(0.5) |
|
|
|
|
|
|
|
|
|
|
|
pigeon = TTYPigeon() |
|
|
|
init_baudrate(pigeon) |
|
|
|
return pigeon, pm |
|
|
|
init_pigeon(pigeon) |
|
|
|
|
|
|
|
|
|
|
|
def run_receiving(pigeon: TTYPigeon, pm: messaging.PubMaster, duration: int = 0): |
|
|
|
def run_receiving(duration: int = 0): |
|
|
|
|
|
|
|
pm = messaging.PubMaster(['ubloxRaw']) |
|
|
|
|
|
|
|
|
|
|
|
start_time = time.monotonic() |
|
|
|
pigeon = TTYPigeon() |
|
|
|
def end_condition(): |
|
|
|
init(pigeon) |
|
|
|
return True if duration == 0 else time.monotonic() - start_time < duration |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
start_time = time.monotonic() |
|
|
|
last_almanac_save = time.monotonic() |
|
|
|
last_almanac_save = time.monotonic() |
|
|
|
while end_condition(): |
|
|
|
while (duration == 0) or (time.monotonic() - start_time < duration): |
|
|
|
dat = pigeon.receive() |
|
|
|
dat = pigeon.receive() |
|
|
|
if len(dat) > 0: |
|
|
|
if len(dat) > 0: |
|
|
|
if dat[0] == 0x00: |
|
|
|
if dat[0] == 0x00: |
|
|
|
cloudlog.warning("received invalid data from ublox, re-initing!") |
|
|
|
cloudlog.warning("received invalid data from ublox, re-initing!") |
|
|
|
init_baudrate(pigeon) |
|
|
|
init(pigeon) |
|
|
|
initialize_pigeon(pigeon) |
|
|
|
|
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
# send out to socket |
|
|
|
# send out to socket |
|
|
@ -308,13 +303,7 @@ def run_receiving(pigeon: TTYPigeon, pm: messaging.PubMaster, duration: int = 0) |
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
|
def main(): |
|
|
|
assert TICI, "unsupported hardware for pigeond" |
|
|
|
assert TICI, "unsupported hardware for pigeond" |
|
|
|
|
|
|
|
run_receiving() |
|
|
|
pigeon, pm = create_pigeon() |
|
|
|
|
|
|
|
init_baudrate(pigeon) |
|
|
|
|
|
|
|
initialize_pigeon(pigeon) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# start receiving data |
|
|
|
|
|
|
|
run_receiving(pigeon, pm) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
if __name__ == "__main__": |
|
|
|
main() |
|
|
|
main() |
|
|
|