diff options
Diffstat (limited to 'bin/worker')
| -rwxr-xr-x | bin/worker | 367 | 
1 files changed, 367 insertions, 0 deletions
| diff --git a/bin/worker b/bin/worker new file mode 100755 index 0000000..3c9e355 --- /dev/null +++ b/bin/worker @@ -0,0 +1,367 @@ +#!/usr/bin/ruby +# +#  This script will pull tests to complete from the Beanstalk Queue, +# where they will be found in JSON form, and executes them. +# +# Steve +# -- +# + + + +require 'beanstalk-client' +require 'getoptlong' +require 'json' +require 'logger' + +require 'mauve/sender' +require 'mauve/proto' + + + +# +# Implementation of our protocol tests. +# +require 'tests/ftp' +require 'tests/http' +require 'tests/https' +require 'tests/jabber' +require 'tests/ldap' +require 'tests/ping' +require 'tests/rsync' +require 'tests/smtp' +require 'tests/ssh' + + + + +# +#  This class encapsulates the raising and clearing of alerts via Mauve. +# +class Alert + +  attr_reader :details + +  def initialize( test_details ) +    @details = test_details +  end + + +  # +  # Raise the alert. +  # +  def raise( detail ) + +    puts "RAISE: #{detail}" +    return + +    update = Mauve::Proto::AlertUpdate.new +    update.alert   = [] +    update.source  = "custodian" +    update.replace = true + +    alert            = Mauve::Proto::Alert.new +    alert.id         = @details['test_type'] +    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}" +    alert.detail     = "The #{@details['test_type']} test failed against #{@details['test_host']}: #{detail}" +    alert.raise_time = Time.now.to_i +    update.alert << alert + +    Mauve::Sender.new("alert.bytemark.co.uk").send(update) + +  end + +  # +  #  Clear the alert. +  # +  def clear +    puts "CLEAR" +    return + +    update = Mauve::Proto::AlertUpdate.new +    update.alert   = [] +    update.source  = "custodian" +    update.replace = true + +    alert            = Mauve::Proto::Alert.new +    alert.id         = @details['test_type'] +    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}" +    alert.detail     = "The #{@details['test_type']} test succeeded against #{@details['test_host']}" +    alert.clear_time = Time.now.to_i +    update.alert << alert + +    Mauve::Sender.new("alert.bytemark.co.uk").send(update) +  end + +end + + + + +# +# This class contains the code for connecting to a Beanstalk queue, +# fetching tests from it, and executing them +# +class Custodian + +  # +  # The beanstalk queue. +  # +  attr_reader :queue + +  # +  # How many times we re-test before we detect a failure +  # +  attr_reader :retry_count + +  # +  # The log-file object +  # +  attr_reader :logger + +  # +  # Constructor: Connect to the queue +  # +  def initialize( server ) + +    # Connect to the queue +    @queue = Beanstalk::Pool.new([server]) + +    # Instantiate the logger. +    @logger = Logger.new( "worker.log", "daily" ) + +    if ( ENV['REPEAT'] ) +       @retry_count=ENV['REPEAT'].to_i +    else +       @retry_count=5 +    end + +    log_message( "We'll run each test #{@retry_count} before alerting failures." ) +  end + + +  # +  # Write the given message to our logfile - and show it to the console +  # if we're running with '--verbose' in play +  # +  def log_message( msg ) +      @logger.info( msg ) +      puts msg if ( ENV['VERBOSE'] ) +  end + +  # +  #  Flush the queue. +  # +  def flush_queue! + +    log_message( "Flushing queue" ) + +    while( true ) +      begin +        job = @queue.reserve(1) +        id  = job.id +        log_message( "Deleted job #{id}" ) +        job.delete +      rescue Beanstalk::TimedOut => ex +        log_message( "The queue is now empty" ) +        return +      end +    end +  end + + + +  # +  # Process jobs from the queue - never return. +  # +  def run! +    while( true ) +      log_message( "\n" ) +      log_message( "\n" ) +      log_message( "Waiting for job.." ) +      process_single_job() +    end +  end + + + +  # +  # Fetch a single job from the queue, and process it. +  # +  def process_single_job + +    begin +      job = @queue.reserve() + +      log_message( "Job aquired - Job ID : #{job.id}" ) + + +      # +      #  Parse the JSON of the job body. +      # +      json = job.body +      hash = JSON.parse( json ) +      hash['verbose'] = 1 if ( ENV['VERBOSE'] ) + + +      # +      #  Output the details. +      # +      log_message( "Job body contains the following keys & values:") +      hash.keys.each do |key| +        log_message( "#{key} => #{hash[key]}" ) +      end + + + +      # +      # Did the test succeed?  If not count the number of times it failed in +      # a row.  We'll repeat several times +      # +      success = false +      count   = 0 + +      # +      # As a result of this test we'll either raise/clear with mauve. +      # +      # This helper will do that job. +      # +      alert = Alert.new( hash ) + + +      # +      # Convert the test-type to a class name, to do the protocol test. +      # +      # Given a test-type "foo" we'll attempt to instantiate a class called FOOTest. +      # +      test  = hash['test_type'] +      clazz = test.upcase +      clazz = "#{clazz}Test" + + +      # +      # Create the test object. +      # +      obj = eval(clazz).new( hash ) + + +      # +      # Ensure that the object we load implements the two methods +      # we expect. +      # +      if ( ( ! obj.respond_to?( "error") ) || +           ( ! obj.respond_to?( "run_test" ) ) ) +        puts "Class #{clazz} doesn't implement the full protocol-test API" +      end + + + +      # +      #  We'll run no more than MAX times. +      # +      #  We stop the execution on a single success. +      # +      while ( ( count < @retry_count ) && ( success == false ) ) + +        if ( obj.run_test() ) +          log_message( "Test succeeed - clearing alert" ) +          alert.clear() +          success= true +        end +        count += 1 +      end + +      # +      #  If we didn't succeed on any of the attempts raise the alert. +      # +      if ( ! success ) + +        # +        # Raise the alert, passing the error message. +        # +        log_message( "Test failed - alerting with #{obj.error()}" ) +        alert.raise( obj.error() ) +      end + +    rescue => ex +      puts "Exception raised processing job: #{ex}" + +    ensure +      # +      #  Delete the job - either we received an error, in which case +      # we should remove it to avoid picking it up again, or we handled +      # it successfully so it should be removed. +      # +      log_message( "Job ID : #{job.id} - Removed" ) +      job.delete if ( job ) +    end +  end +end + + + + + + + +# +#  Entry-point to our code. +# +if __FILE__ == $0 then + +  $SERVER = "127.0.0.1:11300"; + +  begin +    opts = GetoptLong.new( +                          [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ], +                          [ "--flush",   "-f", GetoptLong::NO_ARGUMENT ], +                          [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ], +                          [ "--repeat",  "-r", GetoptLong::REQUIRED_ARGUMENT ], +                          [ "--single",  "-s", GetoptLong::NO_ARGUMENT ] +                          ) +    opts.each do |opt, arg| +      case opt +      when "--verbose": +          ENV["VERBOSE"] = "1" +      when "--flush": +          ENV["FLUSH"] = "1" +      when "--repeat": +          ENV["REPEAT"] = arg +      when "--server": +          $SERVER = arg +      when "--single": +          ENV["SINGLE"] = "1" +      end +    end +  rescue StandardError => ex +    puts "Option parsing failed: #{ex.to_s}" +    exit +  end + +  # +  #  Create the object +  # +  worker = Custodian.new( $SERVER ) + +  # +  #  Are we flushing the queue? +  # +  if ( ENV['FLUSH'] ) +    worker.flush_queue! +    exit(0) +  end + +  # +  #  Single step? +  # +  if ( ENV['SINGLE'] ) +    worker.process_single_job +    exit(0) +  end + +  # +  #  Otherwise loop indefinitely +  # +  worker.run! + +end | 
