From d1af2bb786fd82a071718bd4776a900c754191ec Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Tue, 13 Nov 2012 17:28:34 +0000 Subject: Renamed --HG-- rename : bin/worker => bin/custodian-dequeue rename : bin/parser.rb => bin/custodian-enqueue --- bin/custodian-dequeue | 368 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 368 insertions(+) create mode 100755 bin/custodian-dequeue (limited to 'bin/custodian-dequeue') diff --git a/bin/custodian-dequeue b/bin/custodian-dequeue new file mode 100755 index 0000000..e897c84 --- /dev/null +++ b/bin/custodian-dequeue @@ -0,0 +1,368 @@ +#!/usr/bin/ruby -Ilib/ -I../lib/ +# +# This script will pull tests to complete from the Beanstalk Queue, +# where they will be found in JSON form, and executes them. +# +# Steve +# -- +# + + + +require 'beanstalk-client' +require 'getoptlong' +require 'json' +require 'logger' + +require 'mauve/sender' +require 'mauve/proto' + + + +# +# Implementation of our protocol tests. +# +require 'custodian/protocol-tests/ftp.rb' +require 'custodian/protocol-tests/http.rb' +require 'custodian/protocol-tests/https.rb' +require 'custodian/protocol-tests/jabber.rb' +require 'custodian/protocol-tests/ldap.rb' +require 'custodian/protocol-tests/ping.rb' +require 'custodian/protocol-tests/rsync.rb' +require 'custodian/protocol-tests/smtp.rb' +require 'custodian/protocol-tests/ssh.rb' + + + + + +# +# This class encapsulates the raising and clearing of alerts via Mauve. +# +class Alert + + attr_reader :details + + def initialize( test_details ) + @details = test_details + end + + + # + # Raise the alert. + # + def raise( detail ) + + puts "RAISE: #{detail}" + return + + update = Mauve::Proto::AlertUpdate.new + update.alert = [] + update.source = "custodian" + update.replace = true + + alert = Mauve::Proto::Alert.new + alert.id = @details['test_type'] + alert.summary = "#{@details['test_host']} #{@details['test_alert']}" + alert.detail = "The #{@details['test_type']} test failed against #{@details['test_host']}: #{detail}" + alert.raise_time = Time.now.to_i + update.alert << alert + + Mauve::Sender.new("alert.bytemark.co.uk").send(update) + + end + + # + # Clear the alert. + # + def clear + puts "CLEAR" + return + + update = Mauve::Proto::AlertUpdate.new + update.alert = [] + update.source = "custodian" + update.replace = true + + alert = Mauve::Proto::Alert.new + alert.id = @details['test_type'] + alert.summary = "#{@details['test_host']} #{@details['test_alert']}" + alert.detail = "The #{@details['test_type']} test succeeded against #{@details['test_host']}" + alert.clear_time = Time.now.to_i + update.alert << alert + + Mauve::Sender.new("alert.bytemark.co.uk").send(update) + end + +end + + + + +# +# This class contains the code for connecting to a Beanstalk queue, +# fetching tests from it, and executing them +# +class Custodian + + # + # The beanstalk queue. + # + attr_reader :queue + + # + # How many times we re-test before we detect a failure + # + attr_reader :retry_count + + # + # The log-file object + # + attr_reader :logger + + # + # Constructor: Connect to the queue + # + def initialize( server ) + + # Connect to the queue + @queue = Beanstalk::Pool.new([server]) + + # Instantiate the logger. + @logger = Logger.new( "worker.log", "daily" ) + + if ( ENV['REPEAT'] ) + @retry_count=ENV['REPEAT'].to_i + else + @retry_count=5 + end + + log_message( "We'll run each test #{@retry_count} before alerting failures." ) + end + + + # + # Write the given message to our logfile - and show it to the console + # if we're running with '--verbose' in play + # + def log_message( msg ) + @logger.info( msg ) + puts msg if ( ENV['VERBOSE'] ) + end + + # + # Flush the queue. + # + def flush_queue! + + log_message( "Flushing queue" ) + + while( true ) + begin + job = @queue.reserve(1) + id = job.id + log_message( "Deleted job #{id}" ) + job.delete + rescue Beanstalk::TimedOut => ex + log_message( "The queue is now empty" ) + return + end + end + end + + + + # + # Process jobs from the queue - never return. + # + def run! + while( true ) + log_message( "\n" ) + log_message( "\n" ) + log_message( "Waiting for job.." ) + process_single_job() + end + end + + + + # + # Fetch a single job from the queue, and process it. + # + def process_single_job + + begin + job = @queue.reserve() + + log_message( "Job aquired - Job ID : #{job.id}" ) + + + # + # Parse the JSON of the job body. + # + json = job.body + hash = JSON.parse( json ) + hash['verbose'] = 1 if ( ENV['VERBOSE'] ) + + + # + # Output the details. + # + log_message( "Job body contains the following keys & values:") + hash.keys.each do |key| + log_message( "#{key} => #{hash[key]}" ) + end + + + + # + # Did the test succeed? If not count the number of times it failed in + # a row. We'll repeat several times + # + success = false + count = 0 + + # + # As a result of this test we'll either raise/clear with mauve. + # + # This helper will do that job. + # + alert = Alert.new( hash ) + + + # + # Convert the test-type to a class name, to do the protocol test. + # + # Given a test-type "foo" we'll attempt to instantiate a class called FOOTest. + # + test = hash['test_type'] + clazz = test.upcase + clazz = "#{clazz}Test" + + + # + # Create the test object. + # + obj = eval(clazz).new( hash ) + + + # + # Ensure that the object we load implements the two methods + # we expect. + # + if ( ( ! obj.respond_to?( "error") ) || + ( ! obj.respond_to?( "run_test" ) ) ) + puts "Class #{clazz} doesn't implement the full protocol-test API" + end + + + + # + # We'll run no more than MAX times. + # + # We stop the execution on a single success. + # + while ( ( count < @retry_count ) && ( success == false ) ) + + if ( obj.run_test() ) + log_message( "Test succeeed - clearing alert" ) + alert.clear() + success= true + end + count += 1 + end + + # + # If we didn't succeed on any of the attempts raise the alert. + # + if ( ! success ) + + # + # Raise the alert, passing the error message. + # + log_message( "Test failed - alerting with #{obj.error()}" ) + alert.raise( obj.error() ) + end + + rescue => ex + puts "Exception raised processing job: #{ex}" + + ensure + # + # Delete the job - either we received an error, in which case + # we should remove it to avoid picking it up again, or we handled + # it successfully so it should be removed. + # + log_message( "Job ID : #{job.id} - Removed" ) + job.delete if ( job ) + end + end +end + + + + + + + +# +# Entry-point to our code. +# +if __FILE__ == $0 then + + $SERVER = "127.0.0.1:11300"; + + begin + opts = GetoptLong.new( + [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ], + [ "--flush", "-f", GetoptLong::NO_ARGUMENT ], + [ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ], + [ "--repeat", "-r", GetoptLong::REQUIRED_ARGUMENT ], + [ "--single", "-s", GetoptLong::NO_ARGUMENT ] + ) + opts.each do |opt, arg| + case opt + when "--verbose": + ENV["VERBOSE"] = "1" + when "--flush": + ENV["FLUSH"] = "1" + when "--repeat": + ENV["REPEAT"] = arg + when "--server": + $SERVER = arg + when "--single": + ENV["SINGLE"] = "1" + end + end + rescue StandardError => ex + puts "Option parsing failed: #{ex.to_s}" + exit + end + + # + # Create the object + # + worker = Custodian.new( $SERVER ) + + # + # Are we flushing the queue? + # + if ( ENV['FLUSH'] ) + worker.flush_queue! + exit(0) + end + + # + # Single step? + # + if ( ENV['SINGLE'] ) + worker.process_single_job + exit(0) + end + + # + # Otherwise loop indefinitely + # + worker.run! + +end -- cgit v1.2.1