summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Kemp <steve@steve.org.uk>2015-01-13 15:23:04 +0000
committerSteve Kemp <steve@steve.org.uk>2015-01-13 15:23:04 +0000
commit9eb289dc0908651f458657d1b10d6a57170e2855 (patch)
tree7efda072778f1664a2a6fa583824b3870420013b
parentb7654e353baa5ba8f49fd1d2668717e01437b036 (diff)
Added queue-abstraction layer.
This is not-yet used, but it will-be shortly. The intention is that we can seamlessly swap out the queue implemention in the near future so that we'll be able to use Redis.
-rw-r--r--lib/custodian/queue.rb158
1 files changed, 158 insertions, 0 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
new file mode 100644
index 0000000..a6e062c
--- /dev/null
+++ b/lib/custodian/queue.rb
@@ -0,0 +1,158 @@
+#
+# We don't necessarily expect that both libraries will be present,
+# so long as one is that'll allow things to work.
+#
+%w( redis beanstalk-client ).each do |library|
+ begin
+ require library
+ rescue LoadError
+ puts "Failed to load the library: #{library}"
+ end
+end
+
+
+
+module Custodian
+
+
+ #
+ # An abstraction layer for our queue.
+ #
+ class QueueType
+
+ #
+ # Class-Factory
+ #
+ def self.create type
+ case type
+ when "redis"
+ RedisQueueType.new
+ when "beanstalk"
+ BeanstalkQueueType.new
+ else
+ raise "Bad queue-type: #{type}"
+ end
+ end
+
+
+ #
+ # Retrieve a job from the queue.
+ #
+ def fetch(timeout)
+ raise "Subclasses must implement this method!"
+ end
+
+
+ #
+ # Add a new job to the queue.
+ #
+ def add(job_string)
+ raise "Subclasses must implement this method!"
+ end
+
+
+ #
+ # Get the size of the queue
+ #
+ def size?
+ raise "Subclasses must implement this method!"
+ end
+
+ end
+
+
+
+
+ #
+ # This is a simple FIFO queue which uses Redis for storage.
+ #
+ class RedisQueueType < QueueType
+
+
+ #
+ # Connect to the server on localhost
+ #
+ def initialize
+ host = ENV["REDIS"] || "127.0.0.1"
+ @redis = Redis.new( :host => host )
+ end
+
+
+ #
+ # Fetch a job from the queue - the timeout parameter is ignored.
+ #
+ def fetch(timeout)
+ @redis.lpop( "queue" )
+ end
+
+ #
+ # Add a new job to the queue.
+ #
+ def add(job_string)
+ @redis.rpush( "queue", job_string )
+ end
+
+
+ #
+ # How many jobs in the queue?
+ #
+ def size?
+ @redis.llen( "queue" )
+ end
+ end
+
+
+
+ #
+ # Use the beanstalkd-queue for its intended purpose
+ #
+ class BeanstalkQueueType < QueueType
+
+ #
+ # Connect to the server on localhost
+ #
+ def initialize
+ host = ENV["QUEUE"] || "127.0.0.1:11300"
+ @queue = Beanstalk::Pool.new([host] )
+ end
+
+ #
+ # Here we fetch a value from the queue, and delete it at the same time.
+ #
+ # The timeout is used to specify the period we wait for a new job.
+ #
+ def fetch(timeout)
+ begin
+ j = @queue.reserve(timeout)
+ if ( j ) then
+ b = j.body
+ j.delete
+ return b
+ else
+ raise "ERRROR"
+ end
+ rescue Beanstalk::TimedOut => ex
+ return nil
+ end
+ end
+
+
+ #
+ # Add a new job to the queue.
+ #
+ def add(job_string)
+ @queue.put(job_string)
+ end
+
+
+ #
+ # Get the size of the queue
+ #
+ def size?
+ stats = @queue.stats()
+ ( stats['current-jobs-ready'] || 0 )
+ end
+
+ end
+
+end