From fbd4e3bf99afbc80012d6dc029be19e1bd8e6890 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Mon, 18 Jan 2016 14:13:24 +0200 Subject: Updated the queue-handling. We now use a zset to store our pending tests. This means that jobs are only in the queue once - no duplicates are allowed. This closes #12428. --- bin/custodian-enqueue | 11 +++++++- lib/custodian/queue.rb | 69 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue index 7dd864b..f4f9f0e 100755 --- a/bin/custodian-enqueue +++ b/bin/custodian-enqueue @@ -85,16 +85,25 @@ if __FILE__ == $PROGRAM_NAME exit(1) end + # + # Did we fail to add any tests to the queue? + # + failed = false + mon.jobs.each do |test| if ENV['TEST'] # nop elsif ENV['DUMP'] puts test else - queue.add(test.to_s) + failed = true unless queue.add(test.to_s) end end + if ( failed ) + puts "We failed to add at least one job to the queue." + exit( 1 ) + end end diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian job = nil loop do + job = @redis.zrange('zset', '0', '0') - # Get a random job - job = @redis.spop('custodian_queue') + if !job.empty? + # We only have one entry in our array + job = job[0] + + # Remove from the queue + @redis.zrem('zset', job ); - # If that worked return it - if !job.nil? return job else sleep(timeout) @@ -96,13 +99,63 @@ module Custodian # - # Add a new job to the queue. + # Add a new job to the queue - this can stall for the case where the + # job is already pending. # def add(test) - # Add unless already present - @redis.sadd('custodian_queue', test) unless - ( @redis.sismember( 'custodian_queue', test ) ) + @redis.watch('zset') + + # + # Count the number of times we attempt to add the test + # + attempts = 0 + added = false + + + # + # This is run in a loop, as we have to wait until both + # + # (a) the score is missing + # (b) the zadd function succeeds + # + while( attempts < 40 ) do + + # + # Only update if no score is set + # + if !@redis.zscore("zset", test) + + # + # If MULTI returns nil, the transaction failed, so we need to try + # again. + # + break unless @redis.multi do |r| + @redis.zadd('zset', Time.now.to_f, test) + added = true + end.nil? + end + + # + # This could be tighter.. + # + sleep 0.1 + + # + # Bump the count of attempts. + # + attempts = attempts + 1 + end + + # + # Do we need to unwatch here? + # + @redis.unwatch + + # + # Return the success/fail + # + return added end -- cgit v1.2.1