diff options
| -rwxr-xr-x | bin/custodian-dequeue | 7 | ||||
| -rwxr-xr-x | bin/custodian-enqueue | 34 | ||||
| -rwxr-xr-x | bin/custodian-queue | 25 | ||||
| -rw-r--r-- | lib/custodian/queue.rb | 6 | ||||
| -rw-r--r-- | lib/custodian/settings.rb | 12 | ||||
| -rw-r--r-- | lib/custodian/worker.rb | 35 | 
6 files changed, 36 insertions, 83 deletions
| diff --git a/bin/custodian-dequeue b/bin/custodian-dequeue index 3072fbd..7a99186 100755 --- a/bin/custodian-dequeue +++ b/bin/custodian-dequeue @@ -53,6 +53,7 @@ require 'getoptlong'  # Our code.  #  require 'custodian/settings' +require 'custodian/queue'  require 'custodian/worker' @@ -71,7 +72,6 @@ if __FILE__ == $0 then    #    settings = Custodian::Settings.instance()    $SERVER  = settings.queue_server -  $QUEUE   = settings.queue_name    $ALERTER = settings.alerter    $LOGFILE = settings.log_file @@ -82,7 +82,6 @@ if __FILE__ == $0 then                            [ "--manual",  "-m", GetoptLong::NO_ARGUMENT ],                            [ "--fail",    "-f", GetoptLong::NO_ARGUMENT  ],                            [ "--logfile", "-l", GetoptLong::REQUIRED_ARGUMENT ], -                          [ "--queue",   "-q", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--alerter", "-a", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--single",  "-s", GetoptLong::NO_ARGUMENT ], @@ -96,8 +95,6 @@ if __FILE__ == $0 then            $LOGFILE = arg        when "--server" then            $SERVER = arg -      when "--queue" then -          $QUEUE = arg        when "--alerter" then            $ALERTER = arg        when "--single" then @@ -149,7 +146,7 @@ if __FILE__ == $0 then    #    #  Create the object    # -  worker = Custodian::Worker.new( $SERVER, $QUEUE, $ALERTER, $LOGFILE, settings ) +  worker = Custodian::Worker.new( $SERVER, $ALERTER, $LOGFILE, settings )    # diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue index a6b9828..20654c1 100755 --- a/bin/custodian-enqueue +++ b/bin/custodian-enqueue @@ -1,7 +1,7 @@  #!/usr/bin/ruby -Ilib/  #  # NAME -#  custodian-enqueue - Parse tests from a file and enqueue to beanstalkd. +#  custodian-enqueue - Parse tests from a file and enqueue them.  #  # SYNOPSIS  #  custodian-enqueue  [ -h | --help ] @@ -19,8 +19,6 @@  #  #  -d, --dump          Dump the parsed tests to the console; don't insert in the queue.  # -#  -q, --queue         Use the named beanstalkd tube. -#  #  --test              Test the parsing of the given file, alert on errors.  #  #  -f, --file FILE     Parse the given configuration file. @@ -29,17 +27,15 @@  #  # ABOUT  # -#  This tool reads a single configuration file and parses it into a +# This tool reads a single configuration file and parses it into a  # series of network & protocol tests.   These tests are then stored in -# a beanstalkd queue. +# a queue from which workers can retrieve and execute them.  # -#  The intention is that the tests will be pulled from the queue and -# executed by the companion program custodian-dequeue.  The dequeing -# process may occur up numerous other hosts +# The dequeing process may occur up numerous other hosts  #  # CONFIGURATION FILE  # -#  The configuration file is 99% compatible with that used in the tool +# The configuration file is 99% compatible with that used in the tool  # custodian replaces.  #  # @@ -53,13 +49,13 @@  #  # Standard modules  # -require 'beanstalk-client'  require 'getoptlong'  #  # Our code.  #  require 'custodian/parser' +require 'custodian/queue'  require 'custodian/settings' @@ -77,7 +73,6 @@ if __FILE__ == $0 then    #    settings = Custodian::Settings.instance()    $SERVER  = settings.queue_server -  $QUEUE   = settings.queue_name    begin      opts = GetoptLong.new( @@ -86,7 +81,6 @@ if __FILE__ == $0 then                            [ "--server",   GetoptLong::REQUIRED_ARGUMENT ],                            [ "--file",  "-f", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--help",  "-h", GetoptLong::NO_ARGUMENT ], -                          [ "--queue",  "-q", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--manual","-m", GetoptLong::NO_ARGUMENT ]                            )      opts.each do |opt, arg| @@ -97,8 +91,6 @@ if __FILE__ == $0 then            ENV["TEST"] = "1"        when "--server" then            $SERVER = arg -      when "--queue" then -          $QUEUE= arg         when "--file" then            ENV["FILE"] = arg        when "--help" then @@ -146,7 +138,7 @@ if __FILE__ == $0 then    #    # Connected to the server    # -  queue = Beanstalk::Pool.new([$SERVER], $QUEUE ) +  queue = Custodian::QueueType.create( "redis" )    if ( ! queue )      puts "Failed to connect to beanstalk server: #{$SERVER}"      exit 1 @@ -178,17 +170,7 @@ if __FILE__ == $0 then      elsif ( ENV['DUMP'] )        puts test      else - -      # priority of new job. -      priority = 1000 - -      # delay before putting into queue. -      delay    = 0 - -      # time-to-remove: how long job.reserve() will live for. -      ttr      = 300 - -      queue.put( test.to_s, priority, delay, ttr ) +      queue.add( test.to_s )      end    end diff --git a/bin/custodian-queue b/bin/custodian-queue index afcba2a..23f3841 100755 --- a/bin/custodian-queue +++ b/bin/custodian-queue @@ -1,4 +1,4 @@ -#!/usr/bin/ruby +#!/usr/bin/ruby -Ilib/  #  # NAME  #  custodian-queue - Work with the queue. @@ -42,7 +42,6 @@  #  # Standard modules  # -require 'beanstalk-client'  require 'getoptlong' @@ -50,7 +49,7 @@ require 'getoptlong'  # Our code  #  require 'custodian/settings' - +require 'custodian/queue'  # @@ -69,7 +68,6 @@ if __FILE__ == $0 then    #    settings = Custodian::Settings.instance()    $SERVER  = settings.queue_server -  $QUEUE   = settings.queue_name    begin      opts = GetoptLong.new( @@ -78,7 +76,6 @@ if __FILE__ == $0 then                            [ "--manual",  "-m", GetoptLong::NO_ARGUMENT ],                            [ "--monitor", "-M", GetoptLong::OPTIONAL_ARGUMENT ],                            [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ], -                          [ "--queue",   "-q", GetoptLong::REQUIRED_ARGUMENT ],                            [ "--stats",   "-s", GetoptLong::NO_ARGUMENT ]                            )      opts.each do |opt, arg| @@ -95,8 +92,6 @@ if __FILE__ == $0 then            $FLUSH = true        when "--server" then            $SERVER = arg -      when "--queue" then -          $QUEUE = arg        when "--help" then            $help = true        when "--manual" then @@ -140,7 +135,7 @@ if __FILE__ == $0 then    #    #  Create the queue object.    # -  queue = Beanstalk::Pool.new([$SERVER], $QUEUE ) +  queue = Custodian::QueueType.create( "redis" )    #    # Alerting on a queue that is too-full? @@ -150,8 +145,7 @@ if __FILE__ == $0 then      #      # Find the number of jobs      # -    stats = queue.stats() -    jobs  = stats['current-jobs-ready'] || 0 +    jobs = queue.size?      if ( jobs > $MONITOR )          exit 1 @@ -165,8 +159,8 @@ if __FILE__ == $0 then    # Showing stats?    #    if ( $STATS ) -     stats = queue.stats() -     puts "There are #{stats['current-jobs-ready'] || 0} jobs pending." +     jobs = queue.size?() +     puts "There are #{jobs || 0} jobs pending."       exit( 0 )    end @@ -176,10 +170,11 @@ if __FILE__ == $0 then    #    if ( $FLUSH )      count = 0 -    while( true ) +    run = true +    while( run )        begin -        job = queue.reserve(1) -        job.delete +        job = queue.fetch(1) +        exit(0) if ( job.nil? )          count += 1        rescue Beanstalk::TimedOut => ex          puts "Flushed #{count} pending jobs." diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb index a6e062c..d1f2057 100644 --- a/lib/custodian/queue.rb +++ b/lib/custodian/queue.rb @@ -82,7 +82,11 @@ module Custodian      #  Fetch a job from the queue - the timeout parameter is ignored.      #      def fetch(timeout) -      @redis.lpop( "queue" ) +      job = false +      while( ! job ) +        job = @redis.lpop( "queue" ) +      end +      return( job )      end      # diff --git a/lib/custodian/settings.rb b/lib/custodian/settings.rb index 453a35a..edbd798 100644 --- a/lib/custodian/settings.rb +++ b/lib/custodian/settings.rb @@ -140,7 +140,7 @@ module Custodian      # -    # The beanstalkd server address +    # The address of the queue.      #      def queue_server        _load() unless( _loaded? ) @@ -149,16 +149,6 @@ module Custodian      end -    # -    # The name of the beanstalkd tube we'll use -    # -    def queue_name -      _load() unless( _loaded? ) - -      @settings['queue_name'] || "Custodian" -    end - -      #      # The filename for the logfile. diff --git a/lib/custodian/worker.rb b/lib/custodian/worker.rb index b9c9995..964d345 100644 --- a/lib/custodian/worker.rb +++ b/lib/custodian/worker.rb @@ -3,7 +3,6 @@  #  # Standard modules  # -require 'beanstalk-client'  require 'logger' @@ -13,7 +12,7 @@ require 'logger'  #  require 'custodian/alerts'  require 'custodian/settings' - +require 'custodian/queue'  # @@ -29,7 +28,7 @@ require 'custodian/protocoltests'  # -# This class contains the code for connecting to a Beanstalk queue, +# This class contains the code for connecting to the queue,  # fetching tests from it, and executing them  #  module Custodian @@ -38,7 +37,7 @@ module Custodian      # -    # The beanstalk queue. +    # The queue we're using for retrieving tests.      #      attr_reader :queue @@ -77,10 +76,10 @@ module Custodian      #      # Constructor: Connect to the queue      # -    def initialize( server, queue, alerter, logfile, settings ) +    def initialize( server, alerter, logfile, settings )        # Connect to the queue -      @queue = Beanstalk::Pool.new([server], queue ) +      @queue = QueueType.create( "redis" )        # Get the alerter-type to instantiate        @alerter = alerter @@ -141,20 +140,14 @@ module Custodian          #          #  Acquire a job.          # -        job = @queue.reserve() -        log_message( "Job aquired - Job ID : #{job.id}" ) +        job = @queue.fetch(1) +        log_message( "Job aquired: #{job}" )          #          #  Get the job body          # -        body = job.body -        raise ArgumentError, "Job was empty" if (body.nil?) -        raise ArgumentError, "Job was not a string" unless body.kind_of?(String) - -        # -        #  Output the job. -        # -        log_message( "Job: #{body}" ) +        raise ArgumentError, "Job was empty" if (job.nil?) +        raise ArgumentError, "Job was not a string" unless job.kind_of?(String)          # @@ -166,7 +159,7 @@ module Custodian          #          # Create the test-object.          # -        test = Custodian::TestFactory.create( body ) +        test = Custodian::TestFactory.create( job )          start_time = Time.now @@ -240,14 +233,6 @@ module Custodian        rescue => ex          log_message( "Exception raised processing job: #{ex}" ) -      ensure -        # -        #  Delete the job - either we received an error, in which case -        # we should remove it to avoid picking it up again, or we handled -        # it successfully so it should be removed. -        # -        log_message( "Job ID : #{job.id} - Removed" ) -        job.delete if ( job )        end        return result | 
