openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

108 lines
4.0 KiB

import random
from tinygrad.helpers import round_up
from tinygrad.runtime.support.am.amdev import AMPageTableTraverseContext
from test.external.external_test_am import helper_read_entry_components, FakeAM
class AMPTFuzzer:
def __init__(self, total_size):
self.total_size = total_size
self.alloc_payload = 0
self.d = FakeAM()
self.allocations: dict[int, tuple[int, int]] = {} # ptr -> (size, pattern)
self.min_alloc_size = 0x1000
self.max_alloc_size = int(total_size * 0.1)
self.alloc_probability = 0.7
def generate_pattern(self, ptr: int, size: int) -> int: return random.randint(0, 0xff)
def fill_memory(self, va, size: int, pattern: int):
ctx = AMPageTableTraverseContext(self.d, self.d.mm.root_page_table, va.va_addr)
pages = list(ctx.next(size))
for _offset, _pt, _pte_idx, _n_ptes, _pte_covers in pages:
_vaddr = va.va_addr + _offset
for i in range(_n_ptes):
pte = helper_read_entry_components(_pt.entries[_pte_idx + i])
self.d.vram[pte['paddr']] = pattern # Mark this page
assert pte['valid'] == 1
# If page has contigous fragment, all range should be this valid memory
frags_cnt = pte['fragment']
contig_range = (1 << (frags_cnt + 12))
start_vaddr = _vaddr & ~(contig_range - 1)
start_paddr = pte['paddr'] - (_vaddr - start_vaddr)
contig_ptes = contig_range // _pte_covers
assert contig_ptes > 0
ctx = AMPageTableTraverseContext(self.d, self.d.mm.root_page_table, start_vaddr)
frags_l = list(ctx.next(contig_range))
for f_offset, f_pt, f_pte_idx, f_n_ptes, f_pte_covers in frags_l:
for j in range(f_n_ptes):
f_pte = helper_read_entry_components(f_pt.entries[f_pte_idx + j])
assert f_pte['valid'] == 1
assert f_pte['paddr'] == start_paddr+f_offset+j*f_pte_covers, f"paddr {f_pte['paddr']:#x} not {start_paddr+f_offset+j*f_pte_covers:#x}"
_vaddr += _pte_covers
_offset += _pte_covers
return pages
def verify_memory(self, pages, pattern: int) -> bool:
for _offset, _pt, _pte_idx, _n_ptes, _pte_covers in pages:
for i in range(_n_ptes):
pte = helper_read_entry_components(_pt.entries[_pte_idx + i])
if self.d.vram[pte['paddr']] != pattern: return False
if pte['valid'] == 0: return False
return True
def random_alloc(self):
if self.total_size - self.alloc_payload < self.min_alloc_size: return None
size = random.randint(self.min_alloc_size, min(self.max_alloc_size, self.total_size - self.alloc_payload))
size = round_up(size, (2 << 20) if size > (4 << 20) else (4 << 10))
try: ptr = self.d.mm.valloc(size)
except MemoryError:
print(f"Failed to allocate {size} bytes. Payload size is {self.alloc_payload}, so fragmenation is {(size / self.total_size)*100.0:.2f}%")
return None
pattern = self.generate_pattern(ptr, size)
pages = self.fill_memory(ptr, size, pattern)
self.allocations[ptr.va_addr] = (size, pattern, pages, ptr)
self.alloc_payload += size
print(f"Allocated {size} bytes at {ptr.va_addr:x}, pattern: {pattern:02x}")
return ptr
def random_free(self) -> bool:
if not self.allocations: return False
ptr = random.choice(list(self.allocations.keys()))
size, pattern, pages, vm = self.allocations[ptr]
# Verify pattern before freeing
if not self.verify_memory(pages, pattern):
raise RuntimeError(f"Memory corruption detected at {vm.va_addr:x}!")
print(f"Freeing {size} bytes at {vm.va_addr:x}, pattern verified: {pattern:02x}")
self.alloc_payload -= size
self.d.mm.vfree(vm)
del self.allocations[ptr]
return True
def run(self):
for i in range(10000000):
if (random.random() < self.alloc_probability or not self.allocations): self.random_alloc()
else: self.random_free()
print("\nCleaning up remaining allocations...")
while self.allocations: self.random_free()
print("Fuzzing completed successfully!")
if __name__ == "__main__":
fuzzer = AMPTFuzzer(1 << 30)
fuzzer.run()