openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

326 lines
16 KiB

import unittest, ctypes, struct, time, array
from tinygrad import Device, Tensor, dtypes
from tinygrad.helpers import to_mv, CI
from tinygrad.device import Buffer, BufferSpec
from tinygrad.engine.realize import get_runner
def _time_queue(q, d):
st = time.perf_counter()
q.signal(d.timeline_signal, d.timeline_value)
q.submit(d)
d._wait_signal(d.timeline_signal, d.timeline_value)
d.timeline_value += 1
return time.perf_counter() - st
@unittest.skipUnless(Device.DEFAULT in ["NV", "AMD"], "Runs only on NV or AMD")
class TestHCQ(unittest.TestCase):
@classmethod
def setUpClass(self):
TestHCQ.d0 = Device[Device.DEFAULT]
#TestHCQ.d1: AMDDevice = Device["AMD:1"]
TestHCQ.a = Tensor([0.,1.], device=Device.DEFAULT).realize()
TestHCQ.b = self.a + 1
si = self.b.schedule()[-1]
TestHCQ.runner = get_runner(TestHCQ.d0.device, si.ast)
TestHCQ.b.lazydata.buffer.allocate()
# wow that's a lot of abstraction layers
TestHCQ.addr = struct.pack("QQ", TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr)
TestHCQ.addr2 = struct.pack("QQ", TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr)
TestHCQ.kernargs_off = TestHCQ.runner._prg.kernargs_offset
TestHCQ.kernargs_size = TestHCQ.runner._prg.kernargs_alloc_size
ctypes.memmove(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_off, TestHCQ.addr, len(TestHCQ.addr))
ctypes.memmove(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size+TestHCQ.kernargs_off, TestHCQ.addr2, len(TestHCQ.addr2))
if Device.DEFAULT == "AMD":
from tinygrad.runtime.ops_amd import HWQueue, HWPM4Queue
TestHCQ.compute_queue = HWPM4Queue
TestHCQ.copy_queue = HWQueue
elif Device.DEFAULT == "NV":
from tinygrad.runtime.ops_nv import HWQueue, HWQueue
# nv need to copy constbuffer there as well
to_mv(TestHCQ.d0.kernargs_ptr, 0x160).cast('I')[:] = array.array('I', TestHCQ.runner._prg.constbuffer_0)
to_mv(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, 0x160).cast('I')[:] = array.array('I', TestHCQ.runner._prg.constbuffer_0)
TestHCQ.compute_queue = HWQueue
TestHCQ.copy_queue = HWQueue
def setUp(self):
TestHCQ.d0.synchronize()
TestHCQ.a.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
TestHCQ.b.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 0))))
TestHCQ.d0.synchronize() # wait for copyins to complete
def test_run_1000_times_one_submit(self):
temp_signal, temp_value = TestHCQ.d0._alloc_signal(value=0), 0
q = TestHCQ.compute_queue()
for _ in range(1000):
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 2000.0, f"got val {val}"
def test_run_1000_times(self):
temp_signal = TestHCQ.d0._alloc_signal(value=0)
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.p.global_size,
TestHCQ.runner.p.local_size)
for _ in range(1000):
TestHCQ.d0._set_signal(temp_signal, 1)
q.submit(TestHCQ.d0)
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 2000.0, f"got val {val}"
def test_run_to_3(self):
temp_signal = TestHCQ.d0._alloc_signal(value=0)
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, 1).wait(temp_signal, 1)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 3.0, f"got val {val}"
def test_update_exec(self):
q = TestHCQ.compute_queue()
exec_cmd_idx = len(q)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.update_exec(exec_cmd_idx, (1,1,1), (1,1,1))
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]
assert val == 0.0, f"got val {val}, should not be updated"
@unittest.skipUnless(Device.DEFAULT == "NV", "Only NV supports bind")
def test_bind_run(self):
temp_signal = TestHCQ.d0._alloc_signal(value=0)
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.p.global_size,
TestHCQ.runner.p.local_size)
q.bind(TestHCQ.d0)
for _ in range(1000):
TestHCQ.d0._set_signal(temp_signal, 1)
q.submit(TestHCQ.d0)
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 2000.0, f"got val {val}"
@unittest.skipUnless(Device.DEFAULT == "NV", "Only NV supports bind")
def test_update_exec_binded(self):
q = TestHCQ.compute_queue()
exec_ptr = q.ptr()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.bind(TestHCQ.d0)
q.update_exec(exec_ptr, (1,1,1), (1,1,1))
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]
assert val == 0.0, f"got val {val}, should not be updated"
@unittest.skipIf(CI, "Can't handle async update on CPU")
def test_wait_signal(self):
temp_signal = TestHCQ.d0._alloc_signal(value=0)
TestHCQ.compute_queue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
TestHCQ.d0._set_signal(temp_signal, 1)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
TestHCQ.d0.timeline_value += 1
@unittest.skipIf(CI, "Can't handle async update on CPU")
def test_wait_copy_signal(self):
temp_signal = TestHCQ.d0._alloc_signal(value=0)
TestHCQ.copy_queue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
TestHCQ.d0._set_signal(temp_signal, 1)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
TestHCQ.d0.timeline_value += 1
def test_run_normal(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
def test_submit_empty_queues(self):
TestHCQ.compute_queue().submit(TestHCQ.d0)
TestHCQ.copy_queue().submit(TestHCQ.d0)
def test_signal_timeout(self):
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value + 122, timeout=50)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1, timeout=50)
def test_signal(self):
new_timeline_value = TestHCQ.d0.timeline_value + 0xff
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, new_timeline_value)
TestHCQ.d0.timeline_value = new_timeline_value + 1 # update to not break runtime
def test_copy_signal(self):
new_timeline_value = TestHCQ.d0.timeline_value + 0xff
TestHCQ.copy_queue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, new_timeline_value)
TestHCQ.d0.timeline_value = new_timeline_value + 1 # update to not break runtime
def test_run_signal(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
def test_copy_1000_times(self):
q = TestHCQ.copy_queue()
q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
for _ in range(1000):
q.submit(TestHCQ.d0)
TestHCQ.copy_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
# confirm the signal didn't exceed the put value
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value + 1, timeout=50)
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]
assert val == 0.0, f"got val {val}"
def test_copy(self):
q = TestHCQ.copy_queue()
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]
assert val == 1.0, f"got val {val}"
@unittest.skipUnless(Device.DEFAULT == "NV", "Only NV supports bind")
def test_bind_copy(self):
q = TestHCQ.copy_queue()
q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
q.bind(TestHCQ.d0)
for _ in range(1000):
q.submit(TestHCQ.d0)
TestHCQ.copy_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
# confirm the signal didn't exceed the put value
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value + 1, timeout=50)
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]
assert val == 0.0, f"got val {val}"
def test_copy_bandwidth(self):
# THEORY: the bandwidth is low here because it's only using one SDMA queue. I suspect it's more stable like this at least.
SZ = 2_000_000_000
a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferSpec(nolru=True)).allocate()
b = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferSpec(nolru=True)).allocate()
q = TestHCQ.copy_queue()
q.copy(a._buf.va_addr, b._buf.va_addr, SZ)
et = _time_queue(q, TestHCQ.d0)
gb_s = (SZ/1e9)/et
print(f"same device copy: {et*1e3:.2f} ms, {gb_s:.2f} GB/s")
assert (0.3 if CI else 10) <= gb_s <= 1000
def test_cross_device_copy_bandwidth(self):
SZ = 2_000_000_000
b = Buffer(f"{Device.DEFAULT}:1", SZ, dtypes.uint8, options=BufferSpec(nolru=True)).allocate()
a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferSpec(nolru=True)).allocate()
TestHCQ.d0._gpu_map(b._buf)
q = TestHCQ.copy_queue()
q.copy(a._buf.va_addr, b._buf.va_addr, SZ)
et = _time_queue(q, TestHCQ.d0)
gb_s = (SZ/1e9)/et
print(f"cross device copy: {et*1e3:.2f} ms, {gb_s:.2f} GB/s")
assert (0.3 if CI else 2) <= gb_s <= 50
def test_interleave_compute_and_copy(self):
q = TestHCQ.compute_queue()
qc = TestHCQ.copy_queue()
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size) # b = [1, 2]
q.signal(sig:=TestHCQ.d0._alloc_signal(value=0), value=1)
qc.wait(sig, value=1)
qc.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
qc.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
qc.submit(TestHCQ.d0)
time.sleep(0.02) # give it time for the wait to fail
q.submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
def test_cross_device_signal(self):
d1 = Device[f"{Device.DEFAULT}:1"]
q1 = TestHCQ.compute_queue()
q2 = TestHCQ.compute_queue()
q1.signal(sig:=TestHCQ.d0._alloc_signal(value=0), value=0xfff)
q2.wait(sig, value=0xfff)
q2.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q2.submit(TestHCQ.d0)
q1.signal(d1.timeline_signal, d1.timeline_value)
q1.submit(d1)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
d1._wait_signal(d1.timeline_signal, d1.timeline_value)
d1.timeline_value += 1
def test_timeline_signal_rollover(self):
# NV 64bit, AMD 32bit
TestHCQ.d0.timeline_value = (1 << 64) - 20 if Device.DEFAULT == "NV" else (1 << 32) - 20 # close value to reset
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)
for _ in range(40):
q = TestHCQ.compute_queue()
q.wait(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)
q.exec(TestHCQ.runner._prg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.p.global_size, TestHCQ.runner.p.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
val = TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]
assert val == 1.0, f"got val {val}"
if __name__ == "__main__":
unittest.main()