openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

110 lines
3.2 KiB

from huggingface_hub import snapshot_download
from tinygrad import nn, Tensor, TinyJit, Device, GlobalCounters, Context
import time
class Block:
def __init__(self, in_dims, dims, stride=1):
super().__init__()
self.conv1 = nn.Conv2d(
in_dims, dims, kernel_size=3, stride=stride, padding=1, bias=False
)
self.bn1 = nn.BatchNorm(dims)
self.conv2 = nn.Conv2d(
dims, dims, kernel_size=3, stride=1, padding=1, bias=False
)
self.bn2 = nn.BatchNorm(dims)
self.downsample = []
if stride != 1:
self.downsample = [
nn.Conv2d(in_dims, dims, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm(dims)
]
def __call__(self, x):
out = self.bn1(self.conv1(x)).relu()
out = self.bn2(self.conv2(out))
for l in self.downsample:
x = l(x)
out += x
return out.relu()
class ResNet:
def __init__(self, block, num_blocks, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = nn.BatchNorm(64)
self.layer1 = self._make_layer(block, 64, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 64, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 128, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 256, 512, num_blocks[3], stride=2)
self.fc = nn.Linear(512, num_classes)
def _make_layer(self, block, in_dims, dims, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
layers = []
for stride in strides:
layers.append(block(in_dims, dims, stride))
in_dims = dims
return layers
def __call__(self, x:Tensor):
x = self.bn1(self.conv1(x)).relu().max_pool2d()
x = x.sequential(self.layer1)
with Context(WINO=1): x = x.sequential(self.layer2 + self.layer3 + self.layer4)
x = x.mean([2, 3])
x = self.fc(x)
return x
def load():
model = ResNet(Block, [2, 2, 2, 2], num_classes=1000)
file = "model.safetensors"
model_path = snapshot_download(
repo_id="awni/resnet18-mlx",
allow_patterns=[file],
)
state = nn.state.safe_load(model_path + "/" + file)
# mlx is NHWC, tinygrad is NCHW
nn.state.load_state_dict(model, {k:v if len(v.shape) != 4 else v.to(None).permute(0,3,1,2).contiguous() for k,v in state.items()}, strict=False)
return model
if __name__ == "__main__":
resnet18 = load()
@Tensor.test()
def _forward(im): return resnet18(im)
forward = TinyJit(_forward, prune=True)
batch_sizes = [1, 2, 4, 8, 16, 32, 64]
#its = 200
#batch_sizes = [64]
its = 20
print(f"Batch Size | Images-per-second | Milliseconds-per-image")
print(f"---- | ---- | ---- ")
for N in batch_sizes:
forward.reset() # reset the JIT for a new batch size (could make automatic)
image = Tensor.uniform(N, 3, 288, 288)
# Warmup
for _ in range(5):
GlobalCounters.reset()
output = forward(image)
Device.default.synchronize()
tic = time.time()
for _ in range(its):
GlobalCounters.reset()
output = forward(image)
Device.default.synchronize()
toc = time.time()
ims_per_sec = N * its / (toc - tic)
ms_per_im = 1e3 / ims_per_sec
print(f"{N} | {ims_per_sec:.3f} | {ms_per_im:.3f}")