#!/usr/bin/env python3 import sys import json import base64 import os import subprocess from multiprocessing import Pool from openpilot.tools.lib.route import Route from openpilot.tools.lib.logreader import LogReader try: from mcap.writer import Writer, CompressionType except ImportError: print("mcap module not found. Attempting to install...") subprocess.run([sys.executable, "-m", "pip", "install", "mcap"]) # Attempt to import again after installation try: from mcap.writer import Writer, CompressionType except ImportError: print("Failed to install mcap module. Exiting.") sys.exit(1) FOXGLOVE_IMAGE_SCHEME_TITLE = "foxglove.CompressedImage" FOXGLOVE_GEOJSON_TITLE = "foxglove.GeoJSON" FOXGLOVE_IMAGE_ENCODING = "base64" OUT_MCAP_FILE_NAME = "json_log.mcap" RLOG_FOLDER = "rlogs" SCHEMAS_FOLDER = "schemas" SCHEMA_EXTENSION = ".json" schemas: dict[str, int] = {} channels: dict[str, int] = {} writer: Writer def convertBytesToString(data): if isinstance(data, bytes): return data.decode('latin-1') # Assuming UTF-8 encoding, adjust if needed elif isinstance(data, list): return [convertBytesToString(item) for item in data] elif isinstance(data, dict): return {key: convertBytesToString(value) for key, value in data.items()} else: return data # Load jsonscheme for every Event def loadSchema(schemaName): with open(os.path.join(SCHEMAS_FOLDER, schemaName + SCHEMA_EXTENSION), "r") as file: return json.loads(file.read()) # Foxglove creates one graph of an array, and not one for each item of an array # This can be avoided by transforming array to separate objects def transformListsToJsonDict(json_data): def convert_array_to_dict(array): new_dict = {} for index, item in enumerate(array): if isinstance(item, dict): new_dict[index] = transformListsToJsonDict(item) else: new_dict[index] = item return new_dict new_data = {} for key, value in json_data.items(): if isinstance(value, list): new_data[key] = convert_array_to_dict(value) elif isinstance(value, dict): new_data[key] = transformListsToJsonDict(value) else: new_data[key] = value return new_data # Transform openpilot thumbnail to foxglove compressedImage def transformToFoxgloveSchema(jsonMsg): bytesImgData = jsonMsg.get("thumbnail").get("thumbnail").encode('latin1') base64ImgData = base64.b64encode(bytesImgData) base64_string = base64ImgData.decode('utf-8') foxMsg = { "timestamp": {"sec": "0", "nsec": jsonMsg.get("logMonoTime")}, "frame_id": str(jsonMsg.get("thumbnail").get("frameId")), "data": base64_string, "format": "jpeg", } return foxMsg # TODO: Check if there is a tool to build GEOJson def transformMapCoordinates(jsonMsg): coordinates = [] for jsonCoords in jsonMsg.get("navRoute").get("coordinates"): coordinates.append([jsonCoords.get("longitude"), jsonCoords.get("latitude")]) # Define the GeoJSON geojson_data = { "type": "FeatureCollection", "features": [{"type": "Feature", "geometry": {"type": "LineString", "coordinates": coordinates}, "logMonoTime": jsonMsg.get("logMonoTime")}], } # Create the final JSON with the GeoJSON data encoded as a string geoJson = {"geojson": json.dumps(geojson_data)} return geoJson def jsonToScheme(jsonData): zeroArray = False schema = {"type": "object", "properties": {}, "required": []} for key, value in jsonData.items(): if isinstance(value, dict): tempScheme, zeroArray = jsonToScheme(value) if tempScheme == 0: return 0 schema["properties"][key] = tempScheme schema["required"].append(key) elif isinstance(value, list): if all(isinstance(item, dict) for item in value) and len(value) > 0: # Handle zero value arrays # Handle array of objects tempScheme, zeroArray = jsonToScheme(value[0]) schema["properties"][key] = {"type": "array", "items": tempScheme if value else {}} schema["required"].append(key) else: if len(value) == 0: zeroArray = True # Handle array of primitive types schema["properties"][key] = {"type": "array", "items": {"type": "string"}} schema["required"].append(key) else: typeName = type(value).__name__ if typeName == "str": typeName = "string" elif typeName == "bool": typeName = "boolean" elif typeName == "float": typeName = "number" elif typeName == "int": typeName = "integer" schema["properties"][key] = {"type": typeName} schema["required"].append(key) return schema, zeroArray def saveScheme(scheme, schemaFileName): schemaFileName = schemaFileName + SCHEMA_EXTENSION # Create the new schemas folder os.makedirs(SCHEMAS_FOLDER, exist_ok=True) with open(os.path.join(SCHEMAS_FOLDER, schemaFileName), 'w') as json_file: json.dump(convertBytesToString(scheme), json_file) def convertToFoxGloveFormat(jsonData, rlogTopic): jsonData["title"] = rlogTopic if rlogTopic == "thumbnail": jsonData = transformToFoxgloveSchema(jsonData) jsonData["title"] = FOXGLOVE_IMAGE_SCHEME_TITLE elif rlogTopic == "navRoute": jsonData = transformMapCoordinates(jsonData) jsonData["title"] = FOXGLOVE_GEOJSON_TITLE else: jsonData = transformListsToJsonDict(jsonData) return jsonData def generateSchemas(): listOfDirs = os.listdir(RLOG_FOLDER) # Open every dir in rlogs for directory in listOfDirs: # List every file in every rlog dir dirPath = os.path.join(RLOG_FOLDER, directory) listOfFiles = os.listdir(dirPath) lastIteration = len(listOfFiles) for iteration, file in enumerate(listOfFiles): # Load json data from every file until found one without empty arrays filePath = os.path.join(dirPath, file) with open(filePath, 'r') as jsonFile: jsonData = json.load(jsonFile) scheme, zerroArray = jsonToScheme(jsonData) # If array of len 0 has been found, type of its data can not be parsed, skip to the next log # in search for a non empty array. If there is not an non empty array in logs, put a dummy string type if zerroArray and not iteration == lastIteration - 1: continue title = jsonData.get("title") scheme["title"] = title # Add contentEncoding type, hardcoded in foxglove format if title == FOXGLOVE_IMAGE_SCHEME_TITLE: scheme["properties"]["data"]["contentEncoding"] = FOXGLOVE_IMAGE_ENCODING saveScheme(scheme, directory) break def downloadLogs(logPaths): segment_counter = 0 for logPath in logPaths: segment_counter += 1 msg_counter = 1 print(segment_counter) rlog = LogReader(logPath) for msg in rlog: jsonMsg = json.loads(json.dumps(convertBytesToString(msg.to_dict()))) jsonMsg = convertToFoxGloveFormat(jsonMsg, msg.which()) rlog_dir_path = os.path.join(RLOG_FOLDER, msg.which()) if not os.path.exists(rlog_dir_path): os.makedirs(rlog_dir_path) file_path = os.path.join(rlog_dir_path, str(segment_counter) + "," + str(msg_counter)) with open(file_path, 'w') as json_file: json.dump(jsonMsg, json_file) msg_counter += 1 def getLogMonoTime(jsonMsg): if jsonMsg.get("title") == FOXGLOVE_IMAGE_SCHEME_TITLE: logMonoTime = jsonMsg.get("timestamp").get("nsec") elif jsonMsg.get("title") == FOXGLOVE_GEOJSON_TITLE: logMonoTime = json.loads(jsonMsg.get("geojson")).get("features")[0].get("logMonoTime") else: logMonoTime = jsonMsg.get("logMonoTime") return logMonoTime def processMsgs(args): msgFile, rlogTopicPath, rlogTopic = args msgFilePath = os.path.join(rlogTopicPath, msgFile) with open(msgFilePath, "r") as file: jsonMsg = json.load(file) logMonoTime = getLogMonoTime(jsonMsg) return {'channel_id': channels[rlogTopic], 'log_time': logMonoTime, 'data': json.dumps(jsonMsg).encode("utf-8"), 'publish_time': logMonoTime} # Get logs from a path, and convert them into mcap def createMcap(logPaths): print(f"Downloading logs [{len(logPaths)}]") downloadLogs(logPaths) print("Creating schemas") generateSchemas() print("Creating mcap file") listOfRlogTopics = os.listdir(RLOG_FOLDER) print(f"Registering schemas and channels [{len(listOfRlogTopics)}]") for counter, rlogTopic in enumerate(listOfRlogTopics): print(counter) schema = loadSchema(rlogTopic) schema_id = writer.register_schema(name=schema.get("title"), encoding="jsonschema", data=json.dumps(schema).encode()) schemas[rlogTopic] = schema_id channel_id = writer.register_channel(schema_id=schemas[rlogTopic], topic=rlogTopic, message_encoding="json") channels[rlogTopic] = channel_id rlogTopicPath = os.path.join(RLOG_FOLDER, rlogTopic) msgFiles = os.listdir(rlogTopicPath) pool = Pool() results = pool.map(processMsgs, [(msgFile, rlogTopicPath, rlogTopic) for msgFile in msgFiles]) pool.close() pool.join() for result in results: writer.add_message(channel_id=result['channel_id'], log_time=result['log_time'], data=result['data'], publish_time=result['publish_time']) def is_program_installed(program_name): try: # Check if the program is installed using dpkg (for traditional Debian packages) subprocess.run(["dpkg", "-l", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True except subprocess.CalledProcessError: # Check if the program is installed using snap try: subprocess.run(["snap", "list", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True except subprocess.CalledProcessError: return False if __name__ == '__main__': # Example usage: program_name = "foxglove-studio" # Change this to the program you want to check if is_program_installed(program_name): print(f"{program_name} detected.") else: print(f"{program_name} could not be detected.") installFoxglove = input("Would you like to install it? YES/NO? - ") if installFoxglove.lower() == "yes": try: subprocess.run(['./install_foxglove.sh'], check=True) print("Installation completed successfully.") except subprocess.CalledProcessError as e: print(f"Installation failed with return code {e.returncode}.") # Get a route if len(sys.argv) == 1: route_name = "a2a0ccea32023010|2023-07-27--13-01-19" print("No route was provided, using demo route") else: route_name = sys.argv[1] # Get logs for a route print("Getting route log paths") route = Route(route_name) logPaths = route.log_paths() # Start mcap writer with open(OUT_MCAP_FILE_NAME, "wb") as stream: writer = Writer(stream, compression=CompressionType.NONE) writer.start() createMcap(logPaths) writer.finish() print(f"File {OUT_MCAP_FILE_NAME} has been successfully created. Please import it into foxglove studio to continue.")