#
# We don't necessarily expect both libraries to be present;
# as long as one of them loads, things will work.
#
%w[redis beanstalk-client].each do |lib|
  begin
    require lib
  rescue LoadError
    # A missing backend library is tolerated; it is only reported when the
    # DEBUG environment variable is set.
    puts "Failed to load the library: #{lib}" if ENV["DEBUG"]
  end
end
module Custodian
#
# An abstraction layer for our queue.
#
class QueueType
  #
  # Factory: build the queue implementation named by +type+.
  #
  # "redis" gives a RedisQueueType and "beanstalk" a BeanstalkQueueType;
  # any other value raises a RuntimeError naming the rejected type.
  #
  def self.create(type)
    implementation =
      case type
      when "redis" then RedisQueueType
      when "beanstalk" then BeanstalkQueueType
      else raise "Bad queue-type: #{type}"
      end

    implementation.new
  end

  #
  # Retrieve a job from the queue.
  #
  # Abstract: the base class always raises.
  #
  def fetch(timeout)
    raise RuntimeError, "Subclasses must implement this method!"
  end

  #
  # Add a new job to the queue.
  #
  # Abstract: the base class always raises.
  #
  def add(job_string)
    raise RuntimeError, "Subclasses must implement this method!"
  end

  #
  # Get the size of the queue.
  #
  # Abstract: the base class always raises.
  #
  def size?
    raise RuntimeError, "Subclasses must implement this method!"
  end
end
#
# This is a simple FIFO queue which uses Redis for storage.
#
class RedisQueueType < QueueType
  #
  # Seconds to pause between polls while the queue is empty.
  #
  POLL_INTERVAL = 0.1

  #
  # Connect to the Redis server named by the REDIS environment
  # variable, falling back to 127.0.0.1.
  #
  def initialize
    host = ENV["REDIS"] || "127.0.0.1"
    @redis = Redis.new(:host => host)
  end

  #
  # Fetch a job from the queue - the timeout parameter is ignored.
  #
  # This call blocks until a job is available and returns the value
  # popped from the head of the "queue" list; it never returns nil.
  #
  def fetch(timeout)
    job = @redis.lpop("queue")

    until job
      # The queue is empty: pause before polling again, instead of
      # spinning on LPOP and saturating both the CPU and the server.
      sleep(POLL_INTERVAL)
      job = @redis.lpop("queue")
    end

    job
  end

  #
  # Add a new job to the tail of the queue.
  #
  def add(job_string)
    @redis.rpush("queue", job_string)
  end

  #
  # How many jobs are in the queue?
  #
  def size?
    @redis.llen("queue")
  end
end
#
# Use the beanstalkd-queue for its intended purpose
#
class BeanstalkQueueType < QueueType
  #
  # Connect to the beanstalkd server named by the QUEUE environment
  # variable, falling back to 127.0.0.1:11300.
  #
  def initialize
    host = ENV["QUEUE"] || "127.0.0.1:11300"
    @queue = Beanstalk::Pool.new([host])
  end

  #
  # Here we fetch a value from the queue, and delete it at the same time.
  #
  # The timeout is used to specify the period we wait for a new job.
  #
  # Returns the body of the job, or nil if Beanstalk::TimedOut was
  # raised while waiting.  Raises a RuntimeError if the reservation
  # unexpectedly yields no job.
  #
  def fetch(timeout)
    job = @queue.reserve(timeout)
    raise "Beanstalk reserve returned no job" unless job

    # Read the body before deleting the job from the server.
    body = job.body
    job.delete
    body
  rescue Beanstalk::TimedOut
    nil
  end

  #
  # Add a new job to the queue.
  #
  def add(job_string)
    @queue.put(job_string)
  end

  #
  # Get the size of the queue: the 'current-jobs-ready' statistic,
  # or 0 when that statistic is absent.
  #
  def size?
    stats = @queue.stats
    stats['current-jobs-ready'] || 0
  end
end
end