From 9a05949ea4d669cb192d3e1d04984e27b82b0a69 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Mon, 19 Jan 2015 12:27:15 +0000 Subject: Updated to use our queue-abstraction. This means we can queue/dequeue to either Redis or Beanstalkd. --- bin/custodian-dequeue | 7 ++----- bin/custodian-enqueue | 34 ++++++++-------------------------- bin/custodian-queue | 25 ++++++++++--------------- lib/custodian/queue.rb | 6 +++++- lib/custodian/settings.rb | 12 +----------- lib/custodian/worker.rb | 35 ++++++++++------------------------- 6 files changed, 36 insertions(+), 83 deletions(-) diff --git a/bin/custodian-dequeue b/bin/custodian-dequeue index 3072fbd..7a99186 100755 --- a/bin/custodian-dequeue +++ b/bin/custodian-dequeue @@ -53,6 +53,7 @@ require 'getoptlong' # Our code. # require 'custodian/settings' +require 'custodian/queue' require 'custodian/worker' @@ -71,7 +72,6 @@ if __FILE__ == $0 then # settings = Custodian::Settings.instance() $SERVER = settings.queue_server - $QUEUE = settings.queue_name $ALERTER = settings.alerter $LOGFILE = settings.log_file @@ -82,7 +82,6 @@ if __FILE__ == $0 then [ "--manual", "-m", GetoptLong::NO_ARGUMENT ], [ "--fail", "-f", GetoptLong::NO_ARGUMENT ], [ "--logfile", "-l", GetoptLong::REQUIRED_ARGUMENT ], - [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ], [ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ], [ "--alerter", "-a", GetoptLong::REQUIRED_ARGUMENT ], [ "--single", "-s", GetoptLong::NO_ARGUMENT ], @@ -96,8 +95,6 @@ if __FILE__ == $0 then $LOGFILE = arg when "--server" then $SERVER = arg - when "--queue" then - $QUEUE = arg when "--alerter" then $ALERTER = arg when "--single" then @@ -149,7 +146,7 @@ if __FILE__ == $0 then # # Create the object # - worker = Custodian::Worker.new( $SERVER, $QUEUE, $ALERTER, $LOGFILE, settings ) + worker = Custodian::Worker.new( $SERVER, $ALERTER, $LOGFILE, settings ) # diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue index a6b9828..20654c1 100755 --- a/bin/custodian-enqueue +++ b/bin/custodian-enqueue @@ -1,7 +1,7 @@ #!/usr/bin/ruby -Ilib/ # # NAME -# custodian-enqueue - Parse tests from a file and enqueue to beanstalkd. +# custodian-enqueue - Parse tests from a file and enqueue them. # # SYNOPSIS # custodian-enqueue [ -h | --help ] @@ -19,8 +19,6 @@ # # -d, --dump Dump the parsed tests to the console; don't insert in the queue. # -# -q, --queue Use the named beanstalkd tube. -# # --test Test the parsing of the given file, alert on errors. # # -f, --file FILE Parse the given configuration file. @@ -29,17 +27,15 @@ # # ABOUT # -# This tool reads a single configuration file and parses it into a +# This tool reads a single configuration file and parses it into a # series of network & protocol tests. These tests are then stored in -# a beanstalkd queue. +# a queue from which workers can retrieve and execute them. # -# The intention is that the tests will be pulled from the queue and -# executed by the companion program custodian-dequeue. The dequeing -# process may occur up numerous other hosts +# The dequeing process may occur up numerous other hosts # # CONFIGURATION FILE # -# The configuration file is 99% compatible with that used in the tool +# The configuration file is 99% compatible with that used in the tool # custodian replaces. # # @@ -53,13 +49,13 @@ # # Standard modules # -require 'beanstalk-client' require 'getoptlong' # # Our code. # require 'custodian/parser' +require 'custodian/queue' require 'custodian/settings' @@ -77,7 +73,6 @@ if __FILE__ == $0 then # settings = Custodian::Settings.instance() $SERVER = settings.queue_server - $QUEUE = settings.queue_name begin opts = GetoptLong.new( @@ -86,7 +81,6 @@ if __FILE__ == $0 then [ "--server", GetoptLong::REQUIRED_ARGUMENT ], [ "--file", "-f", GetoptLong::REQUIRED_ARGUMENT ], [ "--help", "-h", GetoptLong::NO_ARGUMENT ], - [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ], [ "--manual","-m", GetoptLong::NO_ARGUMENT ] ) opts.each do |opt, arg| @@ -97,8 +91,6 @@ if __FILE__ == $0 then ENV["TEST"] = "1" when "--server" then $SERVER = arg - when "--queue" then - $QUEUE= arg when "--file" then ENV["FILE"] = arg when "--help" then @@ -146,7 +138,7 @@ if __FILE__ == $0 then # # Connected to the server # - queue = Beanstalk::Pool.new([$SERVER], $QUEUE ) + queue = Custodian::QueueType.create( "redis" ) if ( ! queue ) puts "Failed to connect to beanstalk server: #{$SERVER}" exit 1 @@ -178,17 +170,7 @@ if __FILE__ == $0 then elsif ( ENV['DUMP'] ) puts test else - - # priority of new job. - priority = 1000 - - # delay before putting into queue. - delay = 0 - - # time-to-remove: how long job.reserve() will live for. - ttr = 300 - - queue.put( test.to_s, priority, delay, ttr ) + queue.add( test.to_s ) end end diff --git a/bin/custodian-queue b/bin/custodian-queue index afcba2a..23f3841 100755 --- a/bin/custodian-queue +++ b/bin/custodian-queue @@ -1,4 +1,4 @@ -#!/usr/bin/ruby +#!/usr/bin/ruby -Ilib/ # # NAME # custodian-queue - Work with the queue. @@ -42,7 +42,6 @@ # # Standard modules # -require 'beanstalk-client' require 'getoptlong' @@ -50,7 +49,7 @@ require 'getoptlong' # Our code # require 'custodian/settings' - +require 'custodian/queue' # @@ -69,7 +68,6 @@ if __FILE__ == $0 then # settings = Custodian::Settings.instance() $SERVER = settings.queue_server - $QUEUE = settings.queue_name begin opts = GetoptLong.new( @@ -78,7 +76,6 @@ if __FILE__ == $0 then [ "--manual", "-m", GetoptLong::NO_ARGUMENT ], [ "--monitor", "-M", GetoptLong::OPTIONAL_ARGUMENT ], [ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ], - [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ], [ "--stats", "-s", GetoptLong::NO_ARGUMENT ] ) opts.each do |opt, arg| @@ -95,8 +92,6 @@ if __FILE__ == $0 then $FLUSH = true when "--server" then $SERVER = arg - when "--queue" then - $QUEUE = arg when "--help" then $help = true when "--manual" then @@ -140,7 +135,7 @@ if __FILE__ == $0 then # # Create the queue object. # - queue = Beanstalk::Pool.new([$SERVER], $QUEUE ) + queue = Custodian::QueueType.create( "redis" ) # # Alerting on a queue that is too-full? @@ -150,8 +145,7 @@ if __FILE__ == $0 then # # Find the number of jobs # - stats = queue.stats() - jobs = stats['current-jobs-ready'] || 0 + jobs = queue.size? if ( jobs > $MONITOR ) exit 1 @@ -165,8 +159,8 @@ if __FILE__ == $0 then # Showing stats? # if ( $STATS ) - stats = queue.stats() - puts "There are #{stats['current-jobs-ready'] || 0} jobs pending." + jobs = queue.size?() + puts "There are #{jobs || 0} jobs pending." exit( 0 ) end @@ -176,10 +170,11 @@ if __FILE__ == $0 then # if ( $FLUSH ) count = 0 - while( true ) + run = true + while( run ) begin - job = queue.reserve(1) - job.delete + job = queue.fetch(1) + exit(0) if ( job.nil? ) count += 1 rescue Beanstalk::TimedOut => ex puts "Flushed #{count} pending jobs." diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index a6e062c..d1f2057 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -82,7 +82,11 @@ module Custodian # Fetch a job from the queue - the timeout parameter is ignored. # def fetch(timeout) - @redis.lpop( "queue" ) + job = false + while( ! job ) + job = @redis.lpop( "queue" ) + end + return( job ) end # diff --git a/lib/custodian/settings.rb b/lib/custodian/settings.rb index 453a35a..edbd798 100644 --- a/lib/custodian/settings.rb +++ b/lib/custodian/settings.rb @@ -140,7 +140,7 @@ module Custodian # - # The beanstalkd server address + # The address of the queue. # def queue_server _load() unless( _loaded? ) @@ -149,16 +149,6 @@ module Custodian end - # - # The name of the beanstalkd tube we'll use - # - def queue_name - _load() unless( _loaded? ) - - @settings['queue_name'] || "Custodian" - end - - # # The filename for the logfile. diff --git a/lib/custodian/worker.rb b/lib/custodian/worker.rb index b9c9995..964d345 100644 --- a/lib/custodian/worker.rb +++ b/lib/custodian/worker.rb @@ -3,7 +3,6 @@ # # Standard modules # -require 'beanstalk-client' require 'logger' @@ -13,7 +12,7 @@ require 'logger' # require 'custodian/alerts' require 'custodian/settings' - +require 'custodian/queue' # @@ -29,7 +28,7 @@ require 'custodian/protocoltests' # -# This class contains the code for connecting to a Beanstalk queue, +# This class contains the code for connecting to the queue, # fetching tests from it, and executing them # module Custodian @@ -38,7 +37,7 @@ module Custodian # - # The beanstalk queue. + # The queue we're using for retrieving tests. # attr_reader :queue @@ -77,10 +76,10 @@ module Custodian # # Constructor: Connect to the queue # - def initialize( server, queue, alerter, logfile, settings ) + def initialize( server, alerter, logfile, settings ) # Connect to the queue - @queue = Beanstalk::Pool.new([server], queue ) + @queue = QueueType.create( "redis" ) # Get the alerter-type to instantiate @alerter = alerter @@ -141,20 +140,14 @@ module Custodian # # Acquire a job. # - job = @queue.reserve() - log_message( "Job aquired - Job ID : #{job.id}" ) + job = @queue.fetch(1) + log_message( "Job aquired: #{job}" ) # # Get the job body # - body = job.body - raise ArgumentError, "Job was empty" if (body.nil?) - raise ArgumentError, "Job was not a string" unless body.kind_of?(String) - - # - # Output the job. - # - log_message( "Job: #{body}" ) + raise ArgumentError, "Job was empty" if (job.nil?) + raise ArgumentError, "Job was not a string" unless job.kind_of?(String) # @@ -166,7 +159,7 @@ module Custodian # # Create the test-object. # - test = Custodian::TestFactory.create( body ) + test = Custodian::TestFactory.create( job ) start_time = Time.now @@ -240,14 +233,6 @@ module Custodian rescue => ex log_message( "Exception raised processing job: #{ex}" ) - ensure - # - # Delete the job - either we received an error, in which case - # we should remove it to avoid picking it up again, or we handled - # it successfully so it should be removed. - # - log_message( "Job ID : #{job.id} - Removed" ) - job.delete if ( job ) end return result -- cgit v1.2.1