diff options
Diffstat (limited to 'lib/custodian/queue.rb')
-rw-r--r-- | lib/custodian/queue.rb | 88 |
1 files changed, 5 insertions, 83 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index d919e07..8c8ea98 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -1,12 +1,13 @@ # -# We don't necessarily expect that both libraries will be present, -# so long as one is that'll allow things to work. +# Attempt to load the Redis-library. # -%w( redis beanstalk-client ).each do |library| +# Without this we cannot connect to our queue. +# +%w( redis ).each do |library| begin require library rescue LoadError - ENV['DEBUG'] && puts("Failed to load the library: #{library}") + puts("Failed to load the #{library} library - queue access will fail!") end end @@ -21,21 +22,6 @@ module Custodian class QueueType # - # Class-Factory - # - def self.create(type) - case type - when 'redis' - RedisQueueType.new - when 'beanstalk' - BeanstalkQueueType.new - else - raise "Bad queue-type: #{type}" - end - end - - - # # Retrieve a job from the queue. # def fetch(_timeout) @@ -165,68 +151,4 @@ module Custodian end - - - # - # Use the beanstalkd-queue for its intended purpose - # - class BeanstalkQueueType < QueueType - - # - # Connect to the server on localhost, unless QUEUE_ADDRESS is set. - # - def initialize - host = ENV['QUEUE_ADDRESS'] || '127.0.0.1' - @queue = Beanstalk::Pool.new(["#{host}:11300"]) - end - - - # - # Here we fetch a value from the queue, and delete it at the same time. - # - # The timeout is used to specify the period we wait for a new job. - # - def fetch(timeout) - begin - j = @queue.reserve(timeout) - if j then - b = j.body - j.delete - return b - else - raise 'ERRROR' - end - rescue Beanstalk::TimedOut => _ex - return nil - end - end - - - # - # Add a new job to the queue. - # - def add(job_string) - @queue.put(job_string) - end - - - # - # Get the size of the queue - # - def size? - stats = @queue.stats - (stats['current-jobs-ready'] || 0) - end - - - # - # Flush the queue, discarding all pending jobs. - # - def flush! - while fetch(1) - # nop - end - end - end - end |