summaryrefslogtreecommitdiff
path: root/lib/custodian/queue.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/custodian/queue.rb')
-rw-r--r--lib/custodian/queue.rb88
1 files changed, 5 insertions, 83 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
index d919e07..8c8ea98 100644
--- a/lib/custodian/queue.rb
+++ b/lib/custodian/queue.rb
@@ -1,12 +1,13 @@
#
-# We don't necessarily expect that both libraries will be present,
-# so long as one is that'll allow things to work.
+# Attempt to load the Redis-library.
#
-%w( redis beanstalk-client ).each do |library|
+# Without this we cannot connect to our queue.
+#
+%w( redis ).each do |library|
begin
require library
rescue LoadError
- ENV['DEBUG'] && puts("Failed to load the library: #{library}")
+ puts("Failed to load the #{library} library - queue access will fail!")
end
end
@@ -21,21 +22,6 @@ module Custodian
class QueueType
#
- # Class-Factory
- #
- def self.create(type)
- case type
- when 'redis'
- RedisQueueType.new
- when 'beanstalk'
- BeanstalkQueueType.new
- else
- raise "Bad queue-type: #{type}"
- end
- end
-
-
- #
# Retrieve a job from the queue.
#
def fetch(_timeout)
@@ -165,68 +151,4 @@ module Custodian
end
-
-
- #
- # Use the beanstalkd-queue for its intended purpose
- #
- class BeanstalkQueueType < QueueType
-
- #
- # Connect to the server on localhost, unless QUEUE_ADDRESS is set.
- #
- def initialize
- host = ENV['QUEUE_ADDRESS'] || '127.0.0.1'
- @queue = Beanstalk::Pool.new(["#{host}:11300"])
- end
-
-
- #
- # Here we fetch a value from the queue, and delete it at the same time.
- #
- # The timeout is used to specify the period we wait for a new job.
- #
- def fetch(timeout)
- begin
- j = @queue.reserve(timeout)
- if j then
- b = j.body
- j.delete
- return b
- else
- raise 'ERRROR'
- end
- rescue Beanstalk::TimedOut => _ex
- return nil
- end
- end
-
-
- #
- # Add a new job to the queue.
- #
- def add(job_string)
- @queue.put(job_string)
- end
-
-
- #
- # Get the size of the queue
- #
- def size?
- stats = @queue.stats
- (stats['current-jobs-ready'] || 0)
- end
-
-
- #
- # Flush the queue, discarding all pending jobs.
- #
- def flush!
- while fetch(1)
- # nop
- end
- end
- end
-
end