You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
4.0 KiB
108 lines
4.0 KiB
import random
|
|
from tinygrad.helpers import round_up
|
|
from tinygrad.runtime.support.am.amdev import AMPageTableTraverseContext
|
|
from test.external.external_test_am import helper_read_entry_components, FakeAM
|
|
|
|
class AMPTFuzzer:
|
|
def __init__(self, total_size):
|
|
self.total_size = total_size
|
|
self.alloc_payload = 0
|
|
self.d = FakeAM()
|
|
|
|
self.allocations: dict[int, tuple[int, int]] = {} # ptr -> (size, pattern)
|
|
|
|
self.min_alloc_size = 0x1000
|
|
self.max_alloc_size = int(total_size * 0.1)
|
|
self.alloc_probability = 0.7
|
|
|
|
def generate_pattern(self, ptr: int, size: int) -> int: return random.randint(0, 0xff)
|
|
|
|
def fill_memory(self, va, size: int, pattern: int):
|
|
ctx = AMPageTableTraverseContext(self.d, self.d.mm.root_page_table, va.va_addr)
|
|
pages = list(ctx.next(size))
|
|
|
|
for _offset, _pt, _pte_idx, _n_ptes, _pte_covers in pages:
|
|
_vaddr = va.va_addr + _offset
|
|
|
|
for i in range(_n_ptes):
|
|
pte = helper_read_entry_components(_pt.entries[_pte_idx + i])
|
|
self.d.vram[pte['paddr']] = pattern # Mark this page
|
|
assert pte['valid'] == 1
|
|
|
|
# If page has contigous fragment, all range should be this valid memory
|
|
frags_cnt = pte['fragment']
|
|
contig_range = (1 << (frags_cnt + 12))
|
|
start_vaddr = _vaddr & ~(contig_range - 1)
|
|
start_paddr = pte['paddr'] - (_vaddr - start_vaddr)
|
|
contig_ptes = contig_range // _pte_covers
|
|
assert contig_ptes > 0
|
|
|
|
ctx = AMPageTableTraverseContext(self.d, self.d.mm.root_page_table, start_vaddr)
|
|
frags_l = list(ctx.next(contig_range))
|
|
for f_offset, f_pt, f_pte_idx, f_n_ptes, f_pte_covers in frags_l:
|
|
for j in range(f_n_ptes):
|
|
f_pte = helper_read_entry_components(f_pt.entries[f_pte_idx + j])
|
|
assert f_pte['valid'] == 1
|
|
assert f_pte['paddr'] == start_paddr+f_offset+j*f_pte_covers, f"paddr {f_pte['paddr']:#x} not {start_paddr+f_offset+j*f_pte_covers:#x}"
|
|
|
|
_vaddr += _pte_covers
|
|
_offset += _pte_covers
|
|
|
|
return pages
|
|
|
|
def verify_memory(self, pages, pattern: int) -> bool:
|
|
for _offset, _pt, _pte_idx, _n_ptes, _pte_covers in pages:
|
|
for i in range(_n_ptes):
|
|
pte = helper_read_entry_components(_pt.entries[_pte_idx + i])
|
|
if self.d.vram[pte['paddr']] != pattern: return False
|
|
if pte['valid'] == 0: return False
|
|
|
|
return True
|
|
|
|
def random_alloc(self):
|
|
if self.total_size - self.alloc_payload < self.min_alloc_size: return None
|
|
|
|
size = random.randint(self.min_alloc_size, min(self.max_alloc_size, self.total_size - self.alloc_payload))
|
|
size = round_up(size, (2 << 20) if size > (4 << 20) else (4 << 10))
|
|
|
|
try: ptr = self.d.mm.valloc(size)
|
|
except MemoryError:
|
|
print(f"Failed to allocate {size} bytes. Payload size is {self.alloc_payload}, so fragmenation is {(size / self.total_size)*100.0:.2f}%")
|
|
return None
|
|
|
|
pattern = self.generate_pattern(ptr, size)
|
|
pages = self.fill_memory(ptr, size, pattern)
|
|
self.allocations[ptr.va_addr] = (size, pattern, pages, ptr)
|
|
self.alloc_payload += size
|
|
print(f"Allocated {size} bytes at {ptr.va_addr:x}, pattern: {pattern:02x}")
|
|
return ptr
|
|
|
|
def random_free(self) -> bool:
|
|
if not self.allocations: return False
|
|
|
|
ptr = random.choice(list(self.allocations.keys()))
|
|
size, pattern, pages, vm = self.allocations[ptr]
|
|
|
|
# Verify pattern before freeing
|
|
if not self.verify_memory(pages, pattern):
|
|
raise RuntimeError(f"Memory corruption detected at {vm.va_addr:x}!")
|
|
|
|
print(f"Freeing {size} bytes at {vm.va_addr:x}, pattern verified: {pattern:02x}")
|
|
self.alloc_payload -= size
|
|
self.d.mm.vfree(vm)
|
|
del self.allocations[ptr]
|
|
return True
|
|
|
|
def run(self):
|
|
for i in range(10000000):
|
|
if (random.random() < self.alloc_probability or not self.allocations): self.random_alloc()
|
|
else: self.random_free()
|
|
|
|
print("\nCleaning up remaining allocations...")
|
|
while self.allocations: self.random_free()
|
|
|
|
print("Fuzzing completed successfully!")
|
|
|
|
if __name__ == "__main__":
|
|
fuzzer = AMPTFuzzer(1 << 30)
|
|
fuzzer.run()
|
|
|