diff options
| -rwxr-xr-x | bin/custodian-enqueue | 11 | ||||
| -rw-r--r-- | lib/custodian/queue.rb | 69 | 
2 files changed, 71 insertions, 9 deletions
| diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue index 7dd864b..f4f9f0e 100755 --- a/bin/custodian-enqueue +++ b/bin/custodian-enqueue @@ -85,16 +85,25 @@ if __FILE__ == $PROGRAM_NAME      exit(1)    end +  # +  #  Did we fail to add any tests to the queue? +  # +  failed = false +    mon.jobs.each do |test|      if  ENV['TEST']        # nop      elsif  ENV['DUMP']        puts test      else -      queue.add(test.to_s) +      failed = true unless queue.add(test.to_s)      end    end +  if ( failed ) +    puts "We failed to add at least one job to the queue." +    exit( 1 ) +  end  end diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d1f4a2c..7caad1a 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -81,12 +81,15 @@ module Custodian        job = nil        loop do +        job = @redis.zrange('zset', '0', '0') -        # Get a random job -        job = @redis.spop('custodian_queue') +        if !job.empty? +          # We only have one entry in our array +          job = job[0] + +          # Remove from the queue +          @redis.zrem('zset', job ); -        # If that worked return it -        if !job.nil?            return job          else            sleep(timeout) @@ -96,13 +99,63 @@ module Custodian      # -    #  Add a new job to the queue. +    #  Add a new job to the queue - this can stall for the case where the +    # job is already pending.      #      def add(test) -      # Add unless already present -      @redis.sadd('custodian_queue', test) unless -        ( @redis.sismember( 'custodian_queue', test ) ) +        @redis.watch('zset') + +        # +        # Count the number of times we attempt to add the test +        # +        attempts = 0 +        added    = false + + +        # +        # This is run in a loop, as we have to wait until both +        # +        # (a) the score is missing +        # (b) the zadd function succeeds +        # +        while( attempts < 40 ) do + +          # +          # Only update if no score is set +          # +          if !@redis.zscore("zset", test) + +            # +            # If MULTI returns nil, the transaction failed, so we need to try +            # again. +            # +            break unless @redis.multi do |r| +              @redis.zadd('zset', Time.now.to_f, test) +              added = true +            end.nil? +          end + +          # +          # This could be tighter.. +          # +          sleep 0.1 + +          # +          # Bump the count of attempts. +          # +          attempts = attempts + 1 +        end + +        # +        # Do we need to unwatch here? +        # +        @redis.unwatch + +        # +        # Return the success/fail +        # +        return added      end | 
