redis task queue

background

redis is an in-memory database, lightweight like SQLite, but the other side of redis is that it can be used to build a message queue for a task scheduler.

a job/task scheduler is a popular automation tool in cloud/HPC environments, used when there are lots of tasks/jobs to deal with.

for ADS simulation, a job scheduler can be used to manage the scenarios fed into the simulator: the existing scenarios are stored as items in redis, and the redis server acts like a shared volume that any worker service can access through redis_host. the simulator itself can be packaged inside a redis worker.

redis

redis commands

redis is a database, so there are plenty of commands for db operations, e.g.

GET, SET                                  # string
HGET, HSET                                # hash table
LPUSH, LPOP, BLPOP                        # list
SADD, SPOP                                # set
PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PUBSUB   # pub/sub
...
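
as a quick illustration, the list commands already give queue semantics: LPUSH enqueues on one end, and BLPOP blocks until an item can be popped off the other. a minimal sketch with redis-py (the key names are illustrative):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# LPUSH enqueues, BLPOP blocks until an item is available,
# so a plain list already behaves as a task queue
r.lpush('queue', 'task-1')
print(r.blpop('queue', timeout=5))   # (b'queue', b'task-1')

# hash and set commands, e.g. for per-scenario bookkeeping
r.hset('scenario:1', 'status', 'pending')
r.sadd('done', 'scenario:1')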

redis in python

the following is based on the redis-py API reference.

  • get/set string
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('foo', 'bar')
r.get('foo')
  • connection pools

redis-py uses a connection pool to manage connections to a Redis server.

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
  • connections

ConnectionPools manage a set of Connection instances. the default connection is based on a normal TCP socket; a UnixDomainSocketConnection or a custom protocol can be used instead.

a connection maintains an open socket to the Redis server. when the socket is disconnected, redis-py raises a ConnectionError to the caller; redis-py can also issue regular health checks to assess the liveness of a connection.
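
a minimal sketch of both options, assuming redis is configured with unixsocket /tmp/redis.sock (the socket path is illustrative):

import redis

# connect over a unix domain socket instead of TCP
r = redis.Redis(unix_socket_path='/tmp/redis.sock')

# ping the server whenever the connection has been idle for more than
# 30 seconds, so a dead socket surfaces as a ConnectionError early
r2 = redis.Redis(host='localhost', port=6379, health_check_interval=30)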

  • parsers

a parser controls how responses from the Redis server are parsed. redis-py has two: PythonParser and HiredisParser, which is the default when the hiredis module is installed.
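
to force a specific parser, parser_class can be passed through the connection pool; a sketch, assuming a redis-py version whose Connection accepts parser_class:

import redis
from redis.connection import PythonParser

# force the pure-python parser instead of the hiredis-based one;
# connection kwargs are forwarded to each Connection in the pool
pool = redis.ConnectionPool(host='localhost', port=6379,
                            parser_class=PythonParser)
r = redis.Redis(connection_pool=pool)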

  • response callbacks

the redis-py client has its own set of callbacks in RESPONSE_CALLBACKS. custom callbacks can be added per instance using set_response_callback(command_name, callback).
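
for example, a GET reply can be post-processed per instance (the decoding callback here is illustrative; a callback receives the raw response plus keyword options):

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# decode GET replies to str instead of returning raw bytes
def decode_get(response, **options):
    return response.decode('utf-8') if response is not None else None

r.set_response_callback('GET', decode_get)
r.set('foo', 'bar')
print(r.get('foo'))   # 'bar' rather than b'bar'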

  • pub/sub

a PubSub object subscribes/unsubscribes to channels or patterns and listens for new messages.

r = redis.Redis(...)
p = r.pubsub()
p.subscribe('my-first-channel', ...)
p.psubscribe('my-first-pattern', ...)
p.unsubscribe('my-first-channel')
p.punsubscribe('my-first-pattern')

every message read from a PubSub instance is a dict with the following keys:

- type: e.g. subscribe, unsubscribe, psubscribe, message, etc.
- channel: the channel the message was published on
- pattern: the pattern that matched the channel (pattern subscriptions only)
- data: the message payload
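
for instance, publishing 'hello' to my-channel produces a message shaped like this (values are bytes unless decode_responses is enabled):

{'type': 'message', 'pattern': None,
 'channel': b'my-channel', 'data': b'hello'}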

redis-py also allows registering callback functions to handle published messages. these message handlers take a single argument, the message. when a message is read on a channel that has a handler, the message dict is built and passed to the handler instead of being returned:

def a_handler(message):
    print(message['data'])

p.subscribe(**{'my-channel': a_handler})
r.publish('my-channel', 'awesome handler')
p.get_message()
  • get_message()

get_message() uses the system's select module to quickly poll the connection's socket. if there is data available, get_message() reads it; if there is no data, get_message() immediately returns None:

import time

while True:
    message = p.get_message()
    if message:
        # do something with the message
        pass
    time.sleep(0.01)
  • listen()

listen() is a generator (it uses the yield keyword) that blocks until a message is available. if it is fine for the app to block until the next message arrives, listen() is the easy way:

for message in p.listen():
    # do something with the message
    pass
  • run_in_thread()

run_in_thread() runs an event loop in a separate thread. it returns a thread object, and it is simply a wrapper around get_message() running in a separate thread, essentially creating a tiny non-blocking event loop. since it runs in a separate thread, there is no way to handle messages that aren't handled automatically by registered message handlers, so every subscribed channel needs a handler.

p.subscribe(**{'my-channel': a_handler})
thread = p.run_in_thread(sleep_time=0.01)
# when we need to shut down
thread.stop()

redis task queue in k8s

the kubernetes example "fine parallel processing using a work queue" is a good example of how to use redis as a task queue.

first, fill the existing task list into the redis database (server), which acts as the shared volume; then start multiple worker services against that shared volume, each pulling jobs to run.
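
a minimal worker sketch, assuming the scenarios were loaded with LPUSH onto a list named job_queue on a host named redis-master (both names are illustrative):

import redis

r = redis.Redis(host='redis-master', port=6379, db=0)

def run_task(payload):
    # placeholder: invoke the simulator on one scenario
    print('processing', payload)

while True:
    # BRPOP blocks until a task is available (timeout in seconds)
    # and returns (queue_name, payload), or None on timeout
    item = r.brpop('job_queue', timeout=30)
    if item is None:
        break   # queue drained; let the worker pod exit
    _, payload = item
    run_task(payload)

the filler side is symmetric: one script LPUSHes every scenario id onto job_queue before the workers start.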