summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/custodian-enqueue11
-rw-r--r--lib/custodian/queue.rb69
2 files changed, 71 insertions, 9 deletions
diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue
index 7dd864b..f4f9f0e 100755
--- a/bin/custodian-enqueue
+++ b/bin/custodian-enqueue
@@ -85,16 +85,25 @@ if __FILE__ == $PROGRAM_NAME
exit(1)
end
+ #
+ # Did we fail to add any tests to the queue?
+ #
+ failed = false
+
mon.jobs.each do |test|
if ENV['TEST']
# nop
elsif ENV['DUMP']
puts test
else
- queue.add(test.to_s)
+ failed = true unless queue.add(test.to_s)
end
end
+ if ( failed )
+ puts "We failed to add at least one job to the queue."
+ exit( 1 )
+ end
end
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
index d1f4a2c..7caad1a 100644
--- a/lib/custodian/queue.rb
+++ b/lib/custodian/queue.rb
@@ -81,12 +81,15 @@ module Custodian
job = nil
loop do
+ job = @redis.zrange('zset', '0', '0')
- # Get a random job
- job = @redis.spop('custodian_queue')
+ if !job.empty?
+ # We only have one entry in our array
+ job = job[0]
+
+ # Remove from the queue
+ @redis.zrem('zset', job );
- # If that worked return it
- if !job.nil?
return job
else
sleep(timeout)
@@ -96,13 +99,63 @@ module Custodian
#
- # Add a new job to the queue.
+ # Add a new job to the queue - this can stall for the case where the
+ # job is already pending.
#
def add(test)
- # Add unless already present
- @redis.sadd('custodian_queue', test) unless
- ( @redis.sismember( 'custodian_queue', test ) )
+ @redis.watch('zset')
+
+ #
+ # Count the number of times we attempt to add the test
+ #
+ attempts = 0
+ added = false
+
+
+ #
+ # This is run in a loop, as we have to wait until both
+ #
+ # (a) the score is missing
+ # (b) the zadd function succeeds
+ #
+ while( attempts < 40 ) do
+
+ #
+ # Only update if no score is set
+ #
+ if !@redis.zscore("zset", test)
+
+ #
+ # If MULTI returns nil, the transaction failed, so we need to try
+ # again.
+ #
+ break unless @redis.multi do |r|
+ @redis.zadd('zset', Time.now.to_f, test)
+ added = true
+ end.nil?
+ end
+
+ #
+ # This could be tighter..
+ #
+ sleep 0.1
+
+ #
+ # Bump the count of attempts.
+ #
+ attempts = attempts + 1
+ end
+
+ #
+ # Do we need to unwatch here?
+ #
+ @redis.unwatch
+
+ #
+ # Return the success/fail
+ #
+ return added
end