diff --git a/tools/foxglove/fox.py b/tools/foxglove/fox.py deleted file mode 100755 index a1e930d893..0000000000 --- a/tools/foxglove/fox.py +++ /dev/null @@ -1,304 +0,0 @@ -#!/usr/bin/env python3 -import sys -import json -import base64 -import os -import subprocess -from multiprocessing import Pool -from openpilot.tools.lib.route import Route -from openpilot.tools.lib.logreader import LogReader - -try: - from mcap.writer import Writer, CompressionType -except ImportError: - print("mcap module not found. Attempting to install...") - subprocess.run([sys.executable, "-m", "pip", "install", "mcap"]) - # Attempt to import again after installation - try: - from mcap.writer import Writer, CompressionType - except ImportError: - print("Failed to install mcap module. Exiting.") - sys.exit(1) - - -FOXGLOVE_IMAGE_SCHEME_TITLE = "foxglove.CompressedImage" -FOXGLOVE_GEOJSON_TITLE = "foxglove.GeoJSON" -FOXGLOVE_IMAGE_ENCODING = "base64" -OUT_MCAP_FILE_NAME = "json_log.mcap" -RLOG_FOLDER = "rlogs" -SCHEMAS_FOLDER = "schemas" -SCHEMA_EXTENSION = ".json" - -schemas: dict[str, int] = {} -channels: dict[str, int] = {} -writer: Writer - - -def convertBytesToString(data): - if isinstance(data, bytes): - return data.decode('latin-1') # Assuming UTF-8 encoding, adjust if needed - elif isinstance(data, list): - return [convertBytesToString(item) for item in data] - elif isinstance(data, dict): - return {key: convertBytesToString(value) for key, value in data.items()} - else: - return data - - -# Load jsonscheme for every Event -def loadSchema(schemaName): - with open(os.path.join(SCHEMAS_FOLDER, schemaName + SCHEMA_EXTENSION), "r") as file: - return json.loads(file.read()) - - -# Foxglove creates one graph of an array, and not one for each item of an array -# This can be avoided by transforming array to separate objects -def transformListsToJsonDict(json_data): - def convert_array_to_dict(array): - new_dict = {} - for index, item in enumerate(array): - if isinstance(item, dict): - new_dict[index] = transformListsToJsonDict(item) - else: - new_dict[index] = item - return new_dict - - new_data = {} - for key, value in json_data.items(): - if isinstance(value, list): - new_data[key] = convert_array_to_dict(value) - elif isinstance(value, dict): - new_data[key] = transformListsToJsonDict(value) - else: - new_data[key] = value - return new_data - - -# Transform openpilot thumbnail to foxglove compressedImage -def transformToFoxgloveSchema(jsonMsg): - bytesImgData = jsonMsg.get("thumbnail").get("thumbnail").encode('latin1') - base64ImgData = base64.b64encode(bytesImgData) - base64_string = base64ImgData.decode('utf-8') - foxMsg = { - "timestamp": {"sec": "0", "nsec": jsonMsg.get("logMonoTime")}, - "frame_id": str(jsonMsg.get("thumbnail").get("frameId")), - "data": base64_string, - "format": "jpeg", - } - return foxMsg - - -# TODO: Check if there is a tool to build GEOJson -def transformMapCoordinates(jsonMsg): - coordinates = [] - for jsonCoords in jsonMsg.get("navRoute").get("coordinates"): - coordinates.append([jsonCoords.get("longitude"), jsonCoords.get("latitude")]) - - # Define the GeoJSON - geojson_data = { - "type": "FeatureCollection", - "features": [{"type": "Feature", "geometry": {"type": "LineString", "coordinates": coordinates}, "logMonoTime": jsonMsg.get("logMonoTime")}], - } - - # Create the final JSON with the GeoJSON data encoded as a string - geoJson = {"geojson": json.dumps(geojson_data)} - - return geoJson - - -def jsonToScheme(jsonData): - zeroArray = False - schema = {"type": "object", "properties": {}, "required": []} - for key, value in jsonData.items(): - if isinstance(value, dict): - tempScheme, zeroArray = jsonToScheme(value) - if tempScheme == 0: - return 0 - schema["properties"][key] = tempScheme - schema["required"].append(key) - elif isinstance(value, list): - if all(isinstance(item, dict) for item in value) and len(value) > 0: # Handle zero value arrays - # Handle array of objects - tempScheme, zeroArray = jsonToScheme(value[0]) - schema["properties"][key] = {"type": "array", "items": tempScheme if value else {}} - schema["required"].append(key) - else: - if len(value) == 0: - zeroArray = True - # Handle array of primitive types - schema["properties"][key] = {"type": "array", "items": {"type": "string"}} - schema["required"].append(key) - else: - typeName = type(value).__name__ - if typeName == "str": - typeName = "string" - elif typeName == "bool": - typeName = "boolean" - elif typeName == "float": - typeName = "number" - elif typeName == "int": - typeName = "integer" - schema["properties"][key] = {"type": typeName} - schema["required"].append(key) - - return schema, zeroArray - - -def saveScheme(scheme, schemaFileName): - schemaFileName = schemaFileName + SCHEMA_EXTENSION - # Create the new schemas folder - os.makedirs(SCHEMAS_FOLDER, exist_ok=True) - with open(os.path.join(SCHEMAS_FOLDER, schemaFileName), 'w') as json_file: - json.dump(convertBytesToString(scheme), json_file) - - -def convertToFoxGloveFormat(jsonData, rlogTopic): - jsonData["title"] = rlogTopic - if rlogTopic == "thumbnail": - jsonData = transformToFoxgloveSchema(jsonData) - jsonData["title"] = FOXGLOVE_IMAGE_SCHEME_TITLE - elif rlogTopic == "navRoute": - jsonData = transformMapCoordinates(jsonData) - jsonData["title"] = FOXGLOVE_GEOJSON_TITLE - else: - jsonData = transformListsToJsonDict(jsonData) - return jsonData - - -def generateSchemas(): - listOfDirs = os.listdir(RLOG_FOLDER) - # Open every dir in rlogs - for directory in listOfDirs: - # List every file in every rlog dir - dirPath = os.path.join(RLOG_FOLDER, directory) - listOfFiles = os.listdir(dirPath) - lastIteration = len(listOfFiles) - for iteration, file in enumerate(listOfFiles): - # Load json data from every file until found one without empty arrays - filePath = os.path.join(dirPath, file) - with open(filePath, 'r') as jsonFile: - jsonData = json.load(jsonFile) - scheme, zerroArray = jsonToScheme(jsonData) - # If array of len 0 has been found, type of its data can not be parsed, skip to the next log - # in search for a non empty array. If there is not an non empty array in logs, put a dummy string type - if zerroArray and not iteration == lastIteration - 1: - continue - title = jsonData.get("title") - scheme["title"] = title - # Add contentEncoding type, hardcoded in foxglove format - if title == FOXGLOVE_IMAGE_SCHEME_TITLE: - scheme["properties"]["data"]["contentEncoding"] = FOXGLOVE_IMAGE_ENCODING - saveScheme(scheme, directory) - break - - -def downloadLogs(logPaths): - segment_counter = 0 - for logPath in logPaths: - segment_counter += 1 - msg_counter = 1 - print(segment_counter) - rlog = LogReader(logPath) - for msg in rlog: - jsonMsg = json.loads(json.dumps(convertBytesToString(msg.to_dict()))) - jsonMsg = convertToFoxGloveFormat(jsonMsg, msg.which()) - rlog_dir_path = os.path.join(RLOG_FOLDER, msg.which()) - if not os.path.exists(rlog_dir_path): - os.makedirs(rlog_dir_path) - file_path = os.path.join(rlog_dir_path, str(segment_counter) + "," + str(msg_counter)) - with open(file_path, 'w') as json_file: - json.dump(jsonMsg, json_file) - msg_counter += 1 - - -def getLogMonoTime(jsonMsg): - if jsonMsg.get("title") == FOXGLOVE_IMAGE_SCHEME_TITLE: - logMonoTime = jsonMsg.get("timestamp").get("nsec") - elif jsonMsg.get("title") == FOXGLOVE_GEOJSON_TITLE: - logMonoTime = json.loads(jsonMsg.get("geojson")).get("features")[0].get("logMonoTime") - else: - logMonoTime = jsonMsg.get("logMonoTime") - return logMonoTime - - -def processMsgs(args): - msgFile, rlogTopicPath, rlogTopic = args - msgFilePath = os.path.join(rlogTopicPath, msgFile) - with open(msgFilePath, "r") as file: - jsonMsg = json.load(file) - logMonoTime = getLogMonoTime(jsonMsg) - return {'channel_id': channels[rlogTopic], 'log_time': logMonoTime, 'data': json.dumps(jsonMsg).encode("utf-8"), 'publish_time': logMonoTime} - - -# Get logs from a path, and convert them into mcap -def createMcap(logPaths): - print(f"Downloading logs [{len(logPaths)}]") - downloadLogs(logPaths) - print("Creating schemas") - generateSchemas() - print("Creating mcap file") - - listOfRlogTopics = os.listdir(RLOG_FOLDER) - print(f"Registering schemas and channels [{len(listOfRlogTopics)}]") - for counter, rlogTopic in enumerate(listOfRlogTopics): - print(counter) - schema = loadSchema(rlogTopic) - schema_id = writer.register_schema(name=schema.get("title"), encoding="jsonschema", data=json.dumps(schema).encode()) - schemas[rlogTopic] = schema_id - channel_id = writer.register_channel(schema_id=schemas[rlogTopic], topic=rlogTopic, message_encoding="json") - channels[rlogTopic] = channel_id - rlogTopicPath = os.path.join(RLOG_FOLDER, rlogTopic) - msgFiles = os.listdir(rlogTopicPath) - pool = Pool() - results = pool.map(processMsgs, [(msgFile, rlogTopicPath, rlogTopic) for msgFile in msgFiles]) - pool.close() - pool.join() - for result in results: - writer.add_message(channel_id=result['channel_id'], log_time=result['log_time'], data=result['data'], publish_time=result['publish_time']) - - -def is_program_installed(program_name): - try: - # Check if the program is installed using dpkg (for traditional Debian packages) - subprocess.run(["dpkg", "-l", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - # Check if the program is installed using snap - try: - subprocess.run(["snap", "list", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return True - except subprocess.CalledProcessError: - return False - - -if __name__ == '__main__': - # Example usage: - program_name = "foxglove-studio" # Change this to the program you want to check - if is_program_installed(program_name): - print(f"{program_name} detected.") - else: - print(f"{program_name} could not be detected.") - installFoxglove = input("Would you like to install it? YES/NO? - ") - if installFoxglove.lower() == "yes": - try: - subprocess.run(['./install_foxglove.sh'], check=True) - print("Installation completed successfully.") - except subprocess.CalledProcessError as e: - print(f"Installation failed with return code {e.returncode}.") - # Get a route - if len(sys.argv) == 1: - route_name = "a2a0ccea32023010|2023-07-27--13-01-19" - print("No route was provided, using demo route") - else: - route_name = sys.argv[1] - # Get logs for a route - print("Getting route log paths") - route = Route(route_name) - logPaths = route.log_paths() - # Start mcap writer - with open(OUT_MCAP_FILE_NAME, "wb") as stream: - writer = Writer(stream, compression=CompressionType.NONE) - writer.start() - createMcap(logPaths) - writer.finish() - print(f"File {OUT_MCAP_FILE_NAME} has been successfully created. Please import it into foxglove studio to continue.") diff --git a/tools/foxglove/install_foxglove.sh b/tools/foxglove/install_foxglove.sh deleted file mode 100755 index 0f401549a2..0000000000 --- a/tools/foxglove/install_foxglove.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -echo "Installing foxglvoe studio..." -sudo snap install foxglove-studio