Efficient computation and storage of basic data statistics using Redis

This post describes a script for efficiently computing and storing the mean and variance of data arriving from multiple sources. It uses Redis as the backend store. Though it’s written in Python (mainly because I work with Django a lot), it can be translated into any popular language.

The basic assumptions are as follows:

1. You have data arriving from multiple sources, each with a unique ID (a string) of some sort. For example, the location in the case of weather data.

2. You need to store the mean and variance for non-overlapping time periods, such as days (as in the script).

3. You don’t want or need to store the individual data points, just the aggregate statistics. Moreover, you don’t want to access permanent storage (like a file on disk) every time a new data point comes in, only when the statistics for a finished period need to be stored. The primary reason for this is efficient resource usage.

4. You don’t want to keep the running state in program variables, but in a more robust place like Redis. (Though you can tweak the script to do that too.)

Here’s the script:


#Imports
from redis import StrictRedis
import datetime

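#Redis Client for communicating with Redis.
#decode_responses=True makes get() return str rather than bytes on
#Python 3, so the timestamp comparison in record() works as intended.
redisdb = StrictRedis(decode_responses=True)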


def timestamp():
    """
    Returns a String that denotes a unique time-period.
    The output of this function determines the period over which
    the averaging is done.
    """
    return datetime.datetime.now().strftime("%d-%m-%Y")


def _fetch_state(identifier):
    """
    Fetches all the state data from Redis corresponding to the
    identifier string.
    If any part of it is not present, it returns empty data.
    Returns last date timestamp, the sum, the sum of squares,
    and the counter value (in that order).
    """

    #Get the string data from Redis
    last_timestamp = redisdb.get(identifier + '_last_timestamp')
    sigma = redisdb.get(identifier + '_sigma')
    squares_sigma = redisdb.get(identifier + '_squares_sigma')
    counter = redisdb.get(identifier + '_counter')

    #Check if any of the above is None(not present)
    #If not, parse the numbers
    if None in [last_timestamp, sigma, squares_sigma,
           counter]:
        #If any one is not available, the others are useless
        #Just reset the values
        last_timestamp = ''
        sigma = 0
        squares_sigma = 0
        counter = 0
    else:
        sigma = float(sigma)
        squares_sigma = float(squares_sigma)
        counter = int(counter)

    return last_timestamp, sigma, squares_sigma, counter


def _store_state(identifier, last_timestamp, sigma, squares_sigma,
    counter):
    """
    Stores the state data corresponding to the identifier, to Redis.
    """

    redisdb.set(identifier + '_last_timestamp', last_timestamp)
    redisdb.set(identifier + '_sigma', sigma)
    redisdb.set(identifier + '_squares_sigma',
        squares_sigma)
    redisdb.set(identifier + '_counter', counter)


def _permanent_store(identifier, mean, variance):
    """
    Stores statistical data to some kind of permanent storage
    (A file in this case).
    """
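    #Append one line per finished period; the with-block closes the file.
    #%r keeps the repr() formatting that the Python 2 backticks produced.
    with open(identifier + '.txt', 'a') as f:
        f.write('%s, %r, %r\n' % (identifier, mean, variance))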


def record(identifier, value):
    """
    Records a numeric value corresponding to the identifier.
    """

    #Fetch state data
    last_timestamp, sigma, squares_sigma, counter = (
        _fetch_state(identifier))

    #Compute current timestamp
    current_timestamp = timestamp()

    if last_timestamp != current_timestamp:
        #Either a new time period has started, or
        #this is a new identifier, or
        #previous state data was lost for some reason.
        if counter != 0:
            #A new time period has started,
            #so compute parameters for previous one and store.
            counter = float(counter)
            mean = sigma/counter
            variance = squares_sigma/counter - mean**2

            _permanent_store(identifier, mean, variance)

        #Initialize state based on newly received data
        last_timestamp = current_timestamp
        sigma = value
        squares_sigma = value**2
        counter = 1

    else:
        #Same time period as before, update state
        sigma += value
        squares_sigma += value**2
        counter += 1

    #Store state
    _store_state(identifier, last_timestamp, sigma, squares_sigma,
        counter)
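
The state kept per identifier is just three numbers: the count, the running sum, and the running sum of squares. As a minimal sketch (not part of the script above, and using a plain dict in place of Redis so that it runs anywhere), the snippet below checks that these three numbers are enough to recover the mean and variance. The names `update` and `summarize` are hypothetical helpers introduced only for this illustration.

```python
import statistics


def update(state, value):
    """Fold one value into the running count, sum, and sum of squares."""
    state["counter"] += 1
    state["sigma"] += value
    state["squares_sigma"] += value ** 2
    return state


def summarize(state):
    """Return (mean, population variance) computed from the running sums."""
    n = float(state["counter"])
    mean = state["sigma"] / n
    # Var[x] = E[x^2] - E[x]^2. Clamp at zero because floating-point
    # rounding can leave a tiny negative result.
    variance = max(state["squares_sigma"] / n - mean ** 2, 0.0)
    return mean, variance


# Made-up sample readings, purely for illustration.
readings = [21.5, 22.0, 19.8, 23.1, 20.6]
state = {"counter": 0, "sigma": 0.0, "squares_sigma": 0.0}
for reading in readings:
    update(state, reading)

mean, variance = summarize(state)
print(mean, statistics.mean(readings))
print(variance, statistics.pvariance(readings))
```

Note that this is the population variance (dividing by the count), which is what the script computes. The formula can lose precision when the mean is large relative to the spread of the data; Welford’s algorithm is a common alternative if that matters for your data.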

Some things to note:

1. The script is resistant to restarts (bearing in mind that any data arriving during the downtime is lost). It’s also resistant to faults on Redis’s side, though any state data stored there would then be lost. In both cases, the statistics for the affected period may be inaccurate, but they are still available.

2. You can group sources together if you want (like averaging weather data from all sources in a region). You would just have to implement functionality to map each source ID to the relevant group ID.

3. The script computes averages over days, but with a little creativity in the timestamp function, you can compute statistics over any custom time period. In fact, you can modify the source IDs to tell the system what time interval to average over.

4. Though the input in the above code is a single number, you can work with multidimensional data (using NumPy arrays/Pandas) and even other kinds of information extraction (anything that can work on streams without requiring historical data).

5. The script stores the data into a text file; you can obviously make it store the data to any other destination.

6. The code is pretty basic. To actually use it as part of a bigger framework, I would suggest measures such as bundling the code into a class and implementing synchronization locks over the state data, since two concurrent calls to record() for the same identifier can overwrite each other’s updates. All that is up to you and your application, of course.
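
Notes 2 and 3 can be sketched roughly as follows. This is a hypothetical illustration rather than part of the script: `period_key`, `group_for`, `PERIOD_FORMATS`, and `REGION_OF` are made-up names, and the station and region values are placeholders. The idea is that the string produced for a moment in time decides the averaging window, and a lookup from source ID to group ID decides which running state a value is folded into.

```python
import datetime

# Hypothetical strftime patterns, one per averaging window.
PERIOD_FORMATS = {
    "hour": "%d-%m-%Y %H",
    "day": "%d-%m-%Y",
    "month": "%m-%Y",
}

# Hypothetical mapping from source ID to group ID (placeholder values).
REGION_OF = {
    "station_a": "north",
    "station_b": "north",
    "station_c": "south",
}


def period_key(moment, period="day"):
    """Return a string that is identical for all moments in one period."""
    return moment.strftime(PERIOD_FORMATS[period])


def group_for(source_id):
    """Map a source ID to its group ID; ungrouped sources map to themselves."""
    return REGION_OF.get(source_id, source_id)


morning = datetime.datetime(2015, 3, 14, 9, 30)
evening = datetime.datetime(2015, 3, 14, 21, 5)

# Same day, so the daily keys match, while the hourly keys differ.
same_day = period_key(morning, "day") == period_key(evening, "day")
same_hour = period_key(morning, "hour") == period_key(evening, "hour")
print(same_day, same_hour)
print(group_for("station_b"), group_for("station_z"))
```

With helpers like these, timestamp() could return `period_key(datetime.datetime.now(), "hour")` to average over hours, and record() could be called with `group_for(identifier)` instead of the raw source ID to pool several sources into one set of statistics.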

Thanks for reading!
