diff options
author | Steve Kemp <steve@steve.org.uk> | 2016-01-18 14:13:24 +0200 |
---|---|---|
committer | Steve Kemp <steve@steve.org.uk> | 2016-01-18 14:13:24 +0200 |
commit | fbd4e3bf99afbc80012d6dc029be19e1bd8e6890 (patch) | |
tree | 30104c04fb5eb11dadb2b32028ca48971704f226 /lib | |
parent | cee58bfa0e6fbcd3d147a7f7a694187d36ef9368 (diff) |
Updated the queue-handling.
We now use a zset to store our pending tests. This means that
jobs are only in the queue once - no duplicates are allowed.
This closes #12428.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/custodian/queue.rb | 69 |
1 files changed, 61 insertions, 8 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian job = nil loop do + job = @redis.zrange('zset', '0', '0') - # Get a random job - job = @redis.spop('custodian_queue') + if !job.empty? + # We only have one entry in our array + job = job[0] + + # Remove from the queue + @redis.zrem('zset', job ); - # If that worked return it - if !job.nil? return job else sleep(timeout) @@ -96,13 +99,63 @@ module Custodian # - # Add a new job to the queue. + # Add a new job to the queue - this can stall for the case where the + # job is already pending. # def add(test) - # Add unless already present - @redis.sadd('custodian_queue', test) unless - ( @redis.sismember( 'custodian_queue', test ) ) + @redis.watch('zset') + + # + # Count the number of times we attempt to add the test + # + attempts = 0 + added = false + + + # + # This is run in a loop, as we have to wait until both + # + # (a) the score is missing + # (b) the zadd function succeeds + # + while( attempts < 40 ) do + + # + # Only update if no score is set + # + if !@redis.zscore("zset", test) + + # + # If MULTI returns nil, the transaction failed, so we need to try + # again. + # + break unless @redis.multi do |r| + @redis.zadd('zset', Time.now.to_f, test) + added = true + end.nil? + end + + # + # This could be tighter.. + # + sleep 0.1 + + # + # Bump the count of attempts. + # + attempts = attempts + 1 + end + + # + # Do we need to unwatch here? + # + @redis.unwatch + + # + # Return the success/fail + # + return added end |