summaryrefslogtreecommitdiff
path: root/lib/custodian/queue.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/custodian/queue.rb')
-rw-r--r--lib/custodian/queue.rb69
1 files changed, 61 insertions, 8 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
index d1f4a2c..7caad1a 100644
--- a/lib/custodian/queue.rb
+++ b/lib/custodian/queue.rb
@@ -81,12 +81,15 @@ module Custodian
job = nil
loop do
+ job = @redis.zrange('zset', '0', '0')
- # Get a random job
- job = @redis.spop('custodian_queue')
+ if !job.empty?
+ # We only have one entry in our array
+ job = job[0]
+
+ # Remove from the queue
+ @redis.zrem('zset', job );
- # If that worked return it
- if !job.nil?
return job
else
sleep(timeout)
@@ -96,13 +99,63 @@ module Custodian
#
- # Add a new job to the queue.
+ # Add a new job to the queue - this can stall for the case where the
+ # job is already pending.
#
def add(test)
- # Add unless already present
- @redis.sadd('custodian_queue', test) unless
- ( @redis.sismember( 'custodian_queue', test ) )
+ @redis.watch('zset')
+
+ #
+ # Count the number of times we attempt to add the test
+ #
+ attempts = 0
+ added = false
+
+
+ #
+ # This is run in a loop, as we have to wait until both
+ #
+ # (a) the score is missing
+ # (b) the zadd function succeeds
+ #
+ while( attempts < 40 ) do
+
+ #
+ # Only update if no score is set
+ #
+ if !@redis.zscore("zset", test)
+
+ #
+ # If MULTI returns nil, the transaction failed, so we need to try
+ # again.
+ #
+ break unless @redis.multi do |r|
+ @redis.zadd('zset', Time.now.to_f, test)
+ added = true
+ end.nil?
+ end
+
+ #
+ # This could be tighter..
+ #
+ sleep 0.1
+
+ #
+ # Bump the count of attempts.
+ #
+ attempts = attempts + 1
+ end
+
+ #
+ # Do we need to unwatch here?
+ #
+ @redis.unwatch
+
+ #
+ # Return the success/fail
+ #
+ return added
end