import numpy as np
class RunningStat ( ) :
# tracks realtime mean and standard deviation without storing any data
def __init__ ( self , priors = None , max_trackable = - 1 ) :
self . max_trackable = max_trackable
if priors is not None :
# initialize from history
self . M = priors [ 0 ]
self . S = priors [ 1 ]
self . n = priors [ 2 ]
self . M_last = self . M
self . S_last = self . S
else :
self . reset ( )
def reset ( self ) :
self . M = 0.
self . S = 0.
self . M_last = 0.
self . S_last = 0.
self . n = 0
def push_data ( self , new_data ) :
# short term memory hack
if self . max_trackable < 0 or self . n < self . max_trackable :
self . n + = 1
if self . n == 0 :
self . M_last = new_data
self . M = self . M_last
self . S_last = 0.
else :
self . M = self . M_last + ( new_data - self . M_last ) / self . n
self . S = self . S_last + ( new_data - self . M_last ) * ( new_data - self . M ) ;
self . M_last = self . M
self . S_last = self . S
def mean ( self ) :
return self . M
def variance ( self ) :
if self . n > = 2 :
return self . S / ( self . n - 1. )
else :
return 0
def std ( self ) :
return np . sqrt ( self . variance ( ) )
def params_to_save ( self ) :
return [ self . M , self . S , self . n ]
class RunningStatFilter ( ) :
def __init__ ( self , raw_priors = None , filtered_priors = None , max_trackable = - 1 ) :
self . raw_stat = RunningStat ( raw_priors , - 1 )
self . filtered_stat = RunningStat ( filtered_priors , max_trackable )
def reset ( self ) :
self . raw_stat . reset ( )
self . filtered_stat . reset ( )
def push_and_update ( self , new_data ) :
_std_last = self . raw_stat . std ( )
self . raw_stat . push_data ( new_data )
_delta_std = self . raw_stat . std ( ) - _std_last
if _delta_std < = 0 :
self . filtered_stat . push_data ( new_data )
else :
pass
# self.filtered_stat.push_data(self.filtered_stat.mean())
# class SequentialBayesian():