From fbd4e3bf99afbc80012d6dc029be19e1bd8e6890 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Mon, 18 Jan 2016 14:13:24 +0200 Subject: Updated the queue-handling. We now use a zset to store our pending tests. This means that jobs are only in the queue once - no duplicates are allowed. This closes #12428. --- lib/custodian/queue.rb | 69 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 8 deletions(-) (limited to 'lib') diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian job = nil loop do + job = @redis.zrange('zset', '0', '0') - # Get a random job - job = @redis.spop('custodian_queue') + if !job.empty? + # We only have one entry in our array + job = job[0] + + # Remove from the queue + @redis.zrem('zset', job ); - # If that worked return it - if !job.nil? return job else sleep(timeout) @@ -96,13 +99,63 @@ module Custodian # - # Add a new job to the queue. + # Add a new job to the queue - this can stall for the case where the + # job is already pending. # def add(test) - # Add unless already present - @redis.sadd('custodian_queue', test) unless - ( @redis.sismember( 'custodian_queue', test ) ) + @redis.watch('zset') + + # + # Count the number of times we attempt to add the test + # + attempts = 0 + added = false + + + # + # This is run in a loop, as we have to wait until both + # + # (a) the score is missing + # (b) the zadd function succeeds + # + while( attempts < 40 ) do + + # + # Only update if no score is set + # + if !@redis.zscore("zset", test) + + # + # If MULTI returns nil, the transaction failed, so we need to try + # again. + # + break unless @redis.multi do |r| + @redis.zadd('zset', Time.now.to_f, test) + added = true + end.nil? + end + + # + # This could be tighter.. + # + sleep 0.1 + + # + # Bump the count of attempts. + # + attempts = attempts + 1 + end + + # + # Do we need to unwatch here? + # + @redis.unwatch + + # + # Return the success/fail + # + return added end -- cgit v1.2.1