A unique Python redis-based queue with delay
This is a simple Redis-based queue. Two features that I needed were uniqueness (i.e. if an item exists in the queue already, it won't be added again) and a delay, like beanstalkd, where an item must wait a specified time before it can be popped from the queue. There are a number of other Redis-based queues that have many more features but I didn't see one that had these two features together. This 50-line class works for my needs. It may or may not work for you. Feel free to copy this and build on it.
Note: I wrote this in May 2010. I ended up using this solution after trying out beanstalkd and Gearman.
Install¶
Install on Ubuntu 10.10 Maverick
- Install the redis server
$ sudo apt-get install redis-server
- Install the python redis client
$ pip install redis
- Default conf file: /etc/redis/redis.conf
Default log file: /var/log/redis/redis-server.log
Default db dir: /var/lib/redis
Stop redis server: sudo /etc/init.d/redis-server stop
Start redis server: sudo /etc/init.d/redis-server start
Redis commands used¶
The queue is based on the redis sorted set data type and uses the following commands:
- ZADD - Add members to a sorted set, or update its score if it already exists
- ZRANGEBYSCORE - Return a range of members in a sorted set, by score
- ZREM - Remove one or more members from a sorted set
Code¶
import time
import redis
REDIS_ADDRESS = '127.0.0.1'
class UniqueMessageQueueWithDelay(object):
"""A message queue based on the Redis sorted set data type. Duplicate items
in the queue are not allowed. When a duplicate item is added to the queue,
the new item is added, and the old duplicate item is removed. A delay may be
specified when adding items to the queue. Items will only be popped after
the delay has passed. Pop() is non-blocking, so polling must be used. The
name of the queue == the Redis key for the sorted set.
"""
def __init__(self, name):
self.name = name
self.redis = redis.Redis(REDIS_ADDRESS)
def add(self, data, delay=0):
"""Add an item to the queue. delay is in seconds.
"""
score = time.time() + delay
self.redis.zadd(self.name, data, score)
debug('Added %.1f, %s' % (score, data))
def pop(self):
"""Pop one item from the front of the queue. Items are popped only if
the delay specified in the add() has passed. Return False if no items
are available.
"""
min_score = 0
max_score = time.time()
result = self.redis.zrangebyscore(
self.name, min_score, max_score, start=0, num=1, withscores=False)
if result == None:
return False
if len(result) == 1:
debug('Popped %s' % result[0])
return result[0]
else:
return False
def remove(self, data):
return self.redis.zrem(self.name, data)
def debug(msg):
print msg
def test_queue():
u = UniqueMessageQueueWithDelay('myqueue')
# add items to the queue
for i in [0, 1, 2, 3, 4, 0, 1]:
data = 'Item %d' % i
delay = 5
u.add(data, delay)
time.sleep(0.1)
# get items from the queue
while True:
print
result = u.pop()
print result
if result != False:
u.remove(result)
time.sleep(1)
if __name__ == '__main__':
test_queue()
Results:
Added 1320773851.8, Item 0 Added 1320773851.9, Item 1 Added 1320773852.0, Item 2 Added 1320773852.1, Item 3 Added 1320773852.2, Item 4 Added 1320773852.3, Item 0 Added 1320773852.4, Item 1 False False False False False Popped Item 2 Item 2 Popped Item 3 Item 3 Popped Item 4 Item 4 Popped Item 0 Item 0 Popped Item 1 Item 1 False False False ^CTraceback (most recent call last): File "umqwdredisqueue.py", line 102, intest_queue() File "umqwdredisqueue.py", line 98, in test_queue time.sleep(1) KeyboardInterrupt
Some links related to Redis queues¶
- http://github.com/defunkt/resque
- http://github.com/samuel/dreque
- http://github.com/gleicon/restmq
- http://github.com/tnm/qr
- http://github.com/blog/542-introducing-resque
- http://nosql.mypopescu.com/post/366619997/usecase-restmq-a-redis-based-message-queue
- http://nosql.mypopescu.com/post/426360602/redis-queues-an-emerging-usecase
See also¶
Comments
Was trying to do the same thing, with only difference was i dont care about uniqueness (my objects are unique from before), and i ended up writing pretty much the exact same approach as you, but i find one problem with this.
This is not a real pop. If 2 workers are working on the same que, it is possible both get the same object when doing zrangebyscore at the same time before any one of them is able to zrem it. This is a very important consideration since i am in process of re-architecting the application to be able to run concurrently across cores (and eventually machines) with redis (or something) maintaining state.
Since i use redis for other things, id much rather use it for the que as well. Any ideas of the pop can be done in an atomic manner?
Will look into beanstalkd next.
Why do you need a delay, where an item must wait a specified time before it can be popped from the queue? What use case did you have?
Great post
disqus:3206745204
Hi,
Is it possible to add identifier to a message
I will be having identifier as unique value instead of data
disqus:3691308638