#!/usr/bin/ruby -Ilib/ -I../lib/
#
# This script will pull tests to complete from the Beanstalk Queue,
# where they will be found in JSON form, and executes them.
#
# Steve
# --
#

require 'beanstalk-client'
require 'getoptlong'
require 'json'
require 'logger'

require 'mauve/sender'
require 'mauve/proto'

#
# Implementation of our protocol tests.
#
require 'custodian/protocol-tests/ftp.rb'
require 'custodian/protocol-tests/http.rb'
require 'custodian/protocol-tests/https.rb'
require 'custodian/protocol-tests/jabber.rb'
require 'custodian/protocol-tests/ldap.rb'
require 'custodian/protocol-tests/ping.rb'
require 'custodian/protocol-tests/rsync.rb'
require 'custodian/protocol-tests/smtp.rb'
require 'custodian/protocol-tests/ssh.rb'


#
# This class encapsulates the raising and clearing of alerts via Mauve.
#
# It is constructed with the hash describing a single test and reads the
# 'test_type', 'test_host' and 'test_alert' keys from it.
#
class Alert

  # The hash of test details this alert was built from.
  attr_reader :details

  #
  # Constructor: remember the details of the test we alert about.
  #
  def initialize( test_details )
    @details = test_details
  end

  #
  # Raise the alert.
  #
  # detail - text describing the failure; it is printed and embedded in
  #          the alert detail.
  #
  # Note that within this class the method name shadows Kernel#raise.
  #
  def raise( detail )
    puts "RAISE: #{detail}"

    # NOTE(review): this early return means nothing below is ever executed,
    # so no update is actually sent to Mauve. It looks like a deliberate
    # stub for testing - confirm before removing it.
    return

    update         = Mauve::Proto::AlertUpdate.new
    update.alert   = []
    update.source  = "custodian"
    update.replace = true

    alert            = Mauve::Proto::Alert.new
    alert.id         = @details['test_type']
    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}"
    alert.detail     = "The #{@details['test_type']} test failed against #{@details['test_host']}: #{detail}"
    alert.raise_time = Time.now.to_i
    update.alert << alert

    Mauve::Sender.new("alert.bytemark.co.uk").send(update)
  end

  #
  # Clear the alert.
  #
  def clear
    puts "CLEAR"

    # NOTE(review): as in raise(), this early return makes the remainder of
    # the method unreachable, so nothing is sent to Mauve - confirm intent.
    return

    update         = Mauve::Proto::AlertUpdate.new
    update.alert   = []
    update.source  = "custodian"
    update.replace = true

    alert            = Mauve::Proto::Alert.new
    alert.id         = @details['test_type']
    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}"
    alert.detail     = "The #{@details['test_type']} test succeeded against #{@details['test_host']}"
    alert.clear_time = Time.now.to_i
    update.alert << alert

    Mauve::Sender.new("alert.bytemark.co.uk").send(update)
  end

end


#
# This class contains the code for connecting to a Beanstalk queue,
# fetching tests from it, and executing them
#
class Custodian

  #
  # The beanstalk queue.
  #
  attr_reader :queue

  #
  # How many times we re-test before we detect a failure
  #
  attr_reader :retry_count

  #
  # The log-file object
  #
  attr_reader :logger

  #
  # Constructor: Connect to the queue
  #
  # server - the "host:port" address of the Beanstalk server.
  #
  # The retry count is read from the REPEAT environment variable and
  # defaults to 5 when that is unset.
  #
  def initialize( server )

    # Connect to the queue
    @queue = Beanstalk::Pool.new([server])

    # Instantiate the logger.
    @logger = Logger.new( "worker.log", "daily" )

    # NOTE(review): String#to_i yields 0 for non-numeric input, in which
    # case no test run is attempted before alerting - confirm that is OK.
    @retry_count = ENV['REPEAT'] ? ENV['REPEAT'].to_i : 5

    log_message( "We'll run each test #{@retry_count} before alerting failures." )
  end

  #
  # Write the given message to our logfile - and show it to the console
  # if we're running with '--verbose' in play
  #
  def log_message( msg )
    @logger.info( msg )
    puts msg if ENV['VERBOSE']
  end

  #
  # Flush the queue.
  #
  # Jobs are reserved with a one-second timeout and deleted until the
  # reservation times out, which is taken to mean the queue is empty.
  #
  def flush_queue!
    log_message( "Flushing queue" )

    loop do
      begin
        job = @queue.reserve(1)
        id  = job.id
        log_message( "Deleted job #{id}" )
        job.delete
      rescue Beanstalk::TimedOut
        log_message( "The queue is now empty" )
        return
      end
    end
  end

  #
  # Process jobs from the queue - never return.
  #
  def run!
    loop do
      log_message( "\n" )
      log_message( "\n" )
      log_message( "Waiting for job.." )
      process_single_job
    end
  end

  #
  # Fetch a single job from the queue, and process it.
  #
  # The job body is parsed as JSON; its 'test_type' selects the protocol
  # test class to run.  The test is run up to @retry_count times, the alert
  # is cleared on the first success and raised if every attempt fails.
  # The job is deleted from the queue whether or not processing succeeded.
  #
  def process_single_job
    job = nil

    begin
      job = @queue.reserve
      log_message( "Job aquired - Job ID : #{job.id}" )

      #
      # Parse the JSON of the job body.
      #
      details = JSON.parse( job.body )
      details['verbose'] = 1 if ENV['VERBOSE']

      #
      # Output the details.
      #
      log_message( "Job body contains the following keys & values:" )
      details.each_key do |key|
        log_message( "#{key} => #{details[key]}" )
      end

      #
      # Did the test succeed?  If not count the number of times it failed in
      # a row.  We'll repeat several times
      #
      success = false
      count   = 0

      #
      # As a result of this test we'll either raise/clear with mauve.
      #
      # This helper will do that job.
      #
      alert = Alert.new( details )

      #
      # Convert the test-type to a class name, to do the protocol test.
      #
      # Given a test-type "foo" we'll attempt to instantiate a class called FOOTest.
      #
      clazz = "#{details['test_type'].upcase}Test"

      #
      # Create the test object.
      #
      # SECURITY: the class name comes from the job body, i.e. external
      # data.  This used to be resolved with eval(), which would execute
      # arbitrary code embedded in 'test_type'.  A constant lookup resolves
      # the same class names but raises NameError for anything that is not
      # a valid constant; that is caught by the rescue below.
      #
      obj = self.class.const_get( clazz ).new( details )

      #
      # Ensure that the object we load implements the two methods
      # we expect.
      #
      unless obj.respond_to?( "error" ) && obj.respond_to?( "run_test" )
        puts "Class #{clazz} doesn't implement the full protocol-test API"
      end

      #
      # We'll run no more than MAX times.
      #
      # We stop the execution on a single success.
      #
      while ( count < @retry_count ) && !success
        if obj.run_test
          log_message( "Test succeeed - clearing alert" )
          alert.clear
          success = true
        end
        count += 1
      end

      #
      # If we didn't succeed on any of the attempts raise the alert.
      #
      unless success
        #
        # Raise the alert, passing the error message.
        #
        log_message( "Test failed - alerting with #{obj.error}" )
        alert.raise( obj.error )
      end

    rescue => ex
      # Keep the console output, and record the failure in the logfile too.
      @logger.error( "Exception raised processing job: #{ex}" )
      puts "Exception raised processing job: #{ex}"

    ensure
      #
      #  Delete the job - either we received an error, in which case
      # we should remove it to avoid picking it up again, or we handled
      # it successfully so it should be removed.
      #
      #  If reserving the job itself failed there is nothing to remove;
      # touching job.id then would raise from inside this ensure block
      # and terminate the run! loop.
      #
      if job
        log_message( "Job ID : #{job.id} - Removed" )
        job.delete
      end
    end
  end

end


#
# Entry-point to our code.
#
# Parses the command-line flags, then either flushes the queue, processes
# a single job, or loops forever processing jobs.
#
#   --verbose / -v   sets ENV["VERBOSE"]
#   --flush   / -f   sets ENV["FLUSH"]  : empty the queue and exit
#   --server  / -S   Beanstalk server address (default 127.0.0.1:11300)
#   --repeat  / -r   sets ENV["REPEAT"]
#   --single  / -s   sets ENV["SINGLE"] : process one job and exit
#
if __FILE__ == $0

  $SERVER = "127.0.0.1:11300"

  begin
    opts = GetoptLong.new(
      [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ],
      [ "--flush",   "-f", GetoptLong::NO_ARGUMENT ],
      [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ],
      [ "--repeat",  "-r", GetoptLong::REQUIRED_ARGUMENT ],
      [ "--single",  "-s", GetoptLong::NO_ARGUMENT ]
    )

    # "when X then ..." is used rather than the "when X:" form, which is
    # only accepted by Ruby 1.8 and is a syntax error on later versions.
    opts.each do |opt, arg|
      case opt
      when "--verbose" then ENV["VERBOSE"] = "1"
      when "--flush"   then ENV["FLUSH"]   = "1"
      when "--repeat"  then ENV["REPEAT"]  = arg
      when "--server"  then $SERVER        = arg
      when "--single"  then ENV["SINGLE"]  = "1"
      end
    end
  rescue StandardError => ex
    puts "Option parsing failed: #{ex.to_s}"
    # Exit non-zero so that callers can tell the options were rejected.
    exit(1)
  end

  #
  # Create the object
  #
  worker = Custodian.new( $SERVER )

  #
  # Are we flushing the queue?
  #
  if ENV['FLUSH']
    worker.flush_queue!
    exit(0)
  end

  #
  # Single step?
  #
  if ENV['SINGLE']
    worker.process_single_job
    exit(0)
  end

  #
  # Otherwise loop indefinitely
  #
  worker.run!

end