openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

80 lines
2.0 KiB

# -*- coding: utf-8 -
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.
import os
from gunicorn import six
# Classes that can undo reading data from
# a given type of data source.
class Unreader(object):
def __init__(self):
self.buf = six.BytesIO()
def chunk(self):
raise NotImplementedError()
def read(self, size=None):
if size is not None and not isinstance(size, six.integer_types):
raise TypeError("size parameter must be an int or long.")
if size is not None:
if size == 0:
return b""
if size < 0:
size = None
self.buf.seek(0, os.SEEK_END)
if size is None and self.buf.tell():
ret = self.buf.getvalue()
self.buf = six.BytesIO()
return ret
if size is None:
d = self.chunk()
return d
while self.buf.tell() < size:
chunk = self.chunk()
if not chunk:
ret = self.buf.getvalue()
self.buf = six.BytesIO()
return ret
self.buf.write(chunk)
data = self.buf.getvalue()
self.buf = six.BytesIO()
self.buf.write(data[size:])
return data[:size]
def unread(self, data):
self.buf.seek(0, os.SEEK_END)
self.buf.write(data)
class SocketUnreader(Unreader):
def __init__(self, sock, max_chunk=8192):
super(SocketUnreader, self).__init__()
self.sock = sock
self.mxchunk = max_chunk
def chunk(self):
return self.sock.recv(self.mxchunk)
class IterUnreader(Unreader):
def __init__(self, iterable):
super(IterUnreader, self).__init__()
self.iter = iter(iterable)
def chunk(self):
if not self.iter:
return b""
try:
return six.next(self.iter)
except StopIteration:
self.iter = None
return b""