#!/usr/bin/ruby1.8 -Ilib/ -I../lib/
#
# NAME
#  custodian-dequeue - Pull network tests from a queue and execute them in series.
#
# SYNOPSIS
#  custodian-dequeue  [ -h | --help ]
#                     [ -m | --manual]
#                     [ -r | --repeat N ]
#                     [ -S | --server 1.2.3.4:123 ]
#                     [ -s | --single ]
#                     [ -v | --verbose ]
#                     [ -f | --flush ]
#
# OPTIONS
#
#  -h, --help          Show a help message, and exit.
#
#  -m, --manual        Show this manual, and exit.
#
#  -f, --flush         Flush the queue, removing all jobs.
#
#  -r, --repeat N      Run each test up to N times before alerting (default 5).
#
#  -S, --server ADDR   The beanstalkd server to use (default 127.0.0.1:11300).
#
#  -s, --single        Run a single test and exit.  Don't poll the queue for more.
#
#  -v, --verbose       Be noisy.
#
# ABOUT
#
# This tool is designed to pull JSON-encoded network/protocol-tests from
# a beanstalkd server and execute them, in series.
#
# The tests are created, via custodian-enqueue, by parsing a configuration
# file largely compatible with that used for our obsolete sentinel tool.
#
# The results of the testing will be sent to a mauvealert server.
#
#
# AUTHOR
#
#  Steve Kemp  <steve@bytemark.co.uk>
#

# NOTE: the comment block above is printed at runtime by --help / --manual
# (see the entry-point code at the bottom of this file).  The blank line
# that terminates it is significant: printing stops at the first empty line.

require 'beanstalk-client'
require 'getoptlong'
require 'json'
require 'logger'

require 'mauve/sender'
require 'mauve/proto'

#
# Implementation of our protocol tests.
#
require 'custodian/protocol-tests/dns.rb'
require 'custodian/protocol-tests/ftp.rb'
require 'custodian/protocol-tests/http.rb'
require 'custodian/protocol-tests/https.rb'
require 'custodian/protocol-tests/jabber.rb'
require 'custodian/protocol-tests/ldap.rb'
require 'custodian/protocol-tests/ping.rb'
require 'custodian/protocol-tests/rsync.rb'
require 'custodian/protocol-tests/smtp.rb'
require 'custodian/protocol-tests/ssh.rb'


#
# This class encapsulates the raising and clearing of alerts via Mauve.
#
# It is constructed with the decoded job hash and reads the 'test_type',
# 'test_host' and 'test_alert' keys from it when building an alert.
#
class Alert

  #
  # The host alert updates are addressed to.
  #
  MAUVE_SERVER = "alert.bytemark.co.uk"

  #
  # The job details this alert was constructed with.
  #
  attr_reader :details

  #
  # Constructor: remember the details of the test we may alert about.
  #
  def initialize(test_details)
    @details = test_details
  end

  #
  # Raise the alert.
  #
  # detail - text describing the failure; it is printed, and would be
  #          embedded in the alert body.
  #
  # NOTE: this method shadows Kernel#raise for code inside this class.
  #
  def raise(detail)
    puts "RAISE: #{detail}"

    #
    # NOTE(review): this unconditional return means nothing below is ever
    # executed - alerts are only printed, never sent to Mauve.  It looks
    # like a development stub; confirm before removing it.
    #
    return

    update         = Mauve::Proto::AlertUpdate.new
    update.alert   = []
    update.source  = "custodian"
    update.replace = true

    alert            = Mauve::Proto::Alert.new
    alert.id         = @details['test_type']
    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}"
    alert.detail     = "The #{@details['test_type']} test failed against #{@details['test_host']}: #{detail}"
    alert.raise_time = Time.now.to_i
    update.alert << alert

    Mauve::Sender.new(MAUVE_SERVER).send(update)
  end

  #
  # Clear the alert.
  #
  def clear
    puts "CLEAR"

    #
    # NOTE(review): as in #raise, this return short-circuits the Mauve
    # submission below, so a clear is only ever printed.
    #
    return

    update         = Mauve::Proto::AlertUpdate.new
    update.alert   = []
    update.source  = "custodian"
    update.replace = true

    alert            = Mauve::Proto::Alert.new
    alert.id         = @details['test_type']
    alert.summary    = "#{@details['test_host']} #{@details['test_alert']}"
    alert.detail     = "The #{@details['test_type']} test succeeded against #{@details['test_host']}"
    alert.clear_time = Time.now.to_i
    update.alert << alert

    Mauve::Sender.new(MAUVE_SERVER).send(update)
  end

end


#
# This class contains the code for connecting to a Beanstalk queue,
# fetching tests from it, and executing them
#
class Custodian

  #
  # How many times a test is attempted when $REPEAT gives no usable value.
  #
  DEFAULT_RETRY_COUNT = 5

  #
  # The beanstalk queue.
  #
  attr_reader :queue

  #
  # How many times we re-test before we detect a failure
  #
  attr_reader :retry_count

  #
  # The log-file object
  #
  attr_reader :logger

  #
  # Constructor: Connect to the queue
  #
  # server - "host:port" of the beanstalkd server.
  #
  # The retry count is read from the REPEAT environment variable.
  #
  def initialize(server)

    # Connect to the queue
    @queue = Beanstalk::Pool.new([server])

    # Instantiate the logger.
    @logger = Logger.new("worker.log", "daily")

    #
    # nil.to_i and "junk".to_i are both 0.  A count of zero (or less) would
    # mean the test loop never runs and every job raises an alert, so any
    # such value falls back to the default.
    #
    requested    = ENV['REPEAT'].to_i
    @retry_count = requested > 0 ? requested : DEFAULT_RETRY_COUNT

    log_message("We'll run each test #{@retry_count} before alerting failures.")
  end

  #
  # Write the given message to our logfile - and show it to the console
  # if we're running with '--verbose' in play
  #
  def log_message(msg)
    @logger.info(msg)
    puts msg if ENV['VERBOSE']
  end

  #
  # Flush the queue.
  #
  # Jobs are reserved with a one-second timeout and deleted one at a time;
  # the first timeout is taken to mean the queue is empty.
  #
  def flush_queue!
    log_message("Flushing queue")

    loop do
      begin
        job = @queue.reserve(1)
        log_message("Deleted job #{job.id}")
        job.delete
      rescue Beanstalk::TimedOut
        log_message("The queue is now empty")
        return
      end
    end
  end

  #
  # Process jobs from the queue - never return.
  #
  def run!
    loop do
      log_message("\n")
      log_message("\n")
      log_message("Waiting for job..")
      process_single_job
    end
  end

  #
  # Fetch a single job from the queue, and process it.
  #
  # The job body is parsed as JSON, the matching protocol-test object is
  # created, and the test is run up to retry_count times, stopping at the
  # first success.  The alert is then cleared or raised accordingly.
  #
  # Any StandardError is reported and swallowed, and the job is always
  # deleted afterwards so that a broken job cannot be picked up again.
  #
  def process_single_job
    begin
      job = @queue.reserve
      log_message("Job aquired - Job ID : #{job.id}")

      #
      # Parse the JSON of the job body.
      #
      hash = JSON.parse(job.body)
      hash['verbose'] = 1 if ENV['VERBOSE']

      #
      # Output the details.
      #
      log_message("Job body contains the following keys & values:")
      hash.each do |key, value|
        log_message("#{key} => #{value}")
      end

      #
      # As a result of this test we'll either raise/clear with mauve.
      #
      # This helper will do that job.
      #
      alert = Alert.new(hash)

      #
      # Convert the test-type to a class name, to do the protocol test.
      #
      # Given a test-type "foo" we'll attempt to instantiate a class called FOOTest.
      #
      clazz = "#{hash['test_type'].upcase}Test"

      #
      # Create the test object.
      #
      # SECURITY(review): the string passed to eval comes straight from the
      # job body, so anything able to write to the queue can execute
      # arbitrary Ruby here.  Consider validating 'test_type' and resolving
      # the class with Object.const_get instead.
      #
      obj = eval(clazz).new(hash)

      #
      # Ensure that the object we load implements the two methods
      # we expect.  This only warns; processing carries on regardless.
      #
      unless obj.respond_to?("error") && obj.respond_to?("run_test")
        puts "Class #{clazz} doesn't implement the full protocol-test API"
      end

      #
      # We'll run no more than retry_count times.
      #
      # We stop the execution on a single success.
      #
      success = false
      count   = 0

      while count < @retry_count && !success
        if obj.run_test
          log_message("Test succeeed - clearing alert")
          alert.clear
          success = true
        end
        count += 1
      end

      #
      # If we didn't succeed on any of the attempts raise the alert,
      # passing the error message.
      #
      unless success
        log_message("Test failed - alerting with #{obj.error}")
        alert.raise(obj.error)
      end

    rescue => ex
      puts "Exception raised processing job: #{ex}"

      # Keep a record in the log file too; stdout alone is easily lost.
      @logger.error("Exception raised processing job: #{ex}")
    ensure
      #
      # Delete the job - either we received an error, in which case
      # we should remove it to avoid picking it up again, or we handled
      # it successfully so it should be removed.
      #
      # If the reserve itself failed there is no job, and touching job.id
      # would raise a second exception from inside this ensure block.
      #
      if job
        log_message("Job ID : #{job.id} - Removed")
        job.delete
      end
    end
  end

end


#
# Entry-point to our code.
#
if __FILE__ == $0 then

  $SERVER = "127.0.0.1:11300"
  $help   = false
  $manual = false

  begin
    opts = GetoptLong.new(
      [ "--flush",   "-f", GetoptLong::NO_ARGUMENT ],
      [ "--help",    "-h", GetoptLong::NO_ARGUMENT ],
      [ "--manual",  "-m", GetoptLong::NO_ARGUMENT ],
      [ "--repeat",  "-r", GetoptLong::REQUIRED_ARGUMENT ],
      [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ],
      [ "--single",  "-s", GetoptLong::NO_ARGUMENT ],
      [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ]
    )

    #
    # "when X then" parses on every Ruby version; the older "when X:" form
    # is accepted by Ruby 1.8 only.
    #
    opts.each do |opt, arg|
      case opt
      when "--verbose" then ENV["VERBOSE"] = "1"
      when "--flush"   then ENV["FLUSH"]   = "1"
      when "--repeat"  then ENV["REPEAT"]  = arg
      when "--server"  then $SERVER        = arg
      when "--single"  then ENV["SINGLE"]  = "1"
      when "--help"    then $help          = true
      when "--manual"  then $manual        = true
      end
    end
  rescue StandardError => ex
    puts "Option parsing failed: #{ex.to_s}"

    # Bad options are a failure: report that to the caller.
    exit 1
  end

  #
  # CAUTION! Here be quality kode.
  #
  # The manual is the comment block at the top of this very file.
  #
  if $manual || $help

    # Open the file, stripping the shebang line
    lines = File.open(__FILE__) { |fh| fh.readlines }[1..-1]

    found_synopsis = false

    lines.each do |line|

      line.chomp!

      # The first empty line marks the end of the header comment.
      break if line.empty?

      # For --help, skip everything up to and including the SYNOPSIS heading.
      if $help && !found_synopsis
        found_synopsis = (line =~ /^#\s+SYNOPSIS\s*$/)
        next
      end

      # Drop the leading "# " before printing.
      puts line[2..-1].to_s

      # For --help, stop at the bare "#" that closes the SYNOPSIS section.
      break if $help && found_synopsis && line =~ /^#\s*$/
    end

    exit 0
  end

  #
  # Create the object
  #
  worker = Custodian.new($SERVER)

  #
  # Are we flushing the queue?
  #
  if ENV['FLUSH']
    worker.flush_queue!
    exit(0)
  end

  #
  # Single step?
  #
  if ENV['SINGLE']
    worker.process_single_job
    exit(0)
  end

  #
  # Otherwise loop indefinitely
  #
  worker.run!

end