This post describes a script for efficiently computing and storing the mean and variance of data arriving from multiple sources. It uses Redis as the backend storage system. Though it's written in Python (mainly because I work with Django a lot), it can be translated into any popular language out there.
The basic assumptions are as follows:
1. You have data arriving from multiple sources, each possessing a unique ID (a string) of some sort; for example, the location in the case of weather data.
2. You need to store the mean and variance corresponding to non-overlapping time periods, such as days (as in the script below).
3. You don't want or need to store the individual data points, just the averages. Moreover, you don't want to access the permanent data storage (like a file on disk) every time a new data point comes in, but only when the average stats need to be stored. The primary reason for this could be efficient resource usage.
4. You don't want to store the process parameters as variables in the program, but rather in a better option like Redis. (Though you can tweak the script to do that too.)
Here's the script:
```python
# Imports
from redis import StrictRedis
import datetime

# Redis client for communicating with Redis.
# decode_responses=True makes redis-py return strings instead of bytes
# under Python 3, so the timestamp comparison below works as intended.
redisdb = StrictRedis(decode_responses=True)


def timestamp():
    """
    Returns a string that denotes a unique time period.
    The output of this function determines the period
    over which the averaging is done.
    """
    return datetime.datetime.now().strftime("%d-%m-%Y")


def _fetch_state(identifier):
    """
    Fetches all the state data from Redis corresponding to the
    identifier string. If any part of it is not present, it returns
    empty data. Returns the last timestamp, the sum, the sum of
    squares, and the counter value (in that order).
    """
    # Get the string data from Redis
    last_timestamp = redisdb.get(identifier + '_last_timestamp')
    sigma = redisdb.get(identifier + '_sigma')
    squares_sigma = redisdb.get(identifier + '_squares_sigma')
    counter = redisdb.get(identifier + '_counter')

    # Check if any of the above is None (not present).
    # If not, parse the numbers.
    if None in [last_timestamp, sigma, squares_sigma, counter]:
        # If any one is not available, the others are useless.
        # Just reset the values.
        last_timestamp = ''
        sigma = 0
        squares_sigma = 0
        counter = 0
    else:
        sigma = float(sigma)
        squares_sigma = float(squares_sigma)
        counter = int(counter)

    return last_timestamp, sigma, squares_sigma, counter


def _store_state(identifier, last_timestamp, sigma, squares_sigma, counter):
    """
    Stores the state data corresponding to the identifier, to Redis.
    """
    redisdb.set(identifier + '_last_timestamp', last_timestamp)
    redisdb.set(identifier + '_sigma', sigma)
    redisdb.set(identifier + '_squares_sigma', squares_sigma)
    redisdb.set(identifier + '_counter', counter)


def _permanent_store(identifier, mean, variance):
    """
    Stores statistical data to some kind of permanent storage
    (a file in this case).
    """
    with open(identifier + '.txt', 'a') as f:
        f.write(str(identifier) + ', ' + str(mean) + ', ' +
                str(variance) + '\n')


def record(identifier, value):
    """
    Records a numeric value corresponding to the identifier.
    """
    # Fetch state data
    last_timestamp, sigma, squares_sigma, counter = (
        _fetch_state(identifier))

    # Compute current timestamp
    current_timestamp = timestamp()

    if last_timestamp != current_timestamp:
        # Either a new time period has started, or
        # this is a new identifier, or
        # previous state data was lost for some reason.
        if counter != 0:
            # A new time period has started, so compute
            # parameters for the previous one and store them.
            counter = float(counter)
            mean = sigma / counter
            variance = squares_sigma / counter - mean ** 2
            _permanent_store(identifier, mean, variance)

        # Initialize state based on the newly received data
        last_timestamp = current_timestamp
        sigma = value
        squares_sigma = value ** 2
        counter = 1
    else:
        # Same time period as before, update state
        sigma += value
        squares_sigma += value ** 2
        counter += 1

    # Store state
    _store_state(identifier, last_timestamp, sigma, squares_sigma, counter)
```
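To make the arithmetic concrete, here is a small standalone sketch of the running-sums trick that record() relies on: only the sum and the sum of squares are kept per period, and the mean and (population) variance are recovered from them at the end. The sample values are made up for illustration.

```python
# Made-up data points for one time period
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# The only state kept between data points
sigma = 0.0          # running sum
squares_sigma = 0.0  # running sum of squares
counter = 0

for v in values:
    sigma += v
    squares_sigma += v ** 2
    counter += 1

# Recover the statistics from the sums:
# mean = E[x], variance = E[x^2] - E[x]^2
mean = sigma / counter
variance = squares_sigma / counter - mean ** 2
print(mean, variance)  # 5.0 4.0
```

Note that this yields the population variance; multiply by counter / (counter - 1) if you need the sample variance.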
Some things to note:
1. The script is resistant to restarts (keeping in mind that any data that comes in during the down-time is lost). It’s also resistant to faults on Redis’ behalf, though any state data stored would then be lost. In both cases, the averaging information may be inaccurate, but not unavailable.
2. You can group sources together if you want (like averaging weather data from all sources in a region). You would just have to implement functionality to map each source ID to the relevant group ID.
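A minimal sketch of such a mapping, with made-up source and group IDs; record() would simply be called with the group ID instead of the source ID:

```python
# Hypothetical mapping from individual source IDs to group IDs,
# so all sources in a region share one set of statistics.
SOURCE_TO_GROUP = {
    "station_london_01": "region_uk",
    "station_oxford_03": "region_uk",
    "station_paris_02": "region_fr",
}

def group_identifier(source_id):
    # Fall back to the source's own ID if it isn't part of any group
    return SOURCE_TO_GROUP.get(source_id, source_id)

# Usage: record(group_identifier("station_london_01"), 21.5)
# would accumulate the value under "region_uk".
```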
3. The script computes averages over days, but with a little creativity with the timestamp function, you can compute statistics over any custom time period. In fact, you can modify the source IDs to tell the system what time interval to average over.
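For instance, two hypothetical drop-in variants of the timestamp function that switch the averaging period to hours or to ISO calendar weeks:

```python
import datetime

def hourly_timestamp():
    # Same idea as timestamp(), but the string changes every hour,
    # so averaging is done per hour instead of per day.
    return datetime.datetime.now().strftime("%d-%m-%Y-%H")

def weekly_timestamp():
    # ISO year and week number: the string changes once per calendar week.
    iso = datetime.date.today().isocalendar()
    return "%d-W%02d" % (iso[0], iso[1])
```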
4. Though the input in the above code is single numbers, you can work with multidimensional data (using NumPy arrays/Pandas) and even other types of information extraction (ones that can work with streams without requiring historical data).
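As a rough sketch of the multidimensional case, the state updates generalize elementwise (plain Python lists are used here for self-containment; with NumPy arrays the loops collapse to `sigma += value` and `squares_sigma += value ** 2` directly):

```python
# Elementwise version of the state updates in record(), for vector-valued data.

def update_state(sigma, squares_sigma, counter, value):
    sigma = [s + v for s, v in zip(sigma, value)]
    squares_sigma = [q + v ** 2 for q, v in zip(squares_sigma, value)]
    return sigma, squares_sigma, counter + 1

def finalize(sigma, squares_sigma, counter):
    # Per-dimension mean and population variance, same formulas as before
    means = [s / counter for s in sigma]
    variances = [q / counter - m ** 2 for q, m in zip(squares_sigma, means)]
    return means, variances

# Two made-up 2-dimensional data points
sigma, squares_sigma, counter = [0.0, 0.0], [0.0, 0.0], 0
for point in [[1.0, 10.0], [3.0, 20.0]]:
    sigma, squares_sigma, counter = update_state(
        sigma, squares_sigma, counter, point)

means, variances = finalize(sigma, squares_sigma, counter)
print(means, variances)  # [2.0, 15.0] [1.0, 25.0]
```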
5. The script stores the data in a text file; you can obviously make it store the data to any other destination.
6. The code is pretty basic. To actually use it as part of a bigger framework, I would suggest taking measures such as bundling the code into a class, implementing synchronization locks over the state data, etc. All that is up to you and your application, of course.
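As one possible (hypothetical) sketch of such locking within a single process: a per-identifier lock serializes the read-modify-write cycle on the state, so two threads recording for the same identifier can't interleave. Here `record_fn` stands in for the record function above; multi-process setups would need a distributed lock instead.

```python
import threading
from collections import defaultdict

# One lock per identifier; the guard lock protects the lock table itself.
_locks = defaultdict(threading.Lock)
_locks_guard = threading.Lock()

def record_safely(identifier, value, record_fn):
    with _locks_guard:
        lock = _locks[identifier]
    with lock:
        # Only one thread at a time runs the fetch/update/store cycle
        # for this identifier.
        record_fn(identifier, value)
```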
Thanks for reading!