#!/usr/bin/env python import os import time import unittest import numpy as np try: import onnx except ModuleNotFoundError: raise unittest.SkipTest("onnx not installed, skipping onnx test") from tinygrad.frontend.onnx import OnnxRunner from tinygrad.tensor import Tensor from tinygrad.helpers import CI, fetch, temp def run_onnx_torch(onnx_model, inputs): import torch from onnx2torch import convert torch_model = convert(onnx_model).float() with torch.no_grad(): torch_out = torch_model(*[torch.tensor(x) for x in inputs.values()]) return torch_out OPENPILOT_MODEL = "https://github.com/commaai/openpilot/raw/v0.9.4/selfdrive/modeld/models/supercombo.onnx" np.random.seed(1337) class TestOnnxModel(unittest.TestCase): def test_benchmark_openpilot_model(self): onnx_model = onnx.load(fetch(OPENPILOT_MODEL)) run_onnx = OnnxRunner(onnx_model) def get_inputs(): np_inputs = { "input_imgs": np.random.randn(*(1, 12, 128, 256)), "big_input_imgs": np.random.randn(*(1, 12, 128, 256)), "desire": np.zeros((1, 100, 8)), "traffic_convention": np.array([[1., 0.]]), "nav_features": np.zeros((1, 256)), "features_buffer": np.zeros((1, 99, 128)), } inputs = {k:Tensor(v.astype(np.float32), requires_grad=False) for k,v in np_inputs.items()} return inputs for _ in range(7): inputs = get_inputs() st = time.monotonic() tinygrad_out = run_onnx(inputs)['outputs'] mt = time.monotonic() tinygrad_out.realize() mt2 = time.monotonic() tinygrad_out = tinygrad_out.numpy() et = time.monotonic() if not CI: print(f"ran openpilot model in {(et-st)*1000.0:.2f} ms, waited {(mt2-mt)*1000.0:.2f} ms for realize, {(et-mt2)*1000.0:.2f} ms for GPU queue") if not CI: import cProfile import pstats inputs = get_inputs() pr = cProfile.Profile(timer=time.perf_counter_ns, timeunit=1e-6) pr.enable() tinygrad_out = run_onnx(inputs)['outputs'] tinygrad_out.realize() tinygrad_out = tinygrad_out.numpy() if not CI: pr.disable() stats = pstats.Stats(pr) stats.dump_stats(temp("net.prof")) os.system(f"flameprof {temp('net.prof')} > {temp('prof.svg')}") ps = stats.sort_stats(pstats.SortKey.TIME) ps.print_stats(30) def test_openpilot_model(self): onnx_model = onnx.load(fetch(OPENPILOT_MODEL)) run_onnx = OnnxRunner(onnx_model) print("got run_onnx") inputs = { "input_imgs": np.random.randn(*(1, 12, 128, 256)), "big_input_imgs": np.random.randn(*(1, 12, 128, 256)), "desire": np.zeros((1, 100, 8)), "traffic_convention": np.array([[1., 0.]]), "nav_features": np.zeros((1, 256)), "features_buffer": np.zeros((1, 99, 128)), } inputs = {k:v.astype(np.float32) for k,v in inputs.items()} st = time.monotonic() print("****** run onnx ******") tinygrad_out = run_onnx(inputs)['outputs'] mt = time.monotonic() print("****** realize ******") tinygrad_out.realize() mt2 = time.monotonic() tinygrad_out = tinygrad_out.numpy() et = time.monotonic() print(f"ran openpilot model in {(et-st)*1000.0:.2f} ms, waited {(mt2-mt)*1000.0:.2f} ms for realize, {(et-mt2)*1000.0:.2f} ms for GPU queue") Tensor.no_grad = True torch_out = run_onnx_torch(onnx_model, inputs).numpy() Tensor.no_grad = False print(tinygrad_out, torch_out) np.testing.assert_allclose(tinygrad_out, torch_out, atol=1e-4, rtol=1e-2) @unittest.skip("slow") def test_efficientnet(self): input_name, input_new = "images:0", True self._test_model( fetch("https://github.com/onnx/models/raw/main/validated/vision/classification/efficientnet-lite4/model/efficientnet-lite4-11.onnx"), input_name, input_new) def test_shufflenet(self): input_name, input_new = "gpu_0/data_0", False self._test_model( fetch("https://github.com/onnx/models/raw/main/validated/vision/classification/shufflenet/model/shufflenet-9.onnx"), input_name, input_new) @unittest.skip("test is very slow") def test_resnet(self): # NOTE: many onnx models can't be run right now due to max pool with strides != kernel_size input_name, input_new = "data", False self._test_model( fetch("https://github.com/onnx/models/raw/main/validated/vision/classification/resnet/model/resnet18-v2-7.onnx"), input_name, input_new) def _test_model(self, fn, input_name, input_new, debug=False): onnx_model = onnx.load(fn) print("onnx loaded") from test.models.test_efficientnet import chicken_img, car_img, preprocess, _LABELS run_onnx = OnnxRunner(onnx_model) def run(img): inputs = {input_name: preprocess(img, new=input_new)} tinygrad_out = list(run_onnx(inputs, debug=debug).values())[0].numpy() return tinygrad_out.argmax() cls = run(chicken_img) print(cls, _LABELS[cls]) assert _LABELS[cls] == "hen" or _LABELS[cls] == "cock" cls = run(car_img) print(cls, _LABELS[cls]) assert "car" in _LABELS[cls] or _LABELS[cls] == "convertible" if __name__ == "__main__": unittest.main()