#!/usr/bin/env python
import numpy as np
import unittest
from tinygrad import Tensor, Device, dtypes
from tinygrad.engine.realize import run_schedule
from tinygrad.uop.ops import Ops, UOp, UPat

class TestTensorUOp(unittest.TestCase):
  def test_fromcpu_shape_tracker(self):
    def helper(a: np.ndarray):
      print(a.shape, a.strides, a.flags.c_contiguous)
      b = Tensor(a).uop
      #assert b.st.contiguous == a.flags.c_contiguous
      assert b.st.shape == a.shape
      np.testing.assert_equal(a, Tensor(b).numpy())
    for ndims in range(1, 4):
      a = np.random.randn(*(4,)*ndims).astype(np.float32)
      for stride in [-2, 1, 2]:
        for start in [0, 1]:
          helper(a[(slice(start, None, stride),)*ndims])
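
  # The cat-of-unary-op tests below guard against op/pad reordering: cat builds
  # its result by zero-padding each operand, so a unary op whose value at 0 is
  # nonzero (exp(0) == 1, log(0) == -inf) must stay applied before the pad,
  # never after. Each test checks the padded slot holds the op's output, not op(0).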
  def test_shuffle_pad_ops_cmpeq(self):
    y = Tensor([1]).cat(Tensor([1]) == 0).numpy()
    z = Tensor([1, 0]).numpy()
    np.testing.assert_allclose(y, z)

  def test_shuffle_pad_ops_div(self):
    y = Tensor([1]).cat(Tensor([1]).div(Tensor([2.0]))).numpy()
    z = Tensor([1, 0.5]).numpy()
    np.testing.assert_allclose(y, z)

  def test_shuffle_pad_ops_log(self):
    y = Tensor([1]).cat(Tensor([1]).log()).numpy()
    z = Tensor([1, 0]).numpy()
    np.testing.assert_allclose(y, z)

  def test_shuffle_pad_ops_exp(self):
    y = Tensor([1]).cat(Tensor([1]).exp()).numpy()
    z = Tensor([1, np.e]).numpy()
    np.testing.assert_allclose(y, z)
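
  # A minimal sketch of the hazard, assuming the pad-then-add decomposition of
  # cat (the pad bounds here are illustrative):
  #   a, b = Tensor([1]), Tensor([1]).exp()
  #   y = a.pad(((0, 1),)) + b.pad(((1, 0),))   # [1, 0] + [0, e] -> [1, e]
  # If exp were shuffled inside b's pad, the padded slot would become
  # exp(0) == 1 and the result would be [2, e] instead of [1, e].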

  def test_device_0_is_the_same_device(self):
    # "DEVICE" and "DEVICE:0" should canonicalize to the same device
    a = Tensor([1, 2, 3], f"{Device.DEFAULT}")
    b = Tensor([1, 2, 3], f"{Device.DEFAULT}:0")
    assert a.device == b.device

  def test_shrink_const_into_zero(self):
    # regression test to make sure the shapetracker is preserved
    a = Tensor.zeros(4, 4, 4).shrink((None, (0, 0), None))
    b = Tensor.zeros(4, 1, 4)
    c = a.cat(b, dim=1)
    np.testing.assert_allclose(c.numpy(), np.concatenate((a.numpy(), b.numpy()), axis=1))

  def test_shrink_const_then_cast(self):
    # regression test to make sure the shapetracker is preserved
    a = Tensor.zeros(4, 4, 4).shrink((None, (0, 0), None)).cast(dtypes.int32)
    b = Tensor.zeros(4, 1, 4)
    c = a.cat(b, dim=1)
    np.testing.assert_allclose(c.numpy(), np.concatenate((a.numpy(), b.numpy()), axis=1))
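
  # shrink((None, (0, 0), None)) yields a zero-width dim 1, i.e. shape (4, 0, 4).
  # Since the base is a const 0, the rewriter may collapse it to a bare CONST;
  # the two tests above check that the zero-width ShapeTracker survives that
  # fold so the cat along dim=1 still sees compatible shapes.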

  def test_const_dtype(self):
    # const_like inherits the tensor's dtype, so the const arg is a python int or float to match
    lb: UOp = Tensor([1], dtype=dtypes.int).uop
    assert lb.const_like(1).base.arg == 1
    assert type(lb.const_like(1).base.arg) is int
    lb: UOp = Tensor([1], dtype=dtypes.float).uop
    assert lb.const_like(1).base.arg == 1.0
    assert type(lb.const_like(1).base.arg) is float

  def test_contiguous_alu(self):
    a = Tensor.randn(2, 2).realize()
    b = Tensor.randn(2, 2).realize()
    add = (a + b).contiguous()
    out = add + 2
    sched = out.schedule()
    self.assertEqual(len(sched), 2)
    run_schedule(sched)
    np.testing.assert_allclose(out.numpy(), a.numpy() + b.numpy() + 2)

  # NOTE: contiguous on a buffer collapses
  def test_contiguous_empty(self):
    empty = Tensor.empty(1).contiguous()
    sched = empty.schedule()
    self.assertEqual(len(sched), 0)
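
  # Nothing is scheduled above: Tensor.empty is already a raw buffer, so per the
  # NOTE the contiguous folds away and there is no kernel left to run.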

  def test_contiguous_folded_alu(self):
    a = Tensor.empty(8, 8)
    # NOTE: the buffer for mul_0 late folds to just a CONST
    mul_0 = a * 0
    out = mul_0.shrink(((4, 8), (0, 8))).contiguous()
    out.realize()
    self.assertEqual(out.tolist(), Tensor.zeros(4, 8).tolist())
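
# reduce_kernel matches an AST whose SINK stores the result of a single
# REDUCE_AXIS: the bare UPat() is a wildcard for the store target, so any
# one-reduce kernel matches regardless of which buffer it writes to.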
reduce_kernel = UPat(Ops.SINK, src=(UPat(Ops.STORE, src=(UPat(), UPat(Ops.REDUCE_AXIS)))))

class TestReduceOp(unittest.TestCase):
  def test_no_split_reduce_kernel(self):
    a = Tensor.rand(4, 4).realize()
    a = a.sum()
    sched = a.schedule()
    assert len(sched) == 1
    assert reduce_kernel.match(sched[0].ast, {})
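
  # For larger inputs the scheduler splits the full sum into two kernels (a
  # partial reduce plus a final reduce over the partials); both halves should
  # still be plain reduce kernels matching the pattern above.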
  def test_split_reduce_kernel_dim0(self):
    a = Tensor.rand(256, 255).realize()
    a = a.sum()
    sched = a.schedule()
    assert len(sched) == 2
    for s in sched:
      assert reduce_kernel.match(s.ast, {})

  def test_split_reduce_kernel_dim1(self):
    a = Tensor.rand(255, 256).realize()
    a = a.sum()
    sched = a.schedule()
    assert len(sched) == 2
    for s in sched:
      assert reduce_kernel.match(s.ast, {})

if __name__ == "__main__":
  unittest.main()