# https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
import sys
import io
import time
import math
import cv2
import numpy as np
from PIL import Image
from tinygrad . tensor import Tensor
from tinygrad . nn import BatchNorm2d , Conv2d
from tinygrad . helpers import fetch
def show_labels(prediction, confidence=0.5, num_classes=80):
  """Print every detected COCO class label with its score and return the labels."""
  names = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  names = names.decode('utf-8').split('\n')
  pred = prediction.detach().numpy()
  # Zero out every attribute of boxes whose objectness is below the threshold
  pred *= np.expand_dims(pred[:, :, 4] > confidence, 2)
  detected = []
  for img_pred in pred:  # one iteration per image in the batch
    class_scores = img_pred[:, 5:5 + num_classes]
    best = np.expand_dims(np.amax(class_scores, axis=1), axis=1)
    best_idx = np.expand_dims(np.argmax(class_scores, axis=1), axis=1)
    merged = np.concatenate((img_pred[:, :5], best, best_idx), axis=1)
    keep = np.nonzero(merged[:, 4])[0]
    assert all(merged[keep, 0] > 0)
    survivors = np.reshape(merged[np.squeeze(keep), :], (-1, 7))
    # Report each class once, using the first surviving detection of that class
    classes, first_idx = np.unique(survivors[:, -1], return_index=True)
    for n, cls in enumerate(classes):
      label, probability = names[int(cls)], survivors[first_idx[n]][4] * 100
      print(f"Detected {label} {probability:.2f}")
      detected.append(label)
  return detected
def add_boxes(img, prediction):
  """Draw labeled bounding boxes from *prediction* onto *img* and return it."""
  # process_results returns the int 0 when nothing was detected
  if isinstance(prediction, int):
    return img
  names = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  names = names.decode('utf-8').split('\n')
  height, width = img.shape[0:2]
  scale_factor = 608 / width
  # Shift coordinates by the letterbox offset at the 608x608 inference size
  prediction[:, [1, 3]] -= (608 - scale_factor * width) / 2
  prediction[:, [2, 4]] -= (608 - scale_factor * height) / 2
  for pred in prediction:
    c1 = tuple(pred[1:3].astype(int))
    c2 = tuple(pred[3:5].astype(int))
    # NOTE(review): the far corner is pushed out by the full box extent
    # (c2 + (c2 - c1)); this appears to compensate for half-sized boxes
    # produced upstream in process_results — confirm before changing.
    c2 = (2 * c2[0] - c1[0], 2 * c2[1] - c1[1])
    label = names[int(pred[-1])]
    img = cv2.rectangle(img, c1, c2, (255, 0, 0), 2)
    tw, th = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
    tag_corner = (c1[0] + tw + 3, c1[1] + th + 4)
    img = cv2.rectangle(img, c1, tag_corner, (255, 0, 0), -1)
    img = cv2.putText(img, label, (c1[0], c1[1] + th + 4), cv2.FONT_HERSHEY_PLAIN, 1, [225, 255, 255], 1)
  return img
def bbox_iou(box1, box2):
  """
  Return the IoU of two sets of bounding boxes.

  IoU = Area of Overlap / Area of Union -> How close the predicted bounding box is
  to the ground truth bounding box. Higher IoU = Better accuracy.
  In training, used to track accuracy. With inference, used to remove duplicate
  bounding boxes (NMS).

  box1, box2: arrays of shape (N, >=4) with corner coordinates [x1, y1, x2, y2, ...];
  a (1, k) box broadcasts against an (N, k) set.
  """
  # Get the coordinates of bounding boxes
  b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3]
  b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]
  # Intersection rectangle: near corner is the max of the near corners,
  # far corner is the MIN of the far corners.
  inter_rect_x1 = np.maximum(b1_x1, b2_x1)
  inter_rect_y1 = np.maximum(b1_y1, b2_y1)
  inter_rect_x2 = np.minimum(b1_x2, b2_x2)  # was np.maximum: disjoint boxes got IoU 1.0
  inter_rect_y2 = np.minimum(b1_y2, b2_y2)  # was np.maximum
  # Intersection area, clamped at 0 for non-overlapping boxes (+1: inclusive pixel extents)
  inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, 99999) * np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, 99999)
  # Union area
  b1_area = (b1_x2 - b1_x1 + 1) * (b1_y2 - b1_y1 + 1)
  b2_area = (b2_x2 - b2_x1 + 1) * (b2_y2 - b2_y1 + 1)
  iou = inter_area / (b1_area + b2_area - inter_area)
  return iou
def process_results(prediction, confidence=0.9, num_classes=80, nms_conf=0.4):
  """
  Threshold raw YOLO detections and run per-class non-max suppression.

  prediction: tensor of shape (batch, num_boxes, 5 + num_classes); the first
  four attributes are the box center/size, index 4 is objectness.
  Returns an (N, 8) array of rows [batch_idx, x1, y1, x2, y2, objectness,
  class_score, class_idx] for the FIRST image of the batch only, or the
  int 0 when nothing survives the confidence threshold.
  """
  prediction = prediction.detach().numpy()
  # Zero out boxes below the objectness threshold
  conf_mask = (prediction[:, :, 4] > confidence)
  conf_mask = np.expand_dims(conf_mask, 2)
  prediction = prediction * conf_mask
  # Non max suppression: convert (cx, cy, w, h) to corner coordinates.
  # NOTE(review): box_corner aliases prediction, so columns 0/1 are already
  # overwritten when columns 2/3 are computed — the "far corner" lands on the
  # box center. add_boxes appears to compensate by doubling the extent when
  # drawing, so this is deliberately kept as-is; confirm before changing.
  box_corner = prediction
  box_corner[:, :, 0] = (prediction[:, :, 0] - prediction[:, :, 2] / 2)
  box_corner[:, :, 1] = (prediction[:, :, 1] - prediction[:, :, 3] / 2)
  box_corner[:, :, 2] = (prediction[:, :, 0] + prediction[:, :, 2] / 2)
  box_corner[:, :, 3] = (prediction[:, :, 1] + prediction[:, :, 3] / 2)
  prediction[:, :, :4] = box_corner[:, :, :4]
  write = False
  # Process img (only the first image of the batch)
  img_pred = prediction[0]
  max_conf = np.amax(img_pred[:, 5:5 + num_classes], axis=1)
  max_conf_score = np.argmax(img_pred[:, 5:5 + num_classes], axis=1)
  max_conf_score = np.expand_dims(max_conf_score, axis=1)
  max_conf = np.expand_dims(max_conf, axis=1)
  seq = (img_pred[:, :5], max_conf, max_conf_score)
  image_pred = np.concatenate(seq, axis=1)
  non_zero_ind = np.nonzero(image_pred[:, 4])[0]
  assert all(image_pred[non_zero_ind, 0] > 0)
  image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind), :], (-1, 7))
  if image_pred_.shape[0] == 0:
    print("No detections found!")
    return 0
  for cls in np.unique(image_pred_[:, -1]):
    # perform NMS, get the detections with one particular class
    cls_mask = image_pred_ * np.expand_dims(image_pred_[:, -1] == cls, axis=1)
    class_mask_ind = np.squeeze(np.nonzero(cls_mask[:, -2]))
    image_pred_class = np.reshape(image_pred_[class_mask_ind], (-1, 7))
    # sort the detections such that the entry with the maximum objectness
    # confidence is at the top. np.argsort is ascending, so reverse it;
    # previously NMS kept the LEAST confident duplicate instead.
    conf_sort_index = np.argsort(image_pred_class[:, 4])[::-1]
    image_pred_class = image_pred_class[conf_sort_index]
    for i in range(image_pred_class.shape[0]):
      # Get the IOUs of all boxes that come after the one we are looking at
      try:
        ious = bbox_iou(np.expand_dims(image_pred_class[i], axis=0), image_pred_class[i + 1:])
      except (IndexError, ValueError):  # ran past the surviving detections
        break
      # Zero out all the detections that have IoU > threshold
      iou_mask = np.expand_dims((ious < nms_conf), axis=1)
      image_pred_class[i + 1:] *= iou_mask
    # Remove the zeroed-out entries
    non_zero_ind = np.squeeze(np.nonzero(image_pred_class[:, 4]))
    image_pred_class = np.reshape(image_pred_class[non_zero_ind], (-1, 7))
    # One batch-index column per surviving row (a fixed (1, 1) array crashed
    # np.concatenate whenever more than one detection of a class survived)
    batch_ind = np.zeros((image_pred_class.shape[0], 1))
    seq = (batch_ind, image_pred_class)
    if not write:
      output, write = np.concatenate(seq, axis=1), True
    else:
      out = np.concatenate(seq, axis=1)
      output = np.concatenate((output, out))
  return output
def infer(model, img):
  """Resize *img* to the 608x608 network input, reorder to CHW with reversed channels, and run the model."""
  resized = np.array(Image.fromarray(img).resize((608, 608)))
  # Reverse the channel order (BGR <-> RGB) and move channels first: HWC -> CHW
  chw = np.transpose(resized[:, :, ::-1], (2, 0, 1))
  # Add the batch dimension and normalize to [0, 1]
  batch = chw[np.newaxis, :, :, :] / 255.0
  return model.forward(Tensor(batch.astype(np.float32)))
def parse_cfg(cfg):
  """
  Parse a darknet .cfg byte string into a list of blocks.

  Each block is a dict with a "type" key taken from the [section] header
  plus one string entry per "key = value" line inside that section.
  """
  raw = cfg.decode("utf-8").split('\n')
  raw = [ln for ln in raw if len(ln) > 0]   # drop empty lines
  raw = [ln for ln in raw if ln[0] != '#']  # drop comment lines
  raw = [ln.rstrip().lstrip() for ln in raw]
  current, blocks = {}, []
  for ln in raw:
    if ln[0] == "[":
      # New section header: flush the previous block, if any
      if len(current) != 0:
        blocks.append(current)
        current = {}
      current["type"] = ln[1:-1].rstrip()
    else:
      key, value = ln.split("=")
      current[key.rstrip()] = value.lstrip()
  blocks.append(current)
  return blocks
# TODO: Speed up this function, avoid copying stuff from GPU to CPU
def predict_transform(prediction, inp_dim, anchors, num_classes):
  """
  Decode one YOLO detection head into box predictions at input-image scale.

  prediction: head output tensor; dim 2 is the grid size, and the channel
    dim holds bbox_attrs * num_anchors values (see the reshapes below).
  inp_dim: network input resolution, used to derive this head's stride.
  anchors: list of (w, h) anchor pairs, in input-image pixels.
  num_classes: number of object classes.

  Returns a Tensor of shape (batch, grid*grid*num_anchors, 5 + num_classes)
  with (cx, cy, w, h) scaled back to input-image pixels.
  """
  batch_size = prediction.shape[0]
  # Downsampling factor of this head relative to the network input
  stride = inp_dim // prediction.shape[2]
  grid_size = inp_dim // stride
  bbox_attrs = 5 + num_classes
  # Flatten the grid and move anchors/attributes to the last axis:
  # (batch, attrs*anchors, g, g) -> (batch, g*g*anchors, attrs)
  num_anchors = len(anchors)
  prediction = prediction.reshape(shape=(batch_size, bbox_attrs * num_anchors, grid_size * grid_size))
  prediction = prediction.transpose(1, 2)
  prediction = prediction.reshape(shape=(batch_size, grid_size * grid_size * num_anchors, bbox_attrs))
  # Decoding happens on the CPU in numpy (see the TODO above)
  prediction_cpu = prediction.numpy()
  # Sigmoid the center offsets (attrs 0, 1) and the objectness score (attr 4)
  for i in (0, 1, 4):
    prediction_cpu[:, :, i] = 1 / (1 + np.exp(-prediction_cpu[:, :, i]))
  # Add the center offsets: one (x, y) cell coordinate per anchor per cell
  grid = np.arange(grid_size)
  a, b = np.meshgrid(grid, grid)
  x_offset = a.reshape((-1, 1))
  y_offset = b.reshape((-1, 1))
  x_y_offset = np.concatenate((x_offset, y_offset), 1)
  x_y_offset = np.tile(x_y_offset, (1, num_anchors))
  x_y_offset = x_y_offset.reshape((-1, 2))
  x_y_offset = np.expand_dims(x_y_offset, 0)
  # Scale anchors from input pixels down to grid units, one copy per cell
  anchors = [(a[0] / stride, a[1] / stride) for a in anchors]
  anchors = np.tile(anchors, (grid_size * grid_size, 1))
  anchors = np.expand_dims(anchors, 0)
  prediction_cpu[:, :, :2] += x_y_offset
  # Width/height: exponentiate the raw values and scale by the anchor dims
  prediction_cpu[:, :, 2:4] = np.exp(prediction_cpu[:, :, 2:4]) * anchors
  # Sigmoid the per-class scores
  prediction_cpu[:, :, 5:5 + num_classes] = 1 / (1 + np.exp(-prediction_cpu[:, :, 5:5 + num_classes]))
  # Map box coordinates from grid units back to input-image pixels
  prediction_cpu[:, :, :4] *= stride
  return Tensor(prediction_cpu)
class Darknet:
  """YOLOv3 network assembled from a darknet .cfg description."""

  def __init__(self, cfg):
    # cfg: raw bytes of a darknet config file
    self.blocks = parse_cfg(cfg)
    self.net_info, self.module_list = self.create_modules(self.blocks)
    print("Modules length:", len(self.module_list))

  def create_modules(self, blocks):
    """Build one list of layer callables per config block.

    Returns (net_info, module_list): net_info is the leading [net] section
    dict; module_list[i] corresponds to blocks[i + 1].
    """
    net_info = blocks[0]  # Info about model hyperparameters
    prev_filters, filters = 3, None
    output_filters, module_list = [], []
    ## module
    for index, x in enumerate(blocks[1:]):
      module_type = x["type"]
      module = []
      if module_type == "convolutional":
        # Conv layers with batchnorm carry no bias of their own
        try:
          batch_normalize, bias = int(x["batch_normalize"]), False
        except:
          batch_normalize, bias = 0, True
        # layer
        activation = x["activation"]
        filters = int(x["filters"])
        padding = int(x["pad"])
        pad = (int(x["size"]) - 1) // 2 if padding else 0
        module.append(Conv2d(prev_filters, filters, int(x["size"]), int(x["stride"]), pad, bias=bias))
        # BatchNorm2d
        if batch_normalize:
          module.append(BatchNorm2d(filters, eps=1e-05, track_running_stats=True))
        # LeakyReLU activation
        if activation == "leaky":
          module.append(lambda x: x.leaky_relu(0.1))
      elif module_type == "maxpool":
        size, stride = int(x["size"]), int(x["stride"])
        module.append(lambda x: x.max_pool2d(kernel_size=(size, size), stride=stride))
      elif module_type == "upsample":
        # Nearest-neighbor 2x upsample, done in numpy
        module.append(lambda x: Tensor(x.numpy().repeat(2, axis=-2).repeat(2, axis=-1)))
      elif module_type == "route":
        x["layers"] = x["layers"].split(",")
        # Start of route
        start = int(x["layers"][0])
        # End if it exists
        try:
          end = int(x["layers"][1])
        except:
          end = 0
        # Convert absolute layer indices to offsets relative to this layer
        if start > 0: start -= index
        if end > 0: end -= index
        # Identity placeholder; the actual routing happens in forward()
        module.append(lambda x: x)
        if end < 0:
          filters = output_filters[index + start] + output_filters[index + end]
        else:
          filters = output_filters[index + start]
      # Shortcut corresponds to skip connection
      elif module_type == "shortcut":
        # Identity placeholder; the addition happens in forward()
        module.append(lambda x: x)
      elif module_type == "yolo":
        mask = list(map(int, x["mask"].split(",")))
        anchors = [int(a) for a in x["anchors"].split(",")]
        anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
        # Keep only the anchors selected by this head's mask
        module.append([anchors[i] for i in mask])
      # Append to module_list
      module_list.append(module)
      if filters is not None:
        prev_filters = filters
      output_filters.append(filters)
    return (net_info, module_list)

  def dump_weights(self):
    """Debug helper: print a sample of every conv layer's weights and biases."""
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        print(self.blocks[i + 1]["type"], "weights", i)
        model = self.module_list[i]
        conv = model[0]
        print(conv.weight.numpy()[0][0][0])
        if conv.bias is not None:
          print("biases")
          print(conv.bias.shape)
          print(conv.bias.numpy()[0][0:5])
        else:
          print("None biases for layer", i)

  def load_weights(self, url):
    """Load darknet-format weights from *url* into the conv/batchnorm layers."""
    # Skip the 5-int32 darknet header; the rest is a flat float32 array
    weights = np.frombuffer(fetch(url).read_bytes(), dtype=np.float32)[5:]
    # ptr walks through the flat weight array in darknet's serialization order
    ptr = 0
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        model = self.module_list[i]
        try:  # we have batchnorm, load conv weights without biases, and batchnorm values
          batch_normalize = int(self.blocks[i + 1]["batch_normalize"])
        except:  # no batchnorm, load conv weights + biases
          batch_normalize = 0
        conv = model[0]
        if batch_normalize:
          bn = model[1]
          # Get the number of weights of batchnorm
          num_bn_biases = math.prod(bn.bias.shape)
          # Load weights (darknet order: biases, weights, running mean, running var)
          bn_biases = Tensor(weights[ptr:ptr + num_bn_biases])
          ptr += num_bn_biases
          bn_weights = Tensor(weights[ptr:ptr + num_bn_biases])
          ptr += num_bn_biases
          bn_running_mean = Tensor(weights[ptr:ptr + num_bn_biases])
          ptr += num_bn_biases
          bn_running_var = Tensor(weights[ptr:ptr + num_bn_biases])
          ptr += num_bn_biases
          # Cast the loaded weights into dims of model weights
          bn_biases = bn_biases.reshape(shape=tuple(bn.bias.shape))
          bn_weights = bn_weights.reshape(shape=tuple(bn.weight.shape))
          bn_running_mean = bn_running_mean.reshape(shape=tuple(bn.running_mean.shape))
          bn_running_var = bn_running_var.reshape(shape=tuple(bn.running_var.shape))
          # Copy data
          bn.bias = bn_biases
          bn.weight = bn_weights
          bn.running_mean = bn_running_mean
          bn.running_var = bn_running_var
        else:
          # load biases of the conv layer
          num_biases = math.prod(conv.bias.shape)
          # Load weights
          conv_biases = Tensor(weights[ptr:ptr + num_biases])
          ptr += num_biases
          # Reshape
          conv_biases = conv_biases.reshape(shape=tuple(conv.bias.shape))
          # Copy
          conv.bias = conv_biases
        # Load weights for conv layers
        num_weights = math.prod(conv.weight.shape)
        conv_weights = Tensor(weights[ptr:ptr + num_weights])
        ptr += num_weights
        conv_weights = conv_weights.reshape(shape=tuple(conv.weight.shape))
        conv.weight = conv_weights

  def forward(self, x):
    """Run the network on Tensor *x*; returns the stacked YOLO head detections."""
    modules = self.blocks[1:]
    outputs = {}  # Cached outputs for route layer
    detections, write = None, False
    for i, module in enumerate(modules):
      module_type = (module["type"])
      if module_type == "convolutional" or module_type == "upsample":
        for layer in self.module_list[i]:
          x = layer(x)
      elif module_type == "route":
        layers = module["layers"]
        layers = [int(a) for a in layers]
        # Make absolute layer references relative to the current index
        if (layers[0]) > 0:
          layers[0] = layers[0] - i
        if len(layers) == 1:
          x = outputs[i + (layers[0])]
        else:
          if (layers[1]) > 0: layers[1] = layers[1] - i
          map1 = outputs[i + layers[0]]
          map2 = outputs[i + layers[1]]
          # Concatenate the two feature maps along the channel axis
          x = Tensor(np.concatenate((map1.numpy(), map2.numpy()), axis=1))
      elif module_type == "shortcut":
        from_ = int(module["from"])
        # Residual connection: add an earlier feature map to the previous one
        x = outputs[i - 1] + outputs[i + from_]
      elif module_type == "yolo":
        anchors = self.module_list[i][0]
        inp_dim = int(self.net_info["height"])  # 416
        num_classes = int(module["classes"])
        x = predict_transform(x, inp_dim, anchors, num_classes)
        # Stack detections from all YOLO heads along the box axis
        if not write:
          detections, write = x, True
        else:
          detections = Tensor(np.concatenate((detections.numpy(), x.numpy()), axis=1))
      outputs[i] = x
    return detections
if __name__ == "__main__":
  # Build the network from the reference darknet config and load pretrained weights
  model = Darknet(fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg').read_bytes())
  print("Loading weights file (237MB). This might take a while…")
  model.load_weights('https://pjreddie.com/media/files/yolov3.weights')
  # Input source: CLI argument (URL, file path, or 'webcam'), else a sample image
  if len(sys.argv) > 1:
    url = sys.argv[1]
  else:
    url = "https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png"
  if url == 'webcam':
    # Live loop: run detection on each captured frame until 'q' is pressed.
    # NOTE(review): after this loop breaks, control falls through to the
    # inference code below with `img` unbound — likely a latent crash; confirm.
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    while 1:
      _ = cap.grab()  # discard one frame to circumvent capture buffering
      ret, frame = cap.read()
      prediction = process_results(infer(model, frame))
      # BGR -> RGB for PIL, then draw boxes at the 608x608 inference size
      img = Image.fromarray(frame[:, :, [2, 1, 0]])
      boxes = add_boxes(np.array(img.resize((608, 608))), prediction)
      boxes = cv2.cvtColor(boxes, cv2.COLOR_RGB2BGR)
      cv2.imshow('yolo', boxes)
      if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    cap.release()
    cv2.destroyAllWindows()
  elif url.startswith('http'):
    # Decode the downloaded bytes straight into an OpenCV BGR image
    img_stream = io.BytesIO(fetch(url).read_bytes())
    img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)
  else:
    img = cv2.imread(url)
  st = time.time()
  print('running inference…')
  prediction = infer(model, img)
  print(f'did inference in {(time.time() - st):2f}s')
  show_labels(prediction)
  prediction = process_results(prediction)
  # Draw the surviving boxes on the resized image and save to disk
  boxes = add_boxes(np.array(Image.fromarray(img).resize((608, 608))), prediction)
  cv2.imwrite('boxes.jpg', boxes)