# Redis + Celery: Reactive Computing in Django for IoT applications

To quote Wikipedia, Reactive Programming is a programming paradigm oriented around data flows and the propagation of change.

Consider the simple equation $a = b + c$. This relationship defines a flow of data, in that it describes how the data for $a$ is computed given the data for $b$ and $c$. Suppose the values of $b$ and $c$ at time t=0 are 4 and 5 respectively. Therefore, $a_{t=0} = 9$. Now, lets say that the value of $b$ changes at t=1 so that $b_{t=1} = 10$. By the definition of $a$ mentioned before, it follows that $a_{t=1}=15$. Thus, the change in the value of $b$ got propagated, causing the value of its ‘parent’ $a$ to change. I use the word parent because if we were to draw the relationship between $a$, $b$ and $c$ as a tree, $a$ be the parent of $b$ and $c$ (like in a data flow graph). A common example of reactive programming is seen in spreadsheet software like MS-Excel, where the change in the value of a particular cell causes a change in the values of cells that depend on it by some pre-defined equation.

RxJS and Meteor are two javascript-based frameworks that implement reactive programming for web-based applications.

Reactivity, in general, is a paradigm that’s difficult to find in Python-based systems, and that includes Django. This post describes a method to perform mathematical computations (not reactive programming in general, just mathematical calculations) in a reactive manner in Django-based systems. I will soon describe the exact problem solved by my code, with suggestions on how to modify it to suit your needs.

Prerequisites

Heres the prerequisites, apart from Django:

i. We will use Celery to perform tasks asynchronously.

ii. We will use Redis as for temporary data storage, and synchronization.

This is a good place to look if you want to set up both.

iii. We will use python-redis-lock to use Redis for locking mechanisms. Locking ensures that one data update/read doesn’t get messed up due to another update that’s running in parallel.

The ‘Problem Statement’

Consider the two equations:

$A = C + D + E$ …(1)

$B = A * F$ …(2)

We will refer to $A$, $B$, $C$, $D$, $E$ and $F$ as computational nodes. $C$, $D$, $E$ and $F$ are input values that we acquire from certain sources of data (whatever they might be). They are essentially the independent variables in this context. $A$‘s value is derived $C$, $D$ and $E$ (making it their ‘parent’ node). You can also say that $C$, $D$ and $E$ are the arguments for $A$. $B$ in turn has two dependencies, $A$ and another independent variable $F$.

You don’t know when you will receive the values of the independent variables. You also don’t know what order they will arrive in. Some of them may come together, some very frequently, etc. etc. Lets say that your application dictates that the value of any node remains valid for about a minute its received or computed. Whenever you have had the latest values of $C$, $D$ and $E$ come within the past 60 seconds, you would want to compute the value of $A$. The same logic applies to $B$.

Once the latest value of $A$ is computed, you would like the node to ‘forget’ the values of its arguments(children). Therefore, whenever comes the next time that we have fresh values of $C$, $D$ and $E$ together, $A$ will be calculated again. The same logic once again applies to $B$. The code can be easily modified (and I will mention how) so that this restriction is lifted. In that case, $A$ would be recomputed whenever the value of any of $C$, $D$ and $E$ changes, provided that the other values are still valid.

Whenever you receive/compute the value of a computational node (whether independent or derived), you would want to store its value – along with the appropriate timestamp – in your database.

Now lets see how this data-flow logic can be implemented in a flexible, efficient manner.

The Implementation

The whole functionality will be enclosed in a Django App called ‘reactive_compute’. It will have the following files:

i. models.py

iii. db.py

iv. functions.py

v. functions.py

and obviously

vi. __init__.py

First of all, we will need to define the derived nodes $A$ and $B$, in terms of their Function and Number of Arguments. For example, $A$‘s function can be described as an Addition, with the number of arguments being 3. $B$ on the other hand is a Multiplication, with 2 arguments. To remember this, we will first need to define the database model in models.py. You don’t store this information in Redis, since its ‘permanent’.

The model:

from django.db import models

class FunctionType(models.Model):
"""
Stores information about the type of function/computation
associated with a node.
The function name should have a corresponding mapping in
reactivecompute.functions.Functions.mapping
"""

node = models.CharField('Node', max_length=25)
functionname = models.CharField('Function Name', max_length=25)
noofargs = models.IntegerField('Number of Arguments')

As you can see, $functionname$ is a String that will act as an identifier for the Function. We will have “$ADD$” for Addition, and “$MUL$” for Multiplication. But there must be someplace to implement those functions. Thats where the functions.py files comes in.

##File with all computational functions

"""
"""
return sum(args)

def sub(args):
"""
Subtraction Function.
"""
return (args[0] - args[1])

def mul(args):
"""
Multiplication function.
"""
prod = 1
for arg in args:
prod *= arg
return prod

class Functions(object):
"""
Class to store mapping between function name and function
implementation.
"""

mapping = {
'SUB': sub,
'MUL': mul
}

Adding the mapping as a dictionary to the $Functions$ class allows easy importing. Each of the functions takes in the arguments(in appropriate order) as a list.

The following screenshot shows the appropriate database entries made for $A$ and $B$ in the Django Admin.

Now that we have defined the ‘nodes’ we need to direct the appropriate args to them. This is done by creating another model in models.py, for storing the data-flow information as tree ‘edges’. Every child is an argument for the parent.

class Dependency(models.Model):
"""
Models a Parent-Child relationship between computational
nodes. Also defines the order in which the children are to be
arranged, to get the Parent's value.
"""

parent = models.CharField('Parent Node', max_length=25)
child = models.CharField('Child Node', max_length=25)
argno = models.IntegerField('Argument Number')

$argno$ doesn’t really matter for the operations of multiplication and addition, but for others(like subtraction), it might.

Heres a Django-Admin screenshot with the required entries for $A$ and $B$.

As I mentioned earlier, we would like to store the values of the nodes, whenever they are updated. So heres a model for that:

class DataLog(models.Model):
"""
Stores information about the type of function/computation
associated with nodes.
"""

node = models.CharField('Node', max_length=25)
timestamp = models.DateTimeField('Time Stamp')
value = models.FloatField('Value', null=True)

And heres the contents of the db.py file, that define a function to store the latest value of a node into the database:

from reactive_compute.models import DataLog
from django.utils import timezone

def save_node_value(node, value):
"""
Saves the latest computed value of the given node.
"""
data_entry = DataLog(node=str(node),
timestamp=timezone.now(),
value=value)
data_entry.save()

To present a complete example, the following are the contents of the admin.py file.

from reactive_compute.models import *

list_display = ('parent', 'child', 'argno')
list_filter = ['parent', 'child']
search_fields = ['parent', 'child']

list_display = ('node', 'functionname', 'noofargs')
list_filter = ['functionname']
search_fields = ['node', 'functionname']

list_display = ('node', 'timestamp', 'value')
list_filter = ['node', 'timestamp']
search_fields = ['node']

And finally, we come to the crux of the implementation – the tasks.py file. If you read the resource I mentioned for Celery installation, you know that the Celery task is defined in this file. I have heavily commented it, so do go through the in-line docs.

from __future__ import absolute_import
from reactive_compute.models import Dependency, FunctionType
from reactive_compute.db import save_node_value
from reactive_compute.functions import Functions
from redis import StrictRedis
import redis_lock

def compute_nodes(nodes, timeout=60):
"""
Computes values for all computational nodes as defined in the
FunctionType model. Recursive calls are made based on information
present in respective Dependency entries.

'nodes' should be a dict mapping name of computational node, to:
A floating point number as input, OR
None, if the value is to be (possibly) computed.
'timeout' defines the time interval for which the values of the
nodes mentioned in 'nodes' will be valid. Default is a minute.
"""

#First handle the boundary case
if len(nodes) == 0:
return None

#Get a connection to Redis
conn = StrictRedis()

#This will contain all the parent nodes that we will try to compute
#recursively based on the args currently provided.
parents = set([])

#Default initialization for Celery
value = None

#Iterate over all nodes
for node in nodes:
##First obtain the value of the node
if nodes[node] is not None:
#Value has been provided as input
try:
#Ensure the given value can be parsed/converted to
#a float.
value = float(nodes[node])
except:
#If representing the value as a float fails,
#skip this one.
continue
else:
#Value has to be computed.

#First acquire lock for the particular node.
#This ensures that the inputs needed for computing
#the current node don't get modified midway(unless
#one expires, in which case the computation may or may
#not go through).
lock = redis_lock.RedisLock(conn, node + '_lock')
if lock.acquire():
try:
#This will indicate if all args are present for
#computation of the result
all_args_flag = True
#This will store all the required arguments in order
args = []
#Get the pertinent FunctionType instance
func_info = FunctionType.objects.get(node=node)
#Iterate over all arguments
for i in range(func_info.noofargs):
#Get Redis value
v = conn.get(node + '_' + `i`)
if v is None or v == 'None':
#If value not present stop iterations
all_args_flag = False
break
else:
args.append(float(v))
#If any arg was absent, abort processing of this node
if not all_args_flag:
continue
#Compute the value, since all args are present
value = Functions.mapping[func_info.functionname](
args)
for i in range(func_info.noofargs):
conn.delete(node + '_' + `i`)
except:
pass
finally:
#Release lock
lock.release()

##Now that the value has been obtained, update the args info
##for all parent nodes
parent_objs = Dependency.objects.filter(child=node)
for parent_obj in parent_objs:
#Get lock for parent
lock = redis_lock.RedisLock(conn, parent_obj.parent + '_lock')
if lock.acquire():
try:
#Set value
conn.set(parent_obj.parent + '_' + `parent_obj.argno`,
value)
#Set expiry time
conn.expire(parent_obj.parent + '_' + `parent_obj.argno`,
timeout)
except:
pass
finally:
#Release lock
lock.release()
#Add this parent to the set of parents to process

#Save value in database as needed
save_node_value(node, value)

#Make the recursive call on parent nodes
compute_nodes.delay(dict((parent, None) for parent in parents),
timeout)

Given below is an example of using the above task in a simple Django view. It accepts the complete input as a stringified JSON object (like “$\{"C": 4, "D": 5\}$“), and makes a call to the Celery task.

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
def input_values(request):
#Get JSON-ed values dictionary
value_dict = eval(request.GET.get('input'))
#Make Celery call
compute_nodes.delay(value_dict)

return HttpResponse("OK")

Lets run the server and see how things work.
Heres that the data-logs look like given the input “$\{"C": 4, "D": 5\}$“.

Within a minute, we provide “$\{"E": 6\}$“.

Notice how $A$ gets computed as well. Now let us provide all inputs, with new values: “$\{"C": 1, "D": 1, "E": 1, "F": 3\}$“.

As you must have observed, $B$ is computed with the latest value of $A$, which is 3.

One unusual flexibility that this codes provide, is that we can update the values of nodes that are ‘usually’ derived, manually. Heres what happens if we provide “$\{"A": 4, "F": 5\}$“.

You don’t need to Copy-Paste this code anywhere, its all present in my github repo. Feel free to fork it and send a PR if you can modify it in some useful way!

Thoughts

1. The way its implemented, you have complete control over how each computational node works. I have used simple mathematical operations, but you could have your own metrics/functionality written in. As long as you define the arguments and order them right, things will work well.

2. I have worked with floating-point numbers, but your arguments could as well be vectors! Then you might have to run eval over your Redis values, instead of a simple float() conversion. But since every call is made as a Celery task, your runtime will still be optimized.

3. Earlier, I mentioned that you could remove the restriction of deleting previous values of arguments (as long as they are valid). You could do this pretty easily by removing lines 85-87 in the tasks.py code. If you don’t want your node values to have a sense of time-based validity at all, you could just remove lines 106 and 107 from tasks.py.

4. If you want different values to be stored in different ways into the database, you could add another parameter to the $FunctionType$ model, that specified a String that denotes how information about that particular node is added to the database. Ideally, this database storing would also be a Celery task.

5. You could also store the Data-Flow diagram better using a No-SQL database such as MongoDB.

Thats all for now :-). Cheers!