Redis + Celery: Reactive Computing in Django for IoT applications

To quote Wikipedia, reactive programming is "a programming paradigm oriented around data flows and the propagation of change."

Consider the simple equation a = b + c. This relationship defines a flow of data, in that it describes how the value of a is computed from the values of b and c. Suppose the values of b and c at time t=0 are 4 and 5 respectively, so a_{t=0} = 9. Now, let's say the value of b changes at t=1 so that b_{t=1} = 10. By the definition of a above, it follows that a_{t=1} = 15. Thus, the change in the value of b got propagated, causing the value of its 'parent' a to change. I use the word parent because if we were to draw the relationship between a, b and c as a tree, a would be the parent of b and c (like in a data-flow graph). A common example of reactive programming is seen in spreadsheet software like MS Excel, where a change in the value of a particular cell causes changes in the values of the cells that depend on it through some pre-defined equation.
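
To make that concrete, here is a toy Python sketch (not part of this post's actual code) of a spreadsheet-style 'cell' that re-runs its parent's computation whenever it changes:

class Cell(object):
    """
    A value that notifies registered callbacks when it changes.
    """
    def __init__(self, value=None):
        self.value = value
        self.listeners = []

    def set(self, value):
        self.value = value
        #Propagate the change to every dependent computation
        for recompute in self.listeners:
            recompute()


b, c = Cell(4), Cell(5)
a = Cell()

def recompute_a():
    a.value = b.value + c.value

b.listeners.append(recompute_a)
c.listeners.append(recompute_a)

recompute_a()
print(a.value)  #9
b.set(10)
print(a.value)  #15, without anyone explicitly recomputing a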

RxJS and Meteor are two JavaScript-based frameworks that implement reactive programming for web applications.

Reactivity, in general, is a paradigm that’s difficult to find in Python-based systems, and that includes Django. This post describes a method to perform mathematical computations (not reactive programming in general, just mathematical calculations) in a reactive manner in Django-based systems. I will soon describe the exact problem solved by my code, with suggestions on how to modify it to suit your needs.

Prerequisites

Here are the prerequisites, apart from Django:

i. We will use Celery to perform tasks asynchronously.

ii. We will use Redis for temporary data storage and synchronization.

This is a good place to look if you want to set up both.

iii. We will use python-redis-lock to use Redis for locking mechanisms. Locking ensures that one data update/read doesn’t get messed up due to another update that’s running in parallel.

The ‘Problem Statement’

Consider the two equations:

A = C + D + E …(1)

B = A * F …(2)

We will refer to A, B, C, D, E and F as computational nodes. C, D, E and F are input values that we acquire from certain sources of data (whatever they might be); they are essentially the independent variables in this context. A's value is derived from C, D and E (making it their 'parent' node). You could also say that C, D and E are the arguments for A. B in turn has two dependencies: A, and another independent variable F.

You don't know when you will receive the values of the independent variables, nor in what order they will arrive. Some of them may come together, some very frequently, and so on. Let's say that your application dictates that the value of any node remains valid for about a minute after it is received or computed. Whenever the latest values of C, D and E have all arrived within the past 60 seconds, you would want to compute the value of A. The same logic applies to B.

Once the latest value of A is computed, you would like the node to 'forget' the values of its arguments (children). Therefore, the next time fresh values of C, D and E are all available together, A will be calculated again. The same logic once again applies to B. The code can be easily modified (and I will mention how) so that this restriction is lifted. In that case, A would be recomputed whenever the value of any of C, D or E changes, provided the other values are still valid.

Whenever you receive/compute the value of a computational node (whether independent or derived), you would want to store its value – along with the appropriate timestamp – in your database.

Now let's see how this data-flow logic can be implemented in a flexible, efficient manner.

The Implementation

The whole functionality will be enclosed in a Django App called ‘reactive_compute’. It will have the following files:

i. models.py

ii. admin.py

iii. db.py

iv. functions.py

v. tasks.py

and obviously

vi. __init__.py

First of all, we will need to define the derived nodes A and B, in terms of their Function and Number of Arguments. For example, A's function can be described as an Addition with 3 arguments. B, on the other hand, is a Multiplication with 2 arguments. To remember this, we will first define the database model in models.py. You don't store this information in Redis, since it's 'permanent'.

The model:

from django.db import models


class FunctionType(models.Model):
    """
    Stores information about the type of function/computation
    associated with a node.
    The function name should have a corresponding mapping in
    reactive_compute.functions.Functions.mapping
    """

    node = models.CharField('Node', max_length=25)
    functionname = models.CharField('Function Name', max_length=25)
    noofargs = models.IntegerField('Number of Arguments')

As you can see, functionname is a String that acts as an identifier for the function. We will have "ADD" for Addition, and "MUL" for Multiplication. But those functions must be implemented someplace; that's where the functions.py file comes in.


"""
File with all computational functions.
"""


def add(args):
    """
    Addition Function.
    """
    return sum(args)


def sub(args):
    """
    Subtraction Function.
    """
    return (args[0] - args[1])


def mul(args):
    """
    Multiplication function.
    """
    prod = 1
    for arg in args:
        prod *= arg
    return prod


class Functions(object):
    """
    Class to store mapping between function name and function
    implementation.
    """

    mapping = {
        'ADD': add,
        'SUB': sub,
        'MUL': mul
        }

Adding the mapping as a dictionary to the Functions class allows easy importing. Each of the functions takes in its arguments (in the appropriate order) as a list.
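
As a quick illustration (assuming the app is importable as reactive_compute), a lookup-and-apply looks like this:

from reactive_compute.functions import Functions

#Look up an implementation by its identifier and apply it to a
#list of arguments
print(Functions.mapping['ADD']([4, 5, 6]))  #15
print(Functions.mapping['MUL']([3, 5]))     #15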

The following screenshot shows the appropriate database entries made for A and B in the Django Admin.

[Screenshot: FunctionType entries for A and B in the Django admin]

Now that we have defined the 'nodes', we need to direct the appropriate arguments to them. This is done by creating another model in models.py, to store the data-flow information as tree 'edges'. Every child is an argument of its parent.


class Dependency(models.Model):
    """
    Models a Parent-Child relationship between computational
    nodes. Also defines the order in which the children are to be
    arranged, to get the Parent's value.
    """

    parent = models.CharField('Parent Node', max_length=25)
    child = models.CharField('Child Node', max_length=25)
    argno = models.IntegerField('Argument Number')

argno doesn't really matter for operations like addition and multiplication, but for others (like subtraction), it does.
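
For example, with the sub function above, args[0] is the child with argno 0 and args[1] is the child with argno 1, so the ordering changes the result:

from reactive_compute.functions import Functions

print(Functions.mapping['SUB']([10, 4]))  #6
print(Functions.mapping['SUB']([4, 10]))  #-6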

Here's a Django admin screenshot with the required entries for A and B.

[Screenshot: Dependency entries for A and B in the Django admin]
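
If you prefer the Django shell to the admin, the same entries can be created with the ORM (note that the task code expects argument numbers to start at 0):

from reactive_compute.models import FunctionType, Dependency

#A = C + D + E
FunctionType(node='A', functionname='ADD', noofargs=3).save()
for argno, child in enumerate(['C', 'D', 'E']):
    Dependency(parent='A', child=child, argno=argno).save()

#B = A * F
FunctionType(node='B', functionname='MUL', noofargs=2).save()
Dependency(parent='B', child='A', argno=0).save()
Dependency(parent='B', child='F', argno=1).save()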

As I mentioned earlier, we would like to store the values of the nodes whenever they are updated. So here's a model for that:


class DataLog(models.Model):
    """
    Stores the value of a node every time it is received or
    computed, along with the corresponding timestamp.
    """

    node = models.CharField('Node', max_length=25)
    timestamp = models.DateTimeField('Time Stamp')
    value = models.FloatField('Value', null=True)

And here are the contents of the db.py file, which defines a function to store the latest value of a node in the database:


from reactive_compute.models import DataLog
from django.utils import timezone


def save_node_value(node, value):
    """
    Saves the latest computed value of the given node.
    """
    data_entry = DataLog(node=str(node),
                         timestamp=timezone.now(),
                         value=value)
    data_entry.save()

To present a complete example, the following are the contents of the admin.py file.

from django.contrib import admin
from reactive_compute.models import *


class DependencyAdmin(admin.ModelAdmin):
    list_display = ('parent', 'child', 'argno')
    list_filter = ['parent', 'child']
    search_fields = ['parent', 'child']

admin.site.register(Dependency, DependencyAdmin)


class FunctionTypeAdmin(admin.ModelAdmin):
    list_display = ('node', 'functionname', 'noofargs')
    list_filter = ['functionname']
    search_fields = ['node', 'functionname']

admin.site.register(FunctionType, FunctionTypeAdmin)


class DataLogAdmin(admin.ModelAdmin):
    list_display = ('node', 'timestamp', 'value')
    list_filter = ['node', 'timestamp']
    search_fields = ['node']

admin.site.register(DataLog, DataLogAdmin)

And finally, we come to the crux of the implementation – the tasks.py file. If you read the resource I mentioned for Celery installation, you know that Celery tasks are defined in this file. I have commented it heavily, so do go through the in-line docs.


from __future__ import absolute_import
from celery import shared_task
from reactive_compute.models import Dependency, FunctionType
from reactive_compute.db import save_node_value
from reactive_compute.functions import Functions
from redis import StrictRedis
import redis_lock


@shared_task
def compute_nodes(nodes, timeout=60):
    """
    Computes values for all computational nodes as defined in the
    FunctionType model. Recursive calls are made based on information
    present in respective Dependency entries.

    'nodes' should be a dict mapping name of computational node, to:
    A floating point number as input, OR
    None, if the value is to be (possibly) computed.
    'timeout' defines the time interval for which the values of the
    nodes mentioned in 'nodes' will be valid. Default is a minute.
    """

    #First handle the boundary case
    if len(nodes) == 0:
        return None

    #Get a connection to Redis. decode_responses makes redis-py
    #return strings instead of bytes (needed on Python 3).
    conn = StrictRedis(decode_responses=True)

    #This will contain all the parent nodes that we will try to
    #compute recursively, based on the args currently provided.
    parents = set()

    #Iterate over all nodes
    for node in nodes:
        #Value of the current node; stays None if it cannot be
        #obtained or computed
        value = None
        ##First obtain the value of the node
        if nodes[node] is not None:
            #Value has been provided as input
            try:
                #Ensure the given value can be parsed/converted to
                #a float.
                value = float(nodes[node])
            except (TypeError, ValueError):
                #If representing the value as a float fails,
                #skip this one.
                continue
        else:
            #Value has to be computed.

            #First acquire lock for the particular node.
            #This ensures that the inputs needed for computing
            #the current node don't get modified midway(unless
            #one expires, in which case the computation may or may
            #not go through).
            lock = redis_lock.Lock(conn, node + '_lock')
            if lock.acquire():
                try:
                    #This will indicate if all args are present for
                    #computation of the result
                    all_args_flag = True
                    #This will store all the required arguments in order
                    args = []
                    #Get the pertinent FunctionType instance
                    func_info = FunctionType.objects.get(node=node)
                    #Iterate over all arguments
                    for i in range(func_info.noofargs):
                        #Get Redis value
                        v = conn.get(node + '_' + str(i))
                        if v is None or v == 'None':
                            #If value not present stop iterations
                            all_args_flag = False
                            break
                        else:
                            args.append(float(v))
                    #If any arg was absent, abort processing of this node
                    if not all_args_flag:
                        continue
                    #Compute the value, since all args are present
                    value = Functions.mapping[func_info.functionname](
                        args)
                    #Delete info about current args
                    for i in range(func_info.noofargs):
                        conn.delete(node + '_' + str(i))
                except Exception:
                    #If the FunctionType entry is missing or the
                    #computation fails, leave value as None
                    pass
                finally:
                    #Release lock
                    lock.release()

        #If the value could not be obtained/computed, skip this node
        if value is None:
            continue

        ##Now that the value has been obtained, update the args info
        ##for all parent nodes
        parent_objs = Dependency.objects.filter(child=node)
        for parent_obj in parent_objs:
            #Get lock for parent
            lock = redis_lock.Lock(conn, parent_obj.parent + '_lock')
            if lock.acquire():
                try:
                    #Set value
                    key = parent_obj.parent + '_' + str(parent_obj.argno)
                    conn.set(key, value)
                    #Set expiry time
                    conn.expire(key, timeout)
                except Exception:
                    pass
                finally:
                    #Release lock
                    lock.release()
            #Add this parent to the set of parents to process
            parents.add(parent_obj.parent)

        #Save value in database as needed
        save_node_value(node, value)

    #Make the recursive call on parent nodes, if any
    if parents:
        compute_nodes.delay(dict((parent, None) for parent in parents),
                            timeout)

Given below is an example of using the above task in a simple Django view. It accepts the complete input as a stringified JSON object (like '{"C": 4, "D": 5}'), and makes a call to the Celery task.

import json

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from reactive_compute.tasks import compute_nodes


@csrf_exempt
def input_values(request):
    #Parse the JSON-ed values dictionary. json.loads is safer than
    #eval for data arriving over HTTP.
    value_dict = json.loads(request.GET.get('input'))
    #Make Celery call
    compute_nodes.delay(value_dict)

    return HttpResponse("OK")
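
Assuming the view is mapped to, say, /input/ in your urls.py (the URL here is hypothetical), you could exercise it from Python with the third-party requests package:

import json
import requests  #pip install requests

#The URL depends on your urls.py mapping - /input/ is just an example
resp = requests.get('http://localhost:8000/input/',
                    params={'input': json.dumps({'C': 4, 'D': 5})})
print(resp.status_code, resp.text)  #200 OK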

Let's run the server and see how things work.
Here's what the data logs look like given the input '{"C": 4, "D": 5}'.

[Screenshot: DataLog entries for C and D in the Django admin]

Within a minute, we provide '{"E": 6}'.

[Screenshot: DataLog entries showing E, with A computed as 15]

Notice how A gets computed as well. Now let us provide all inputs, with new values: '{"C": 1, "D": 1, "E": 1, "F": 3}'.

[Screenshot: DataLog entries for the new inputs, with A and B recomputed]

As you must have observed, B is computed with the latest value of A, which is 3.

One unusual flexibility this code provides is that we can manually update the values of nodes that are 'usually' derived. Here's what happens if we provide '{"A": 4, "F": 5}'.

[Screenshot: DataLog entries showing the manually provided A, and B computed from it]

You don't need to copy-paste this code from anywhere; it's all present in my GitHub repo. Feel free to fork it and send a PR if you can modify it in some useful way!

Thoughts

1. The way it's implemented, you have complete control over how each computational node works. I have used simple mathematical operations, but you could write in your own metrics/functionality. As long as you define the arguments and order them correctly, things will work well.

2. I have worked with floating-point numbers, but your arguments could just as well be vectors! Then you might have to run eval (or, more safely, a JSON parse) over your Redis values, instead of a simple float() conversion. But since every call is made as a Celery task, your runtime will still be optimized.
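
For instance, here is a sketch of the safer JSON-based variant of that idea (the key name 'A_0' is just an example):

import json
from redis import StrictRedis

conn = StrictRedis(decode_responses=True)

#Store a vector-valued argument as JSON text, with a 60-second expiry
conn.set('A_0', json.dumps([1.0, 2.0, 3.0]), ex=60)

#...and parse it back when computing the parent node. json.loads is
#a safer alternative to eval for data you don't fully control.
vector_arg = json.loads(conn.get('A_0'))
print(vector_arg)  #[1.0, 2.0, 3.0]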

3. Earlier, I mentioned that you could remove the restriction of forgetting previous argument values (as long as they are still valid). You can do this pretty easily by removing the loop under the '#Delete info about current args' comment in tasks.py. If you don't want your node values to have a sense of time-based validity at all, you can also remove the conn.expire call in tasks.py.

4. If you want different values to be stored into the database in different ways, you could add another parameter to the FunctionType model: a String that denotes how information about that particular node is added to the database. Ideally, this database storing would also be a Celery task.

5. You could also store the data-flow graph better using a NoSQL database such as MongoDB.

That's all for now :-). Cheers!


Simple production-time debugging in Django – and better error handling

We all know how amazingly exhaustive Django debugging can be when you enable the DEBUG = True option in your settings file. It gives you a full traceback, complete with str-ed versions of the local variables. However, this post deals with the production-time scenario, when you might encounter an unanticipated error in your code. Such errors can arise for various reasons, such as:

1. A kind of input you did not anticipate.

2. A logical error of some kind, usually arising out of boundary cases (like a sqrt function coming across a negative value).

3. (Worst kind) Some type of low-probability scenario you forgot to test.

In such cases, Django serves the 500 status code along with a "Server Error" message. All this is good, and if your application doesn't have any state variables that may get screwed up, or low-probability errors aren't that important to you, then you can simply ignore the rest of this post. However, if you would want at least a minimalistic note being made on the server when something goes wrong, then you can set up a simple debug-logging system as part of your Django project.

Here's how you go about it:

Step 1. Create a folder called debug in your Django project’s main directory (the one that contains the manage.py file).

Step 2. Make an empty __init__.py file inside the new folder, which tells Python that the folder contains relevant code (i.e. makes it a Python package).

Step 3. Make a models.py file in the folder, and add the following lines to it:


from django.db import models


class ExceptionLog(models.Model):
    """
    Models any error occurring on the server.
    """
    timestamp = models.DateTimeField('Time Stamp')
    view = models.CharField('View', max_length=30)
    exceptionclass = models.CharField('Exception Class', max_length=60)
    message = models.CharField('Exception Message', max_length=100)

This sets up the basic database model for logging of Python Exceptions. Modify it as you wish, especially the names, max lengths and stuff.

Step 4. Make a file named admin.py in the folder, and add the following code to it:


from django.contrib import admin
from debug.models import ExceptionLog


class ExceptionLogAdmin(admin.ModelAdmin):
    list_display = ('timestamp', 'view', 'exceptionclass',
                    'message')
    list_filter = ('view', 'timestamp')
    search_fields = ['message', 'exceptionclass', 'view']

admin.site.register(ExceptionLog, ExceptionLogAdmin)

The attributes are again to your taste – if you know the basics of configuring the Django admin page, you already know what you want and how to get there. If you don't, reading up on the Django tutorial will help.

Step 5. This is where we add the code that makes things work. Add a file called decorators.py in the directory, and add the given code to it:


from functools import wraps

from debug.models import ExceptionLog
from django.utils import timezone
from django.http import HttpResponse


def log_exceptions(view_name):
    """
    Logs all the exceptions occurring in a Django view, to the
    ExceptionLog model.
    'view_name' denotes an identifier for the view that is
    being debug-logged.
    """

    def real_decorator(actual_view):
        """
        This is the actual decorator.
        """

        @wraps(actual_view)
        def wrapped_view(request, *args, **kwargs):
            """
            This is the version of the view that will monitor
            itself for any unexpected Exception, and maintain basic
            logs of the same. *args and **kwargs let it wrap views
            that take URL parameters too.
            """
            try:
                #Run the view code
                response = actual_view(request, *args, **kwargs)
                #If a response is generated without any Exception
                #coming up, return it
                return response
            except Exception as e:
                #If an unexpected Exception occurs, make a debug entry
                #and save it
                debug_entry = ExceptionLog(
                    timestamp=timezone.now(),
                    view=view_name,
                    exceptionclass=str(e.__class__),
                    message=str(e))
                debug_entry.save()
                #Return the Server Error(500) status code
                return HttpResponse(status=500)

        return wrapped_view

    return real_decorator

This code uses the Python 'magic' called decorators with arguments. Frankly, I had never implemented Python decorators before this (I had used them, yes, during my SymPy work), let alone ones with arguments. But trust me, unless you have a very special case, this is the way to go for what we want to achieve here.

The decorator basically takes in the name of the view as an argument, so that it's known during logging. It wraps the working of the actual Django view that's being decorated in a try...except block, and logs the timestamp, view name and the most important Exception details, in the event that one is raised.

Step 6. Add 'debug' to the INSTALLED_APPS tuple in your Django settings.py file. This officially recognizes your new debug-logging app as a part of your complete server code.

Then, before re-starting the server, don't forget to run syncdb or migrate (depending on which Django version you use) on your project. This will register the new models on the server, and make the details appear on your Django admin page.
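
For reference, the settings change looks something like this (app names per your own project):

#settings.py
INSTALLED_APPS = (
    #...your other apps...
    'debug',
)

#Then, on the command line:
#    python manage.py migrate    (Django >= 1.7)
#    python manage.py syncdb     (older versions)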

Step 7. To set up any view in your project code for debug-logging, just modify it as follows:

...other imports...
from debug.decorators import log_exceptions

@log_exceptions('Some View')
def some_view(request):
    ...view code...

If you also use the @csrf_exempt decorator on your view(s), make sure the log_exceptions decorator lies below it.
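
In other words, the stacking should look like this (a minimal sketch):

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from debug.decorators import log_exceptions


@csrf_exempt                  #outermost, so the CSRF exemption holds
@log_exceptions('Some View')  #closest to the view, runs innermost
def some_view(request):
    return HttpResponse("OK")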

Voila! You are done! Now, whenever your decorated views raise an Exception, the relevant information will be logged on your Django admin page.

Of course, this does not replace what real debugging does, nor is it as exhaustive (no traceback provided, for starters). It just stores the very basic info about what went wrong, when, and where (in terms of which view); it will not give you complete details like the file name, line number, etc. In a production environment, debugging to that extent is overkill and (should be) unnecessary. That brings me to the second important point:

Whenever an Exception is raised in your Python code, there are two things you can do about it:

1. 'Duck it' – Not handle it, letting it propagate up the call stack, OR

2. 'Handle it' – Using something like a try...except block, either repair the damage caused by the error (say, by returning a default value), or raise another Exception that better describes what went wrong (most often, in 'high-level' terms).

I implore you to do the latter in your code, at least as much as possible. Why? It just makes your life way easier. In simple programming terms, the shallower your call stack is when an Exception is raised, the easier it is for you to understand what went wrong. But of course, if you already knew things were going to go wrong, you would not need this debugging tool :-P.
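
Tying back to the sqrt example from earlier, here is a toy illustration of raising a better, higher-level Exception:

import math


def real_sqrt(x):
    """
    Square root restricted to non-negative real inputs.
    """
    if x < 0:
        #Say precisely what went wrong, in high-level terms, instead
        #of letting a generic 'math domain error' bubble up from
        #deep inside the call stack
        raise ValueError(
            "real_sqrt expected a non-negative number, got %r" % x)
    return math.sqrt(x)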

But even then, try not to let Exceptions come out of places like the NumPy core, where it becomes frustratingly difficult to know how things screwed up. It is always good practice to raise good-quality Exceptions yourself, with custom error messages that say what went wrong. As long as you can understand the problem by looking at the Exception class and message, you are good to go!

EDIT 1: Of course, you could use better frameworks meant for this, like Sentry or Opbeat. I wrote this post just to show a quick-and-dirty way to do it in your code itself. If it fits your needs, not re-inventing the wheel is of course always better :-).