Adding foxglove support (#31907)
	
		
	
				
					
				
			* Adding foxglove support, demo version
* Adding image/thumbnail support
* Adding foxglove map support
* Adding foxglove map support
* Updating title in jsonschemas according to rlog attribute names
* Remove shemas folder, add dynamic schema creation
* Update identation to 2 spaces, code refactored
* Fix, jsonToSchema() recursion returns 0 when an empty arr found
* Update, empty arrays data type set to dummy value of type string
* Enable logs download
* Adding foxglove install functionality
* Adding, transform json lists into json obj
* Adding, pip install mcap library
* format it
* Refactoring msg adding to mcap writer by utilizing pool of worker processes
Disabling compression
---------
Co-authored-by: Justin Newberry <justin@comma.ai>
old-commit-hash: 8a138f9be8
			
			
				vw-mqb-aeb
			
			
		
							parent
							
								
									b33aef01b8
								
							
						
					
					
						commit
						f2c6778548
					
				
				 2 changed files with 306 additions and 0 deletions
			
			
		| @ -0,0 +1,303 @@ | |||||||
|  | import sys | ||||||
|  | import json | ||||||
|  | import base64 | ||||||
|  | import os | ||||||
|  | import subprocess | ||||||
|  | from multiprocessing import Pool | ||||||
|  | from openpilot.tools.lib.route import Route | ||||||
|  | from openpilot.tools.lib.logreader import LogReader | ||||||
|  | 
 | ||||||
|  | try: | ||||||
|  |   from mcap.writer import Writer, CompressionType | ||||||
|  | except ImportError: | ||||||
|  |   print("mcap module not found. Attempting to install...") | ||||||
|  |   subprocess.run([sys.executable, "-m", "pip", "install", "mcap"]) | ||||||
|  |   # Attempt to import again after installation | ||||||
|  |   try: | ||||||
|  |     from mcap.writer import Writer, CompressionType | ||||||
|  |   except ImportError: | ||||||
|  |     print("Failed to install mcap module. Exiting.") | ||||||
|  |     sys.exit(1) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | FOXGLOVE_IMAGE_SCHEME_TITLE = "foxglove.CompressedImage" | ||||||
|  | FOXGLOVE_GEOJSON_TITLE = "foxglove.GeoJSON" | ||||||
|  | FOXGLOVE_IMAGE_ENCODING = "base64" | ||||||
|  | OUT_MCAP_FILE_NAME = "json_log.mcap" | ||||||
|  | RLOG_FOLDER = "rlogs" | ||||||
|  | SCHEMAS_FOLDER = "schemas" | ||||||
|  | SCHEMA_EXTENSION = ".json" | ||||||
|  | 
 | ||||||
|  | schemas: dict[str, int] = {} | ||||||
|  | channels: dict[str, int] = {} | ||||||
|  | writer: Writer | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def convertBytesToString(data): | ||||||
|  |   if isinstance(data, bytes): | ||||||
|  |     return data.decode('latin-1')  # Assuming UTF-8 encoding, adjust if needed | ||||||
|  |   elif isinstance(data, list): | ||||||
|  |     return [convertBytesToString(item) for item in data] | ||||||
|  |   elif isinstance(data, dict): | ||||||
|  |     return {key: convertBytesToString(value) for key, value in data.items()} | ||||||
|  |   else: | ||||||
|  |     return data | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # Load jsonscheme for every Event | ||||||
|  | def loadSchema(schemaName): | ||||||
|  |   with open(os.path.join(SCHEMAS_FOLDER, schemaName + SCHEMA_EXTENSION), "r") as file: | ||||||
|  |     return json.loads(file.read()) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # Foxglove creates one graph of an array, and not one for each item of an array | ||||||
|  | # This can be avoided by transforming array to separate objects | ||||||
|  | def transformListsToJsonDict(json_data): | ||||||
|  |   def convert_array_to_dict(array): | ||||||
|  |     new_dict = {} | ||||||
|  |     for index, item in enumerate(array): | ||||||
|  |       if isinstance(item, dict): | ||||||
|  |         new_dict[index] = transformListsToJsonDict(item) | ||||||
|  |       else: | ||||||
|  |         new_dict[index] = item | ||||||
|  |     return new_dict | ||||||
|  | 
 | ||||||
|  |   new_data = {} | ||||||
|  |   for key, value in json_data.items(): | ||||||
|  |     if isinstance(value, list): | ||||||
|  |       new_data[key] = convert_array_to_dict(value) | ||||||
|  |     elif isinstance(value, dict): | ||||||
|  |       new_data[key] = transformListsToJsonDict(value) | ||||||
|  |     else: | ||||||
|  |       new_data[key] = value | ||||||
|  |   return new_data | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # Transform openpilot thumbnail to foxglove compressedImage | ||||||
|  | def transformToFoxgloveSchema(jsonMsg): | ||||||
|  |   bytesImgData = jsonMsg.get("thumbnail").get("thumbnail").encode('latin1') | ||||||
|  |   base64ImgData = base64.b64encode(bytesImgData) | ||||||
|  |   base64_string = base64ImgData.decode('utf-8') | ||||||
|  |   foxMsg = { | ||||||
|  |     "timestamp": {"sec": "0", "nsec": jsonMsg.get("logMonoTime")}, | ||||||
|  |     "frame_id": str(jsonMsg.get("thumbnail").get("frameId")), | ||||||
|  |     "data": base64_string, | ||||||
|  |     "format": "jpeg", | ||||||
|  |   } | ||||||
|  |   return foxMsg | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: Check if there is a tool to build GEOJson | ||||||
|  | def transformMapCoordinates(jsonMsg): | ||||||
|  |   coordinates = [] | ||||||
|  |   for jsonCoords in jsonMsg.get("navRoute").get("coordinates"): | ||||||
|  |     coordinates.append([jsonCoords.get("longitude"), jsonCoords.get("latitude")]) | ||||||
|  | 
 | ||||||
|  |   # Define the GeoJSON | ||||||
|  |   geojson_data = { | ||||||
|  |     "type": "FeatureCollection", | ||||||
|  |     "features": [{"type": "Feature", "geometry": {"type": "LineString", "coordinates": coordinates}, "logMonoTime": jsonMsg.get("logMonoTime")}], | ||||||
|  |   } | ||||||
|  | 
 | ||||||
|  |   # Create the final JSON with the GeoJSON data encoded as a string | ||||||
|  |   geoJson = {"geojson": json.dumps(geojson_data)} | ||||||
|  | 
 | ||||||
|  |   return geoJson | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def jsonToScheme(jsonData): | ||||||
|  |   zeroArray = False | ||||||
|  |   schema = {"type": "object", "properties": {}, "required": []} | ||||||
|  |   for key, value in jsonData.items(): | ||||||
|  |     if isinstance(value, dict): | ||||||
|  |       tempScheme, zeroArray = jsonToScheme(value) | ||||||
|  |       if tempScheme == 0: | ||||||
|  |         return 0 | ||||||
|  |       schema["properties"][key] = tempScheme | ||||||
|  |       schema["required"].append(key) | ||||||
|  |     elif isinstance(value, list): | ||||||
|  |       if all(isinstance(item, dict) for item in value) and len(value) > 0:  # Handle zero value arrays | ||||||
|  |         # Handle array of objects | ||||||
|  |         tempScheme, zeroArray = jsonToScheme(value[0]) | ||||||
|  |         schema["properties"][key] = {"type": "array", "items": tempScheme if value else {}} | ||||||
|  |         schema["required"].append(key) | ||||||
|  |       else: | ||||||
|  |         if len(value) == 0: | ||||||
|  |           zeroArray = True | ||||||
|  |         # Handle array of primitive types | ||||||
|  |         schema["properties"][key] = {"type": "array", "items": {"type": "string"}} | ||||||
|  |         schema["required"].append(key) | ||||||
|  |     else: | ||||||
|  |       typeName = type(value).__name__ | ||||||
|  |       if typeName == "str": | ||||||
|  |         typeName = "string" | ||||||
|  |       elif typeName == "bool": | ||||||
|  |         typeName = "boolean" | ||||||
|  |       elif typeName == "float": | ||||||
|  |         typeName = "number" | ||||||
|  |       elif typeName == "int": | ||||||
|  |         typeName = "integer" | ||||||
|  |       schema["properties"][key] = {"type": typeName} | ||||||
|  |       schema["required"].append(key) | ||||||
|  | 
 | ||||||
|  |   return schema, zeroArray | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def saveScheme(scheme, schemaFileName): | ||||||
|  |   schemaFileName = schemaFileName + SCHEMA_EXTENSION | ||||||
|  |   # Create the new schemas folder | ||||||
|  |   os.makedirs(SCHEMAS_FOLDER, exist_ok=True) | ||||||
|  |   with open(os.path.join(SCHEMAS_FOLDER, schemaFileName), 'w') as json_file: | ||||||
|  |     json.dump(convertBytesToString(scheme), json_file) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def convertToFoxGloveFormat(jsonData, rlogTopic): | ||||||
|  |   jsonData["title"] = rlogTopic | ||||||
|  |   if rlogTopic == "thumbnail": | ||||||
|  |     jsonData = transformToFoxgloveSchema(jsonData) | ||||||
|  |     jsonData["title"] = FOXGLOVE_IMAGE_SCHEME_TITLE | ||||||
|  |   elif rlogTopic == "navRoute": | ||||||
|  |     jsonData = transformMapCoordinates(jsonData) | ||||||
|  |     jsonData["title"] = FOXGLOVE_GEOJSON_TITLE | ||||||
|  |   else: | ||||||
|  |     jsonData = transformListsToJsonDict(jsonData) | ||||||
|  |   return jsonData | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def generateSchemas(): | ||||||
|  |   listOfDirs = os.listdir(RLOG_FOLDER) | ||||||
|  |   # Open every dir in rlogs | ||||||
|  |   for directory in listOfDirs: | ||||||
|  |     # List every file in every rlog dir | ||||||
|  |     dirPath = os.path.join(RLOG_FOLDER, directory) | ||||||
|  |     listOfFiles = os.listdir(dirPath) | ||||||
|  |     lastIteration = len(listOfFiles) | ||||||
|  |     for iteration, file in enumerate(listOfFiles): | ||||||
|  |       # Load json data from every file until found one without empty arrays | ||||||
|  |       filePath = os.path.join(dirPath, file) | ||||||
|  |       with open(filePath, 'r') as jsonFile: | ||||||
|  |         jsonData = json.load(jsonFile) | ||||||
|  |         scheme, zerroArray = jsonToScheme(jsonData) | ||||||
|  |         # If array of len 0 has been found, type of its data can not be parsed, skip to the next log | ||||||
|  |         # in search for a non empty array. If there is not an non empty array in logs, put a dummy string type | ||||||
|  |         if zerroArray and not iteration == lastIteration - 1: | ||||||
|  |           continue | ||||||
|  |         title = jsonData.get("title") | ||||||
|  |         scheme["title"] = title | ||||||
|  |         # Add contentEncoding type, hardcoded in foxglove format | ||||||
|  |         if title == FOXGLOVE_IMAGE_SCHEME_TITLE: | ||||||
|  |           scheme["properties"]["data"]["contentEncoding"] = FOXGLOVE_IMAGE_ENCODING | ||||||
|  |         saveScheme(scheme, directory) | ||||||
|  |         break | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def downloadLogs(logPaths): | ||||||
|  |   segment_counter = 0 | ||||||
|  |   for logPath in logPaths: | ||||||
|  |     segment_counter += 1 | ||||||
|  |     msg_counter = 1 | ||||||
|  |     print(segment_counter) | ||||||
|  |     rlog = LogReader(logPath) | ||||||
|  |     for msg in rlog: | ||||||
|  |       jsonMsg = json.loads(json.dumps(convertBytesToString(msg.to_dict()))) | ||||||
|  |       jsonMsg = convertToFoxGloveFormat(jsonMsg, msg.which()) | ||||||
|  |       rlog_dir_path = os.path.join(RLOG_FOLDER, msg.which()) | ||||||
|  |       if not os.path.exists(rlog_dir_path): | ||||||
|  |         os.makedirs(rlog_dir_path) | ||||||
|  |       file_path = os.path.join(rlog_dir_path, str(segment_counter) + "," + str(msg_counter)) | ||||||
|  |       with open(file_path, 'w') as json_file: | ||||||
|  |         json.dump(jsonMsg, json_file) | ||||||
|  |       msg_counter += 1 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def getLogMonoTime(jsonMsg): | ||||||
|  |   if jsonMsg.get("title") == FOXGLOVE_IMAGE_SCHEME_TITLE: | ||||||
|  |     logMonoTime = jsonMsg.get("timestamp").get("nsec") | ||||||
|  |   elif jsonMsg.get("title") == FOXGLOVE_GEOJSON_TITLE: | ||||||
|  |     logMonoTime = json.loads(jsonMsg.get("geojson")).get("features")[0].get("logMonoTime") | ||||||
|  |   else: | ||||||
|  |     logMonoTime = jsonMsg.get("logMonoTime") | ||||||
|  |   return logMonoTime | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def processMsgs(args): | ||||||
|  |   msgFile, rlogTopicPath, rlogTopic = args | ||||||
|  |   msgFilePath = os.path.join(rlogTopicPath, msgFile) | ||||||
|  |   with open(msgFilePath, "r") as file: | ||||||
|  |     jsonMsg = json.load(file) | ||||||
|  |     logMonoTime = getLogMonoTime(jsonMsg) | ||||||
|  |     return {'channel_id': channels[rlogTopic], 'log_time': logMonoTime, 'data': json.dumps(jsonMsg).encode("utf-8"), 'publish_time': logMonoTime} | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # Get logs from a path, and convert them into mcap | ||||||
|  | def createMcap(logPaths): | ||||||
|  |   print(f"Downloading logs [{len(logPaths)}]") | ||||||
|  |   downloadLogs(logPaths) | ||||||
|  |   print("Creating schemas") | ||||||
|  |   generateSchemas() | ||||||
|  |   print("Creating mcap file") | ||||||
|  | 
 | ||||||
|  |   listOfRlogTopics = os.listdir(RLOG_FOLDER) | ||||||
|  |   print(f"Registering schemas and channels [{len(listOfRlogTopics)}]") | ||||||
|  |   for counter, rlogTopic in enumerate(listOfRlogTopics): | ||||||
|  |     print(counter) | ||||||
|  |     schema = loadSchema(rlogTopic) | ||||||
|  |     schema_id = writer.register_schema(name=schema.get("title"), encoding="jsonschema", data=json.dumps(schema).encode()) | ||||||
|  |     schemas[rlogTopic] = schema_id | ||||||
|  |     channel_id = writer.register_channel(schema_id=schemas[rlogTopic], topic=rlogTopic, message_encoding="json") | ||||||
|  |     channels[rlogTopic] = channel_id | ||||||
|  |     rlogTopicPath = os.path.join(RLOG_FOLDER, rlogTopic) | ||||||
|  |     msgFiles = os.listdir(rlogTopicPath) | ||||||
|  |     pool = Pool() | ||||||
|  |     results = pool.map(processMsgs, [(msgFile, rlogTopicPath, rlogTopic) for msgFile in msgFiles]) | ||||||
|  |     pool.close() | ||||||
|  |     pool.join() | ||||||
|  |     for result in results: | ||||||
|  |       writer.add_message(channel_id=result['channel_id'], log_time=result['log_time'], data=result['data'], publish_time=result['publish_time']) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def is_program_installed(program_name): | ||||||
|  |   try: | ||||||
|  |     # Check if the program is installed using dpkg (for traditional Debian packages) | ||||||
|  |     subprocess.run(["dpkg", "-l", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||||||
|  |     return True | ||||||
|  |   except subprocess.CalledProcessError: | ||||||
|  |     # Check if the program is installed using snap | ||||||
|  |     try: | ||||||
|  |       subprocess.run(["snap", "list", program_name], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||||||
|  |       return True | ||||||
|  |     except subprocess.CalledProcessError: | ||||||
|  |       return False | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  |   # Example usage: | ||||||
|  |   program_name = "foxglove-studio"  # Change this to the program you want to check | ||||||
|  |   if is_program_installed(program_name): | ||||||
|  |     print(f"{program_name} detected.") | ||||||
|  |   else: | ||||||
|  |     print(f"{program_name} could not be detected.") | ||||||
|  |     installFoxglove = input("Would you like to install it? YES/NO? - ") | ||||||
|  |     if installFoxglove.lower() == "yes": | ||||||
|  |       try: | ||||||
|  |         subprocess.run(['./install_foxglove.sh'], check=True) | ||||||
|  |         print("Installation completed successfully.") | ||||||
|  |       except subprocess.CalledProcessError as e: | ||||||
|  |         print(f"Installation failed with return code {e.returncode}.") | ||||||
|  |   # Get a route | ||||||
|  |   if len(sys.argv) == 1: | ||||||
|  |     route_name = "a2a0ccea32023010|2023-07-27--13-01-19" | ||||||
|  |     print("No route was provided, using demo route") | ||||||
|  |   else: | ||||||
|  |     route_name = sys.argv[1] | ||||||
|  |   # Get logs for a route | ||||||
|  |   print("Getting route log paths") | ||||||
|  |   route = Route(route_name) | ||||||
|  |   logPaths = route.log_paths() | ||||||
|  |   # Start mcap writer | ||||||
|  |   with open(OUT_MCAP_FILE_NAME, "wb") as stream: | ||||||
|  |     writer = Writer(stream, compression=CompressionType.NONE) | ||||||
|  |     writer.start() | ||||||
|  |     createMcap(logPaths) | ||||||
|  |     writer.finish() | ||||||
|  |     print(f"File {OUT_MCAP_FILE_NAME} has been successfully created. Please import it into foxglove studio to continue.") | ||||||
| @ -0,0 +1,3 @@ | |||||||
|  | #!/bin/bash | ||||||
|  | echo "Installing foxglvoe studio..." | ||||||
|  | sudo snap install foxglove-studio | ||||||
					Loading…
					
					
				
		Reference in new issue