You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							304 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							304 lines
						
					
					
						
							11 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import sys
 | 
						|
import json
 | 
						|
import base64
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
from multiprocessing import Pool
 | 
						|
from openpilot.tools.lib.route import Route
 | 
						|
from openpilot.tools.lib.logreader import LogReader
 | 
						|
 | 
						|
try:
 | 
						|
  from mcap.writer import Writer, CompressionType
 | 
						|
except ImportError:
 | 
						|
  print("mcap module not found. Attempting to install...")
 | 
						|
  subprocess.run([sys.executable, "-m", "pip", "install", "mcap"])
 | 
						|
  # Attempt to import again after installation
 | 
						|
  try:
 | 
						|
    from mcap.writer import Writer, CompressionType
 | 
						|
  except ImportError:
 | 
						|
    print("Failed to install mcap module. Exiting.")
 | 
						|
    sys.exit(1)
 | 
						|
 | 
						|
 | 
						|
# Title given to converted "thumbnail" messages and to their schema.
FOXGLOVE_IMAGE_SCHEME_TITLE = "foxglove.CompressedImage"
# Title given to converted "navRoute" messages and to their schema.
FOXGLOVE_GEOJSON_TITLE = "foxglove.GeoJSON"
# Written as "contentEncoding" on the image schema's "data" property.
FOXGLOVE_IMAGE_ENCODING = "base64"
# Output file opened for writing by the script entry point.
OUT_MCAP_FILE_NAME = "json_log.mcap"
# Folder holding one sub-directory per message topic, each filled with one JSON file per message.
RLOG_FOLDER = "rlogs"
# Folder where one generated JSON schema per topic is stored.
SCHEMAS_FOLDER = "schemas"
# File extension appended to a topic name to form its schema file name.
SCHEMA_EXTENSION = ".json"

# Topic name -> schema id returned by writer.register_schema().
schemas: dict[str, int] = {}
# Topic name -> channel id returned by writer.register_channel().
channels: dict[str, int] = {}
# MCAP writer shared by the functions below; only declared here, it is assigned in the __main__ section.
writer: Writer
 | 
						|
 | 
						|
 | 
						|
def convertBytesToString(data):
  """Recursively replace every ``bytes`` value inside *data* with a ``str``.

  Bytes are decoded as latin-1 (not UTF-8): every byte maps to the code point
  with the same value, so decoding cannot fail and ``.encode('latin-1')``
  gives the original bytes back.

  Lists, tuples and dict values are walked recursively. Dict keys and values
  of any other type are returned unchanged.
  """
  if isinstance(data, bytes):
    return data.decode('latin-1')
  if isinstance(data, list):
    return [convertBytesToString(item) for item in data]
  if isinstance(data, tuple):
    # Tuples are walked too, so bytes nested in them do not reach json.dumps undecoded.
    return tuple(convertBytesToString(item) for item in data)
  if isinstance(data, dict):
    return {key: convertBytesToString(value) for key, value in data.items()}
  return data
 | 
						|
 | 
						|
 | 
						|
# Load jsonscheme for every Event
 | 
						|
def loadSchema(schemaName):
  """Return the parsed JSON schema stored for *schemaName*.

  The file is looked up as ``SCHEMAS_FOLDER/<schemaName><SCHEMA_EXTENSION>``.
  """
  schema_path = os.path.join(SCHEMAS_FOLDER, schemaName + SCHEMA_EXTENSION)
  with open(schema_path, "r") as schema_file:
    return json.load(schema_file)
 | 
						|
 | 
						|
 | 
						|
# Foxglove creates one graph of an array, and not one for each item of an array
 | 
						|
# This can be avoided by transforming array to separate objects
 | 
						|
def transformListsToJsonDict(json_data):
  """Return a copy of *json_data* in which lists are replaced by index-keyed dicts.

  Every list found as a dict value becomes ``{0: item0, 1: item1, ...}`` with
  integer keys. Dict values and dict items inside a list are converted
  recursively; a list nested directly inside another list is left as it is.
  """
  def expand_list(items):
    # Only dict elements are recursed into; everything else is copied over unchanged.
    return {
      position: transformListsToJsonDict(element) if isinstance(element, dict) else element
      for position, element in enumerate(items)
    }

  def convert(value):
    if isinstance(value, list):
      return expand_list(value)
    if isinstance(value, dict):
      return transformListsToJsonDict(value)
    return value

  return {key: convert(value) for key, value in json_data.items()}
 | 
						|
 | 
						|
 | 
						|
# Transform openpilot thumbnail to foxglove compressedImage
 | 
						|
def transformToFoxgloveSchema(jsonMsg):
  """Build the image message dict for a "thumbnail" log message.

  The thumbnail payload is expected as a latin-1 decoded string; it is turned
  back into bytes and base64-encoded. ``logMonoTime`` is stored unchanged in
  ``timestamp.nsec`` while ``timestamp.sec`` is always the string "0".
  """
  thumbnail = jsonMsg.get("thumbnail")
  raw_image = thumbnail.get("thumbnail").encode('latin1')
  encoded_image = base64.b64encode(raw_image).decode('utf-8')
  return {
    "timestamp": {"sec": "0", "nsec": jsonMsg.get("logMonoTime")},
    "frame_id": str(thumbnail.get("frameId")),
    "data": encoded_image,
    "format": "jpeg",
  }
 | 
						|
 | 
						|
 | 
						|
# TODO: Check if there is a tool to build GEOJson
 | 
						|
def transformMapCoordinates(jsonMsg):
  """Convert a "navRoute" log message into a GeoJSON-carrying message dict.

  The route coordinates become ``[longitude, latitude]`` pairs of a single
  LineString feature. ``logMonoTime`` is stored on that feature. The whole
  FeatureCollection is serialized to a string under the "geojson" key.
  """
  route_points = jsonMsg.get("navRoute").get("coordinates")
  line = [[point.get("longitude"), point.get("latitude")] for point in route_points]

  feature = {
    "type": "Feature",
    "geometry": {"type": "LineString", "coordinates": line},
    "logMonoTime": jsonMsg.get("logMonoTime"),
  }
  collection = {"type": "FeatureCollection", "features": [feature]}

  # The GeoJSON document is embedded as a JSON string, not as a nested object.
  return {"geojson": json.dumps(collection)}
 | 
						|
 | 
						|
 | 
						|
def jsonToScheme(jsonData):
  """Derive a JSON schema from one sample message.

  Every key of *jsonData* becomes a required property:
    * dict values are described recursively as nested objects,
    * non-empty lists made only of dicts are described as arrays whose item
      schema is derived from the first element,
    * every other list (empty, primitive or mixed) is described as an array
      of strings,
    * scalars are mapped to their JSON Schema type name.

  Returns a ``(schema, zeroArray)`` tuple. ``zeroArray`` is True when an empty
  list was found anywhere in *jsonData*, i.e. when at least one array item
  type could not be derived from the sample.
  """
  # Python type name -> JSON Schema type name. The lookup uses the exact type
  # name, so bool maps to "boolean" rather than "integer". Unknown types keep
  # their Python name.
  jsonTypeNames = {
    "str": "string",
    "bool": "boolean",
    "float": "number",
    "int": "integer",
    "NoneType": "null",
  }

  zeroArray = False
  schema = {"type": "object", "properties": {}, "required": []}
  for key, value in jsonData.items():
    if isinstance(value, dict):
      propertySchema, nestedZeroArray = jsonToScheme(value)
      # Accumulate the flag: a later nested object must not reset an earlier hit.
      zeroArray = zeroArray or nestedZeroArray
    elif isinstance(value, list):
      if value and all(isinstance(item, dict) for item in value):
        # Array of objects: the first element stands in for all of them.
        itemScheme, nestedZeroArray = jsonToScheme(value[0])
        zeroArray = zeroArray or nestedZeroArray
        propertySchema = {"type": "array", "items": itemScheme}
      else:
        if not value:
          zeroArray = True
        # Item type is not inspected here; a string placeholder is used.
        propertySchema = {"type": "array", "items": {"type": "string"}}
    else:
      typeName = type(value).__name__
      propertySchema = {"type": jsonTypeNames.get(typeName, typeName)}
    schema["properties"][key] = propertySchema
    schema["required"].append(key)

  return schema, zeroArray
 | 
						|
 | 
						|
 | 
						|
def saveScheme(scheme, schemaFileName):
  """Write *scheme* as JSON to ``SCHEMAS_FOLDER/<schemaFileName><SCHEMA_EXTENSION>``.

  The schemas folder is created when it does not exist yet. Any bytes values
  in *scheme* are converted to strings before serialization.
  """
  file_name = schemaFileName + SCHEMA_EXTENSION
  os.makedirs(SCHEMAS_FOLDER, exist_ok=True)
  target_path = os.path.join(SCHEMAS_FOLDER, file_name)
  with open(target_path, 'w') as schema_file:
    json.dump(convertBytesToString(scheme), schema_file)
 | 
						|
 | 
						|
 | 
						|
def convertToFoxGloveFormat(jsonData, rlogTopic):
  """Return the message dict to store for a message of topic *rlogTopic*.

  "thumbnail" and "navRoute" messages are rebuilt as image and GeoJSON
  messages and titled with the matching Foxglove schema name. Every other
  topic keeps its content, with lists expanded into index-keyed dicts, and is
  titled with the topic name. *jsonData* itself always receives a "title" key.
  """
  jsonData["title"] = rlogTopic

  if rlogTopic == "thumbnail":
    converted = transformToFoxgloveSchema(jsonData)
    converted["title"] = FOXGLOVE_IMAGE_SCHEME_TITLE
    return converted

  if rlogTopic == "navRoute":
    converted = transformMapCoordinates(jsonData)
    converted["title"] = FOXGLOVE_GEOJSON_TITLE
    return converted

  return transformListsToJsonDict(jsonData)
 | 
						|
 | 
						|
 | 
						|
def generateSchemas():
  """Create one schema file per topic directory found in RLOG_FOLDER.

  For each topic the stored messages are tried in directory-listing order
  until one yields a schema without empty arrays. If every message contains
  an empty array, the schema of the last listed message is used as is. The
  result is saved under the topic's directory name.
  """
  for topic_dir in os.listdir(RLOG_FOLDER):
    topic_path = os.path.join(RLOG_FOLDER, topic_dir)
    message_files = os.listdir(topic_path)
    final_index = len(message_files) - 1

    for index, message_file in enumerate(message_files):
      with open(os.path.join(topic_path, message_file), 'r') as jsonFile:
        jsonData = json.load(jsonFile)

      scheme, has_empty_array = jsonToScheme(jsonData)
      # An empty array hides its item type, so keep looking for a better
      # sample unless this is the last file available.
      if has_empty_array and index != final_index:
        continue

      title = jsonData.get("title")
      scheme["title"] = title
      # The image schema additionally declares how its "data" field is encoded.
      if title == FOXGLOVE_IMAGE_SCHEME_TITLE:
        scheme["properties"]["data"]["contentEncoding"] = FOXGLOVE_IMAGE_ENCODING
      saveScheme(scheme, topic_dir)
      break
 | 
						|
 | 
						|
 | 
						|
def downloadLogs(logPaths):
  """Dump every message of every log in *logPaths* as a JSON file.

  Messages are converted with convertToFoxGloveFormat and written to
  ``RLOG_FOLDER/<topic>/<segment>,<message>``, where both numbers are 1-based
  and the message number restarts for every segment. The segment number is
  printed as progress output.
  """
  for segment_number, logPath in enumerate(logPaths, start=1):
    print(segment_number)
    for message_number, msg in enumerate(LogReader(logPath), start=1):
      # The dumps/loads round trip leaves only plain JSON types in the message.
      jsonMsg = json.loads(json.dumps(convertBytesToString(msg.to_dict())))
      topic = msg.which()
      jsonMsg = convertToFoxGloveFormat(jsonMsg, topic)

      topic_dir = os.path.join(RLOG_FOLDER, topic)
      if not os.path.exists(topic_dir):
        os.makedirs(topic_dir)

      target_path = os.path.join(topic_dir, f"{segment_number},{message_number}")
      with open(target_path, 'w') as json_file:
        json.dump(jsonMsg, json_file)
 | 
						|
 | 
						|
 | 
						|
def getLogMonoTime(jsonMsg):
  """Return the logMonoTime stored in a converted message.

  Where the value lives depends on the message title: image messages keep it
  in ``timestamp.nsec``, GeoJSON messages keep it on the first feature of the
  serialized "geojson" document, and all other messages keep it at top level.
  """
  title = jsonMsg.get("title")
  if title == FOXGLOVE_IMAGE_SCHEME_TITLE:
    return jsonMsg.get("timestamp").get("nsec")
  if title == FOXGLOVE_GEOJSON_TITLE:
    geojson = json.loads(jsonMsg.get("geojson"))
    return geojson.get("features")[0].get("logMonoTime")
  return jsonMsg.get("logMonoTime")
 | 
						|
 | 
						|
 | 
						|
def processMsgs(args):
  """Load one stored message and return the keyword arguments for writer.add_message.

  *args* is a ``(msgFile, rlogTopicPath, rlogTopic)`` tuple, packed into one
  argument so the function can be used with ``Pool.map``. The channel id is
  read from the module-level ``channels`` registry. The message's logMonoTime
  is used as both log time and publish time.
  """
  msgFile, rlogTopicPath, rlogTopic = args
  with open(os.path.join(rlogTopicPath, msgFile), "r") as message_file:
    jsonMsg = json.load(message_file)

  timestamp = getLogMonoTime(jsonMsg)
  return {
    'channel_id': channels[rlogTopic],
    'log_time': timestamp,
    'data': json.dumps(jsonMsg).encode("utf-8"),
    'publish_time': timestamp,
  }
 | 
						|
 | 
						|
 | 
						|
# Get logs from a path, and convert them into mcap
 | 
						|
def createMcap(logPaths):
  """Convert the logs in *logPaths* into messages of the module-level MCAP ``writer``.

  Steps:
    1. dump every log message as JSON below RLOG_FOLDER (downloadLogs),
    2. derive one JSON schema per topic (generateSchemas),
    3. for every topic directory, register its schema and channel with the
       writer, then add all of its stored messages.

  The caller must have started ``writer`` before and is responsible for
  finishing it afterwards. Progress is reported with print().
  """
  print(f"Downloading logs [{len(logPaths)}]")
  downloadLogs(logPaths)
  print("Creating schemas")
  generateSchemas()
  print("Creating mcap file")

  listOfRlogTopics = os.listdir(RLOG_FOLDER)
  print(f"Registering schemas and channels [{len(listOfRlogTopics)}]")
  for counter, rlogTopic in enumerate(listOfRlogTopics):
    print(counter)
    schema = loadSchema(rlogTopic)
    schema_id = writer.register_schema(name=schema.get("title"), encoding="jsonschema", data=json.dumps(schema).encode())
    schemas[rlogTopic] = schema_id
    channel_id = writer.register_channel(schema_id=schema_id, topic=rlogTopic, message_encoding="json")
    channels[rlogTopic] = channel_id

    rlogTopicPath = os.path.join(RLOG_FOLDER, rlogTopic)
    msgFiles = os.listdir(rlogTopicPath)
    # The pool is created only after this topic's channel id is stored,
    # because processMsgs looks it up in `channels` inside the workers.
    # NOTE(review): this presumably relies on the workers inheriting module
    # state at pool creation (fork start method) - confirm before hoisting the
    # pool out of the loop. The with-block releases the workers even when
    # map() raises.
    with Pool() as pool:
      results = pool.map(processMsgs, [(msgFile, rlogTopicPath, rlogTopic) for msgFile in msgFiles])

    for result in results:
      writer.add_message(channel_id=result['channel_id'], log_time=result['log_time'], data=result['data'], publish_time=result['publish_time'])
 | 
						|
 | 
						|
 | 
						|
def is_program_installed(program_name):
  """Return True when *program_name* is known to dpkg or to snap.

  ``dpkg -l <name>`` is tried first, then ``snap list <name>``; the first
  command that exits with status 0 decides. A package manager that is not
  available on this machine counts as "not found there" instead of raising,
  so the function returns False on systems that have neither tool.
  """
  queries = (
    ["dpkg", "-l", program_name],   # traditional Debian packages
    ["snap", "list", program_name],  # snap packages
  )
  for query in queries:
    try:
      subprocess.run(query, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
      # Non-zero exit status, or the package manager itself could not be run.
      continue
    return True
  return False
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
  # Look for Foxglove Studio and offer to run ./install_foxglove.sh when it is missing.
  program_name = "foxglove-studio"
  if not is_program_installed(program_name):
    print(f"{program_name} could not be detected.")
    installFoxglove = input("Would you like to install it? YES/NO? - ")
    if installFoxglove.lower() == "yes":
      try:
        subprocess.run(['./install_foxglove.sh'], check=True)
      except subprocess.CalledProcessError as e:
        print(f"Installation failed with return code {e.returncode}.")
      else:
        print("Installation completed successfully.")
  else:
    print(f"{program_name} detected.")

  # The route comes from the first command line argument; without one the demo route is used.
  if len(sys.argv) != 1:
    route_name = sys.argv[1]
  else:
    route_name = "a2a0ccea32023010|2023-07-27--13-01-19"
    print("No route was provided, using demo route")

  # Resolve the log paths that belong to the route.
  print("Getting route log paths")
  route = Route(route_name)
  logPaths = route.log_paths()

  # `writer` is the module-level MCAP writer that createMcap() writes to.
  with open(OUT_MCAP_FILE_NAME, "wb") as stream:
    writer = Writer(stream, compression=CompressionType.NONE)
    writer.start()
    createMcap(logPaths)
    writer.finish()
    print(f"File {OUT_MCAP_FILE_NAME} has been successfully created. Please import it into foxglove studio to continue.")
 | 
						|
 |