From d1af2bb786fd82a071718bd4776a900c754191ec Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Tue, 13 Nov 2012 17:28:34 +0000 Subject: Renamed --HG-- rename : bin/worker => bin/custodian-dequeue rename : bin/parser.rb => bin/custodian-enqueue --- bin/worker | 368 ------------------------------------------------------------- 1 file changed, 368 deletions(-) delete mode 100755 bin/worker (limited to 'bin/worker') diff --git a/bin/worker b/bin/worker deleted file mode 100755 index e897c84..0000000 --- a/bin/worker +++ /dev/null @@ -1,368 +0,0 @@ -#!/usr/bin/ruby -Ilib/ -I../lib/ -# -# This script will pull tests to complete from the Beanstalk Queue, -# where they will be found in JSON form, and executes them. -# -# Steve -# -- -# - - - -require 'beanstalk-client' -require 'getoptlong' -require 'json' -require 'logger' - -require 'mauve/sender' -require 'mauve/proto' - - - -# -# Implementation of our protocol tests. -# -require 'custodian/protocol-tests/ftp.rb' -require 'custodian/protocol-tests/http.rb' -require 'custodian/protocol-tests/https.rb' -require 'custodian/protocol-tests/jabber.rb' -require 'custodian/protocol-tests/ldap.rb' -require 'custodian/protocol-tests/ping.rb' -require 'custodian/protocol-tests/rsync.rb' -require 'custodian/protocol-tests/smtp.rb' -require 'custodian/protocol-tests/ssh.rb' - - - - - -# -# This class encapsulates the raising and clearing of alerts via Mauve. -# -class Alert - - attr_reader :details - - def initialize( test_details ) - @details = test_details - end - - - # - # Raise the alert. - # - def raise( detail ) - - puts "RAISE: #{detail}" - return - - update = Mauve::Proto::AlertUpdate.new - update.alert = [] - update.source = "custodian" - update.replace = true - - alert = Mauve::Proto::Alert.new - alert.id = @details['test_type'] - alert.summary = "#{@details['test_host']} #{@details['test_alert']}" - alert.detail = "The #{@details['test_type']} test failed against #{@details['test_host']}: #{detail}" - alert.raise_time = Time.now.to_i - update.alert << alert - - Mauve::Sender.new("alert.bytemark.co.uk").send(update) - - end - - # - # Clear the alert. - # - def clear - puts "CLEAR" - return - - update = Mauve::Proto::AlertUpdate.new - update.alert = [] - update.source = "custodian" - update.replace = true - - alert = Mauve::Proto::Alert.new - alert.id = @details['test_type'] - alert.summary = "#{@details['test_host']} #{@details['test_alert']}" - alert.detail = "The #{@details['test_type']} test succeeded against #{@details['test_host']}" - alert.clear_time = Time.now.to_i - update.alert << alert - - Mauve::Sender.new("alert.bytemark.co.uk").send(update) - end - -end - - - - -# -# This class contains the code for connecting to a Beanstalk queue, -# fetching tests from it, and executing them -# -class Custodian - - # - # The beanstalk queue. - # - attr_reader :queue - - # - # How many times we re-test before we detect a failure - # - attr_reader :retry_count - - # - # The log-file object - # - attr_reader :logger - - # - # Constructor: Connect to the queue - # - def initialize( server ) - - # Connect to the queue - @queue = Beanstalk::Pool.new([server]) - - # Instantiate the logger. - @logger = Logger.new( "worker.log", "daily" ) - - if ( ENV['REPEAT'] ) - @retry_count=ENV['REPEAT'].to_i - else - @retry_count=5 - end - - log_message( "We'll run each test #{@retry_count} before alerting failures." ) - end - - - # - # Write the given message to our logfile - and show it to the console - # if we're running with '--verbose' in play - # - def log_message( msg ) - @logger.info( msg ) - puts msg if ( ENV['VERBOSE'] ) - end - - # - # Flush the queue. - # - def flush_queue! - - log_message( "Flushing queue" ) - - while( true ) - begin - job = @queue.reserve(1) - id = job.id - log_message( "Deleted job #{id}" ) - job.delete - rescue Beanstalk::TimedOut => ex - log_message( "The queue is now empty" ) - return - end - end - end - - - - # - # Process jobs from the queue - never return. - # - def run! - while( true ) - log_message( "\n" ) - log_message( "\n" ) - log_message( "Waiting for job.." ) - process_single_job() - end - end - - - - # - # Fetch a single job from the queue, and process it. - # - def process_single_job - - begin - job = @queue.reserve() - - log_message( "Job aquired - Job ID : #{job.id}" ) - - - # - # Parse the JSON of the job body. - # - json = job.body - hash = JSON.parse( json ) - hash['verbose'] = 1 if ( ENV['VERBOSE'] ) - - - # - # Output the details. - # - log_message( "Job body contains the following keys & values:") - hash.keys.each do |key| - log_message( "#{key} => #{hash[key]}" ) - end - - - - # - # Did the test succeed? If not count the number of times it failed in - # a row. We'll repeat several times - # - success = false - count = 0 - - # - # As a result of this test we'll either raise/clear with mauve. - # - # This helper will do that job. - # - alert = Alert.new( hash ) - - - # - # Convert the test-type to a class name, to do the protocol test. - # - # Given a test-type "foo" we'll attempt to instantiate a class called FOOTest. - # - test = hash['test_type'] - clazz = test.upcase - clazz = "#{clazz}Test" - - - # - # Create the test object. - # - obj = eval(clazz).new( hash ) - - - # - # Ensure that the object we load implements the two methods - # we expect. - # - if ( ( ! obj.respond_to?( "error") ) || - ( ! obj.respond_to?( "run_test" ) ) ) - puts "Class #{clazz} doesn't implement the full protocol-test API" - end - - - - # - # We'll run no more than MAX times. - # - # We stop the execution on a single success. - # - while ( ( count < @retry_count ) && ( success == false ) ) - - if ( obj.run_test() ) - log_message( "Test succeeed - clearing alert" ) - alert.clear() - success= true - end - count += 1 - end - - # - # If we didn't succeed on any of the attempts raise the alert. - # - if ( ! success ) - - # - # Raise the alert, passing the error message. - # - log_message( "Test failed - alerting with #{obj.error()}" ) - alert.raise( obj.error() ) - end - - rescue => ex - puts "Exception raised processing job: #{ex}" - - ensure - # - # Delete the job - either we received an error, in which case - # we should remove it to avoid picking it up again, or we handled - # it successfully so it should be removed. - # - log_message( "Job ID : #{job.id} - Removed" ) - job.delete if ( job ) - end - end -end - - - - - - - -# -# Entry-point to our code. -# -if __FILE__ == $0 then - - $SERVER = "127.0.0.1:11300"; - - begin - opts = GetoptLong.new( - [ "--verbose", "-v", GetoptLong::NO_ARGUMENT ], - [ "--flush", "-f", GetoptLong::NO_ARGUMENT ], - [ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ], - [ "--repeat", "-r", GetoptLong::REQUIRED_ARGUMENT ], - [ "--single", "-s", GetoptLong::NO_ARGUMENT ] - ) - opts.each do |opt, arg| - case opt - when "--verbose": - ENV["VERBOSE"] = "1" - when "--flush": - ENV["FLUSH"] = "1" - when "--repeat": - ENV["REPEAT"] = arg - when "--server": - $SERVER = arg - when "--single": - ENV["SINGLE"] = "1" - end - end - rescue StandardError => ex - puts "Option parsing failed: #{ex.to_s}" - exit - end - - # - # Create the object - # - worker = Custodian.new( $SERVER ) - - # - # Are we flushing the queue? - # - if ( ENV['FLUSH'] ) - worker.flush_queue! - exit(0) - end - - # - # Single step? - # - if ( ENV['SINGLE'] ) - worker.process_single_job - exit(0) - end - - # - # Otherwise loop indefinitely - # - worker.run! - -end -- cgit v1.2.1