#!/usr/bin/ruby
#
# This script pulls tests from the Beanstalk queue, where they are stored
# in JSON form, and executes them.
#
#
# TODO: Command line parsing:
#
#   1.  enable/disable logging to a file
#
#
# Steve
# --
#

require 'beanstalk-client'
require 'getoptlong'
require 'json'

require 'mauve/sender'
require 'mauve/proto'

#
# Implementation of our protocol tests.
#
require 'tests/ftp'
require 'tests/http'
require 'tests/https'
require 'tests/jabber'
require 'tests/ldap'
require 'tests/ping'
require 'tests/rsync'
require 'tests/smtp'
require 'tests/ssh'


#
# This class encapsulates the raising and clearing of alerts via Mauve.
#
# The constructor receives the parsed job hash; the keys read here are
# 'test_type', 'test_host' and 'test_alert'.
#
class Alert

  # The hash of test details this alert was built from.
  attr_reader :details

  def initialize( test_details )
    @details = test_details
  end

  #
  # Raise the alert.
  #
  # NB: this instance method is named "raise", so inside this class a bare
  # "raise" refers to it rather than to Kernel#raise.
  #
  def raise
    alert = build_alert
    alert.raise_time = Time.now.to_i
    deliver( alert )
  end

  #
  # Clear the alert.
  #
  def clear
    alert = build_alert
    alert.clear_time = Time.now.to_i
    deliver( alert )
  end

  private

  #
  # Build the Mauve alert shared by raise and clear.
  #
  # NOTE(review): the id is only the test type, so tests of the same type
  # against different hosts share one alert id - confirm this is intended.
  #
  def build_alert
    alert         = Mauve::Proto::Alert.new
    alert.id      = @details['test_type']
    alert.subject = "#{@details['test_host']} #{@details['test_alert']}"
    alert.detail  = "The test of type #{@details['test_type']} failed"
    alert
  end

  #
  # Wrap the alert in an update and send it to the Mauve server.
  # Returns whatever Mauve::Sender#send returns.
  #
  def deliver( alert )
    update         = Mauve::Proto::AlertUpdate.new
    update.alert   = []
    update.source  = "custodian"
    update.replace = true
    update.alert << alert

    Mauve::Sender.new("alert.bytemark.co.uk").send(update)
  end

end


#
# This class contains the code for connecting to a Beanstalk queue,
# fetching tests from it, and executing them.
#
class Custodian

  #
  # The beanstalk queue.
  #
  attr_reader :queue

  #
  # How many times we re-test before we detect a failure.
  #
  attr_reader :retry_count

  #
  # Constructor: connect to the queue.
  #
  # server is the "host:port" address of the beanstalk server.  The retry
  # count is taken from the REPEAT environment variable, defaulting to 5.
  #
  def initialize( server )
    @queue = Beanstalk::Pool.new([server])

    if ( ENV['REPEAT'] )
      @retry_count = ENV['REPEAT'].to_i
    else
      @retry_count = 5
    end

    puts "Retrying each test #{@retry_count} times before failures" if ( ENV['VERBOSE'] )
  end

  #
  # Flush the queue: reserve and delete jobs until a reserve with a
  # one-second timeout times out.
  #
  def flush_queue!
    loop do
      begin
        job = @queue.reserve(1)
        id  = job.id
        puts "\tDeleted job #{id}" if ( ENV['VERBOSE'] )
        job.delete
      rescue Beanstalk::TimedOut
        return
      end
    end
  end

  #
  # Process jobs from the queue - never return.
  #
  def run!
    loop do
      puts "\n\nWaiting for job.." if ( ENV['VERBOSE'] )
      process_single_job()
    end
  end

  #
  # Fetch a single job from the queue, and process it.
  #
  # The job is always deleted afterwards, whether the test ran, failed, or
  # an exception was raised while handling it.
  #
  def process_single_job
    # Declared up-front so the ensure clause can tell whether a job was
    # actually reserved.
    job = nil

    begin
      job = @queue.reserve()

      puts "Job acquired: #{Time.new.inspect}" if ( ENV['VERBOSE'] )

      #
      # Parse the JSON of the job body.
      #
      json = job.body
      hash = JSON.parse( json )
      hash['verbose'] = 1 if ( ENV['VERBOSE'] )

      #
      # Output the details.
      #
      if ( ENV['VERBOSE'] )
        puts "JOB: #{job.id}"
        puts "Type of test is #{hash['test_type']}"
        hash.keys.each do |key|
          puts "\t#{key} => #{hash[key]}"
        end
      end

      #
      # Given the test-type of "YYY" we'll call the method "YYY_test", which
      # we assume comes from one of the files beneath ./tests/
      #
      test   = hash['test_type']
      method = "#{test}_test".to_sym

      #
      # Did the test succeed?  If not count the number of times it failed in
      # a row.  We'll repeat several times.
      #
      success = false
      count   = 0

      #
      # As a result of this test we'll either raise/clear with mauve.
      #
      # This helper will do that job.
      #
      alert = Alert.new( hash )

      #
      # We'll run no more than @retry_count times.
      #
      # We stop the execution on a single success.
      #
      while ( ( count < @retry_count ) && !success )
        if ( send( method, hash ) )
          alert.clear()
          success = true
        end
        count += 1
      end

      #
      # If we didn't succeed on any of the attempts raise the alert.
      #
      alert.raise() unless success

    rescue => ex
      puts "Exception raised processing job: #{ex}"

    ensure
      #
      #  Delete the job - either we received an error, in which case
      # we should remove it to avoid picking it up again, or we handled
      # it successfully so it should be removed.
      #
      #  If the reserve itself failed there is no job to delete.
      #
      job.delete if job
    end
  end

end


#
#  Entry-point to our code.
#
if __FILE__ == $0 then

  $SERVER = "127.0.0.1:11300";

  begin
    opts = GetoptLong.new(
                          [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ],
                          [ "--flush",   "-f", GetoptLong::NO_ARGUMENT ],
                          [ "--server",  "-S", GetoptLong::REQUIRED_ARGUMENT ],
                          [ "--repeat",  "-r", GetoptLong::REQUIRED_ARGUMENT ],
                          [ "--single",  "-s", GetoptLong::NO_ARGUMENT ]
                          )
    opts.each do |opt, arg|
      case opt
      when "--verbose" then
        ENV["VERBOSE"] = "1"
      when "--flush" then
        ENV["FLUSH"] = "1"
      when "--repeat" then
        ENV["REPEAT"] = arg
      when "--server" then
        ENV["SERVER"] = arg
        # The worker below is created from $SERVER, so the option must
        # update it as well or it has no effect.
        $SERVER = arg
      when "--single" then
        ENV["SINGLE"] = "1"
      end
    end
  rescue StandardError => ex
    puts "Option parsing failed: #{ex.to_s}"
    exit( 1 )
  end

  #
  #  Create the object
  #
  worker = Custodian.new( $SERVER )

  #
  #  Are we flushing the queue?
  #
  if ( ENV['FLUSH'] )
    worker.flush_queue!
    exit(0)
  end

  #
  #  Single step?
  #
  if ( ENV['SINGLE'] )
    worker.process_single_job
    exit(0)
  end

  #
  #  Otherwise loop indefinitely
  #
  worker.run!

end