openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

81 lines
3.6 KiB

import capnp
import hypothesis.strategies as st
from typing import Any
from collections.abc import Callable
from functools import cache
from cereal import log
DrawType = Callable[[st.SearchStrategy], Any]
class FuzzyGenerator:
def __init__(self, draw: DrawType, real_floats: bool):
self.draw = draw
self.native_type_map = FuzzyGenerator._get_native_type_map(real_floats)
def generate_native_type(self, field: str) -> st.SearchStrategy[bool | int | float | str | bytes]:
value_func = self.native_type_map.get(field)
if value_func:
return value_func
else:
raise NotImplementedError(f'Invalid type: {field}')
def generate_field(self, field: capnp.lib.capnp._StructSchemaField) -> st.SearchStrategy:
def rec(field_type: capnp.lib.capnp._DynamicStructReader) -> st.SearchStrategy:
type_which = field_type.which()
if type_which == 'struct':
return self.generate_struct(field.schema.elementType if base_type == 'list' else field.schema)
elif type_which == 'list':
return st.lists(rec(field_type.list.elementType))
elif type_which == 'enum':
schema = field.schema.elementType if base_type == 'list' else field.schema
return st.sampled_from(list(schema.enumerants.keys()))
else:
return self.generate_native_type(type_which)
try:
if hasattr(field.proto, 'slot'):
slot_type = field.proto.slot.type
base_type = slot_type.which()
return rec(slot_type)
else:
return self.generate_struct(field.schema)
except capnp.lib.capnp.KjException:
return self.generate_struct(field.schema)
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: str = None) -> st.SearchStrategy[dict[str, Any]]:
single_fill: tuple[str, ...] = (event,) if event else (self.draw(st.sampled_from(schema.union_fields)),) if schema.union_fields else ()
fields_to_generate = schema.non_union_fields + single_fill
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in fields_to_generate if not field.endswith('DEPRECATED')})
@staticmethod
@cache
def _get_native_type_map(real_floats: bool) -> dict[str, st.SearchStrategy]:
return {
'bool': st.booleans(),
'int8': st.integers(min_value=-2**7, max_value=2**7-1),
'int16': st.integers(min_value=-2**15, max_value=2**15-1),
'int32': st.integers(min_value=-2**31, max_value=2**31-1),
'int64': st.integers(min_value=-2**63, max_value=2**63-1),
'uint8': st.integers(min_value=0, max_value=2**8-1),
'uint16': st.integers(min_value=0, max_value=2**16-1),
'uint32': st.integers(min_value=0, max_value=2**32-1),
'uint64': st.integers(min_value=0, max_value=2**64-1),
'float32': st.floats(width=32, allow_nan=not real_floats, allow_infinity=not real_floats),
'float64': st.floats(width=64, allow_nan=not real_floats, allow_infinity=not real_floats),
'text': st.text(max_size=1000),
'data': st.binary(max_size=1000),
'anyPointer': st.text(), # Note: No need to define a separate function for anyPointer
}
@classmethod
def get_random_msg(cls, draw: DrawType, struct: capnp.lib.capnp._StructModule, real_floats: bool = False) -> dict[str, Any]:
fg = cls(draw, real_floats=real_floats)
data: dict[str, Any] = draw(fg.generate_struct(struct.schema))
return data
@classmethod
def get_random_event_msg(cls, draw: DrawType, events: list[str], real_floats: bool = False) -> list[dict[str, Any]]:
fg = cls(draw, real_floats=real_floats)
return [draw(fg.generate_struct(log.Event.schema, e)) for e in sorted(events)]