diff options
| author | Steve Kemp <steve@steve.org.uk> | 2016-01-18 14:13:24 +0200 | 
|---|---|---|
| committer | Steve Kemp <steve@steve.org.uk> | 2016-01-18 14:13:24 +0200 | 
| commit | fbd4e3bf99afbc80012d6dc029be19e1bd8e6890 (patch) | |
| tree | 30104c04fb5eb11dadb2b32028ca48971704f226 /lib/custodian | |
| parent | cee58bfa0e6fbcd3d147a7f7a694187d36ef9368 (diff) | |
Updated the queue-handling.
We now use a zset to store our pending tests.  This means that
jobs are only in the queue once - no duplicates are allowed.
This closes #12428.
Diffstat (limited to 'lib/custodian')
| -rw-r--r-- | lib/custodian/queue.rb | 69 | 
1 files changed, 61 insertions, 8 deletions
| diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian        job = nil        loop do +        job = @redis.zrange('zset', '0', '0') -        # Get a random job -        job = @redis.spop('custodian_queue') +        if !job.empty? +          # We only have one entry in our array +          job = job[0] + +          # Remove from the queue +          @redis.zrem('zset', job ); -        # If that worked return it -        if !job.nil?            return job          else            sleep(timeout) @@ -96,13 +99,63 @@ module Custodian      # -    #  Add a new job to the queue. +    #  Add a new job to the queue - this can stall for the case where the +    # job is already pending.      #      def add(test) -      # Add unless already present -      @redis.sadd('custodian_queue', test) unless -        ( @redis.sismember( 'custodian_queue', test ) ) +        @redis.watch('zset') + +        # +        # Count the number of times we attempt to add the test +        # +        attempts = 0 +        added    = false + + +        # +        # This is run in a loop, as we have to wait until both +        # +        # (a) the score is missing +        # (b) the zadd function succeeds +        # +        while( attempts < 40 ) do + +          # +          # Only update if no score is set +          # +          if !@redis.zscore("zset", test) + +            # +            # If MULTI returns nil, the transaction failed, so we need to try +            # again. +            # +            break unless @redis.multi do |r| +              @redis.zadd('zset', Time.now.to_f, test) +              added = true +            end.nil? +          end + +          # +          # This could be tighter.. +          # +          sleep 0.1 + +          # +          # Bump the count of attempts. +          # +          attempts = attempts + 1 +        end + +        # +        # Do we need to unwatch here? +        # +        @redis.unwatch + +        # +        # Return the success/fail +        # +        return added      end | 
