|
|
|
@ -8,6 +8,10 @@ import requests |
|
|
|
|
import serial |
|
|
|
|
import subprocess |
|
|
|
|
|
|
|
|
|
class LPAError(Exception): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LPA: |
|
|
|
|
def __init__(self): |
|
|
|
|
self.env = os.environ.copy() |
|
|
|
@ -22,22 +26,30 @@ class LPA: |
|
|
|
|
if ret.returncode != 0: |
|
|
|
|
raise Exception(f"lpac {cmd} failed: {err}") |
|
|
|
|
|
|
|
|
|
# FIXME: lpac may respond with multiple json objects, so we need to parse all of them |
|
|
|
|
parsed = json.loads(out.decode().strip()) |
|
|
|
|
messages = [] |
|
|
|
|
for line in out.decode().strip().splitlines(): |
|
|
|
|
if line.startswith('{'): |
|
|
|
|
message = json.loads(line) |
|
|
|
|
|
|
|
|
|
# lpac response format validations |
|
|
|
|
assert 'type' in message |
|
|
|
|
assert message['type'] == 'lpa' |
|
|
|
|
assert 'payload' in message |
|
|
|
|
assert 'code' in message['payload'] |
|
|
|
|
|
|
|
|
|
if message['payload']['code'] != 0: |
|
|
|
|
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: {message['payload']['message']}") |
|
|
|
|
|
|
|
|
|
# lpac response format validations |
|
|
|
|
assert 'type' in parsed |
|
|
|
|
assert parsed['type'] == 'lpa' |
|
|
|
|
assert 'payload' in parsed |
|
|
|
|
assert 'code' in parsed['payload'] |
|
|
|
|
messages.append(message['payload']['data']) |
|
|
|
|
|
|
|
|
|
if parsed['payload']['code'] != 0: |
|
|
|
|
raise Exception(f"lpac {' '.join(cmd)} failed: {parsed['payload']['code']}") |
|
|
|
|
# TODO: confirm we should always get a message |
|
|
|
|
if len(messages) == 0: |
|
|
|
|
raise LPAError(f"lpac {cmd} returned no messages") |
|
|
|
|
|
|
|
|
|
return parsed['payload']['data'] |
|
|
|
|
return messages |
|
|
|
|
|
|
|
|
|
def list_profiles(self): |
|
|
|
|
raw = self._invoke(['profile', 'list']) |
|
|
|
|
raw = self._invoke(['profile', 'list'])[-1] # only one message |
|
|
|
|
|
|
|
|
|
profiles = [] |
|
|
|
|
for profile in raw: |
|
|
|
|