diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/custodian/queue.rb | 69 | 
1 files changed, 61 insertions, 8 deletions
| diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian        job = nil        loop do +        job = @redis.zrange('zset', '0', '0') -        # Get a random job -        job = @redis.spop('custodian_queue') +        if !job.empty? +          # We only have one entry in our array +          job = job[0] + +          # Remove from the queue +          @redis.zrem('zset', job ); -        # If that worked return it -        if !job.nil?            return job          else            sleep(timeout) @@ -96,13 +99,63 @@ module Custodian      # -    #  Add a new job to the queue. +    #  Add a new job to the queue - this can stall for the case where the +    # job is already pending.      #      def add(test) -      # Add unless already present -      @redis.sadd('custodian_queue', test) unless -        ( @redis.sismember( 'custodian_queue', test ) ) +        @redis.watch('zset') + +        # +        # Count the number of times we attempt to add the test +        # +        attempts = 0 +        added    = false + + +        # +        # This is run in a loop, as we have to wait until both +        # +        # (a) the score is missing +        # (b) the zadd function succeeds +        # +        while( attempts < 40 ) do + +          # +          # Only update if no score is set +          # +          if !@redis.zscore("zset", test) + +            # +            # If MULTI returns nil, the transaction failed, so we need to try +            # again. +            # +            break unless @redis.multi do |r| +              @redis.zadd('zset', Time.now.to_f, test) +              added = true +            end.nil? +          end + +          # +          # This could be tighter.. +          # +          sleep 0.1 + +          # +          # Bump the count of attempts. +          # +          attempts = attempts + 1 +        end + +        # +        # Do we need to unwatch here? +        # +        @redis.unwatch + +        # +        # Return the success/fail +        # +        return added      end | 
