openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.

85 lines
3.5 KiB

import capnp
import hypothesis.strategies as st
from typing import Any, Callable, Dict, List, Optional, Union
from cereal import log
DrawType = Callable[[st.SearchStrategy], Any]
class FuzzyGenerator:
def __init__(self, draw: DrawType, real_floats: bool):
self.draw = draw
self.real_floats = real_floats
def generate_native_type(self, field: str) -> st.SearchStrategy[Union[bool, int, float, str, bytes]]:
def floats(**kwargs) -> st.SearchStrategy[float]:
allow_nan = not self.real_floats
allow_infinity = not self.real_floats
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity)
if field == 'bool':
return st.booleans()
elif field == 'int8':
return st.integers(min_value=-2**7, max_value=2**7-1)
elif field == 'int16':
return st.integers(min_value=-2**15, max_value=2**15-1)
elif field == 'int32':
return st.integers(min_value=-2**31, max_value=2**31-1)
elif field == 'int64':
return st.integers(min_value=-2**63, max_value=2**63-1)
elif field == 'uint8':
return st.integers(min_value=0, max_value=2**8-1)
elif field == 'uint16':
return st.integers(min_value=0, max_value=2**16-1)
elif field == 'uint32':
return st.integers(min_value=0, max_value=2**32-1)
elif field == 'uint64':
return st.integers(min_value=0, max_value=2**64-1)
elif field == 'float32':
return floats(width=32)
elif field == 'float64':
return floats(width=64)
elif field == 'text':
return st.text(max_size=1000)
elif field == 'data':
return st.binary(max_size=1000)
elif field == 'anyPointer':
return st.text()
else:
raise NotImplementedError(f'Invalid type : {field}')
def generate_field(self, field: capnp.lib.capnp._StructSchemaField) -> st.SearchStrategy:
def rec(field_type: capnp.lib.capnp._DynamicStructReader) -> st.SearchStrategy:
if field_type.which() == 'struct':
return self.generate_struct(field.schema.elementType if base_type == 'list' else field.schema)
elif field_type.which() == 'list':
return st.lists(rec(field_type.list.elementType))
elif field_type.which() == 'enum':
schema = field.schema.elementType if base_type == 'list' else field.schema
return st.sampled_from(list(schema.enumerants.keys()))
else:
return self.generate_native_type(field_type.which())
if 'slot' in field.proto.to_dict():
base_type = field.proto.slot.type.which()
return rec(field.proto.slot.type)
else:
return self.generate_struct(field.schema)
def generate_struct(self, schema: capnp.lib.capnp._StructSchema, event: Optional[str] = None) -> st.SearchStrategy[Dict[str, Any]]:
full_fill: List[str] = list(schema.non_union_fields)
single_fill: List[str] = [event] if event else [self.draw(st.sampled_from(schema.union_fields))] if schema.union_fields else []
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in full_fill + single_fill})
@classmethod
def get_random_msg(cls, draw: DrawType, struct: capnp.lib.capnp._StructModule, real_floats: bool = False) -> Dict[str, Any]:
fg = cls(draw, real_floats=real_floats)
data: Dict[str, Any] = draw(fg.generate_struct(struct.schema))
return data
@classmethod
def get_random_event_msg(cls, draw: DrawType, events: List[str], real_floats: bool = False) -> List[Dict[str, Any]]:
fg = cls(draw, real_floats=real_floats)
return [draw(fg.generate_struct(log.Event.schema, e)) for e in sorted(events)]