From b1d33b50c759731deb9e5cef7f207aa65f0a4a22 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Tue, 13 Jan 2015 15:23:04 +0000 Subject: Added queue-abstraction layer. This is not-yet used, but it will-be shortly. The intention is that we can seamlessly swap out the queue implemention in the near future so that we'll be able to use Redis. --- lib/custodian/queue.rb | 158 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 lib/custodian/queue.rb (limited to 'lib/custodian') diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb new file mode 100644 index 0000000..a6e062c --- /dev/null +++ b/lib/custodian/queue.rb @@ -0,0 +1,158 @@ +# +# We don't necessarily expect that both libraries will be present, +# so long as one is that'll allow things to work. +# +%w( redis beanstalk-client ).each do |library| + begin + require library + rescue LoadError + puts "Failed to load the library: #{library}" + end +end + + + +module Custodian + + + # + # An abstraction layer for our queue. + # + class QueueType + + # + # Class-Factory + # + def self.create type + case type + when "redis" + RedisQueueType.new + when "beanstalk" + BeanstalkQueueType.new + else + raise "Bad queue-type: #{type}" + end + end + + + # + # Retrieve a job from the queue. + # + def fetch(timeout) + raise "Subclasses must implement this method!" + end + + + # + # Add a new job to the queue. + # + def add(job_string) + raise "Subclasses must implement this method!" + end + + + # + # Get the size of the queue + # + def size? + raise "Subclasses must implement this method!" + end + + end + + + + + # + # This is a simple FIFO queue which uses Redis for storage. + # + class RedisQueueType < QueueType + + + # + # Connect to the server on localhost + # + def initialize + host = ENV["REDIS"] || "127.0.0.1" + @redis = Redis.new( :host => host ) + end + + + # + # Fetch a job from the queue - the timeout parameter is ignored. + # + def fetch(timeout) + @redis.lpop( "queue" ) + end + + # + # Add a new job to the queue. + # + def add(job_string) + @redis.rpush( "queue", job_string ) + end + + + # + # How many jobs in the queue? + # + def size? + @redis.llen( "queue" ) + end + end + + + + # + # Use the beanstalkd-queue for its intended purpose + # + class BeanstalkQueueType < QueueType + + # + # Connect to the server on localhost + # + def initialize + host = ENV["QUEUE"] || "127.0.0.1:11300" + @queue = Beanstalk::Pool.new([host] ) + end + + # + # Here we fetch a value from the queue, and delete it at the same time. + # + # The timeout is used to specify the period we wait for a new job. + # + def fetch(timeout) + begin + j = @queue.reserve(timeout) + if ( j ) then + b = j.body + j.delete + return b + else + raise "ERRROR" + end + rescue Beanstalk::TimedOut => ex + return nil + end + end + + + # + # Add a new job to the queue. + # + def add(job_string) + @queue.put(job_string) + end + + + # + # Get the size of the queue + # + def size? + stats = @queue.stats() + ( stats['current-jobs-ready'] || 0 ) + end + + end + +end -- cgit v1.2.1