diff options
| author | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-06-17 12:48:53 +0100 | 
|---|---|---|
| committer | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-06-17 12:48:53 +0100 | 
| commit | a2fc458d4c1bc6027760546653cb153e775576ce (patch) | |
| tree | f3af3775adab48d9a18495eeb410cf48269ffd9e /lib | |
| parent | 4b39583e63b59d73e2855d776f7cca12d734f2af (diff) | |
 * Notifications are now run in their separate threads.
 * Queues are now just arrays instead of "Queue"s
 * Updated templates to be saner.
 * Added flusing of queues when threads stop
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/mauve/alert.rb | 31 | ||||
| -rw-r--r-- | lib/mauve/mauve_thread.rb | 5 | ||||
| -rw-r--r-- | lib/mauve/notifier.rb | 25 | ||||
| -rw-r--r-- | lib/mauve/notifiers/templates/email.html.erb | 4 | ||||
| -rw-r--r-- | lib/mauve/notifiers/templates/email.txt.erb | 4 | ||||
| -rw-r--r-- | lib/mauve/notifiers/templates/xmpp.txt.erb | 6 | ||||
| -rw-r--r-- | lib/mauve/person.rb | 16 | ||||
| -rw-r--r-- | lib/mauve/processor.rb | 36 | ||||
| -rw-r--r-- | lib/mauve/server.rb | 19 | 
9 files changed, 97 insertions, 49 deletions
| diff --git a/lib/mauve/alert.rb b/lib/mauve/alert.rb index 6e5249f..86b49e3 100644 --- a/lib/mauve/alert.rb +++ b/lib/mauve/alert.rb @@ -162,9 +162,10 @@ module Mauve        raise ArgumentError unless ack_until.is_a?(Time)        self.acknowledged_by = person.username -      self.acknowledged_at = Time.now -      self.update_type = :acknowledged +      self.acknowledged_at = MauveTime.now        self.will_unacknowledge_at = ack_until +      self.update_type = :acknowledged +        logger.error("Couldn't save #{self}") unless save        AlertGroup.notify([self])      end @@ -172,7 +173,9 @@ module Mauve      def unacknowledge!        self.acknowledged_by = nil        self.acknowledged_at = nil -      self.update_type = :raised +      self.will_unacknowledge_at = nil +      self.update_type = (raised? ? :raised : :cleared) +        logger.error("Couldn't save #{self}") unless save        AlertGroup.notify([self])      end @@ -182,19 +185,27 @@ module Mauve        self.acknowledged_by = nil        self.acknowledged_at = nil        self.will_unacknowledge_at = nil -      self.will_raise_at = nil -      self.update_type = :raised        self.raised_at = MauveTime.now +      self.will_raise_at = nil        self.cleared_at = nil +      # Don't clear will_clear_at +      self.update_type = :raised +        logger.error("Couldn't save #{self}") unless save        AlertGroup.notify([self]) unless already_raised      end      def clear!(notify=true)        already_cleared = cleared? +      self.acknowledged_by = nil +      self.acknowledged_at = nil +      self.will_unacknowledge_at = nil +      self.raised_at = nil +      # Don't clear will_raise_at        self.cleared_at = MauveTime.now        self.will_clear_at = nil        self.update_type = :cleared +        logger.error("Couldn't save #{self}") unless save        AlertGroup.notify([self]) unless !notify || already_cleared      end @@ -208,8 +219,8 @@ module Mauve      end      def poll -      raise! if will_unacknowledge_at && will_unacknowledge_at.to_time <= MauveTime.now || -        will_raise_at && will_raise_at.to_time <= MauveTime.now +      raise! if (will_unacknowledge_at and will_unacknowledge_at.to_time <= MauveTime.now) or +        (will_raise_at and will_raise_at.to_time <= MauveTime.now)        clear! if will_clear_at && will_clear_at.to_time <= MauveTime.now      end @@ -338,8 +349,6 @@ module Mauve              raise_time = reception_time             end -          logger.debug("received at #{reception_time}, transmitted at #{transmission_time}, raised at #{raise_time}, clear at #{clear_time}") -            #            # Make sure there's no HTML in the ID... paranoia.  The rest of the            # HTML removal is done elsewhere. @@ -406,8 +415,6 @@ module Mauve              alert_db.subject = alert_db.source            end -          logger.debug [alert_db.source, alert_db.subject].inspect -            alert_db.summary = Alert.remove_html(alert.summary) if alert.summary && !alert.summary.empty?            # @@ -465,7 +472,7 @@ module Mauve            end          end -        logger.debug "Got #{alerts_updated.length} alerts to notify about" +        logger.debug "Got #{alerts_updated.length} alerts to notify about" if alerts_updated.length > 0          AlertGroup.notify(alerts_updated)        end diff --git a/lib/mauve/mauve_thread.rb b/lib/mauve/mauve_thread.rb index 3aba6fe..79e742a 100644 --- a/lib/mauve/mauve_thread.rb +++ b/lib/mauve/mauve_thread.rb @@ -12,7 +12,7 @@ module Mauve        @logger ||= Log4r::Logger.new(self.class.to_s)       end -    def run_thread(interval = 0.2) +    def run_thread(interval = 0.1)        #        # Good to go.        # @@ -98,7 +98,7 @@ module Mauve        self.stop        self.start      end - +          def stop        logger.debug("Stopping") @@ -117,6 +117,7 @@ module Mauve        self.join       end +      alias exit stop      def kill diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb index 5c0e5a7..90df895 100644 --- a/lib/mauve/notifier.rb +++ b/lib/mauve/notifier.rb @@ -25,12 +25,24 @@ module Mauve        logger.debug("Notifier buffer is #{sz} in length")  -      (sz > 50 ? 50 : sz).times do -        person, level, alert = Server.notification_pop +      my_threads = [] +      sz.times do +        person, *args = Server.notification_pop +         +        # +        # Nil person.. that's craaazy too! +        # +        break if person.nil? +        my_threads << Thread.new { +          person.do_send_alert(*args)  +        } +      end + +      my_threads.each do |t|          begin -          person.do_send_alert(level, alert)  +          t.join          rescue StandardError => ex -          logger.debug ex.to_s +          logger.error ex.to_s            logger.debug ex.backtrace.join("\n")          end        end @@ -73,6 +85,11 @@ module Mauve        end        super + +      #  +      # flush the queue +      # +      main_loop      end    end diff --git a/lib/mauve/notifiers/templates/email.html.erb b/lib/mauve/notifiers/templates/email.html.erb index 6c323b0..46b2ebc 100644 --- a/lib/mauve/notifiers/templates/email.html.erb +++ b/lib/mauve/notifiers/templates/email.html.erb @@ -2,7 +2,7 @@     "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">  <html xmlns="http://www.w3.org/1999/xhtml"><head><title></title></head><body>  <p><strong><%= alert.update_type.upcase %>:</strong> <%  -case alert.update_type  +case alert.update_type.to_sym   when :cleared   %><%= alert.cleared_at.to_s_relative %><%   when :acknowleged  @@ -10,7 +10,7 @@ when :acknowleged  else  %><%= alert.raised_at.to_s_relative %><%   end  -%>: <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><%  if alert.source != alert.subject   %> -- <em>from <%= alert.source %></em><%  end  diff --git a/lib/mauve/notifiers/templates/email.txt.erb b/lib/mauve/notifiers/templates/email.txt.erb index 1bc79a4..5db9bd6 100644 --- a/lib/mauve/notifiers/templates/email.txt.erb +++ b/lib/mauve/notifiers/templates/email.txt.erb @@ -1,5 +1,5 @@  <%= alert.update_type.upcase %>: <%  -case alert.update_type  +case alert.update_type.to_sym  when :cleared   %><%= alert.cleared_at.to_s_relative %><%  when :acknowleged  @@ -7,7 +7,7 @@ when :acknowleged  else  %><%= alert.raised_at.to_s_relative %><%   end  -%>: <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><%  if alert.source != alert.subject   %> -- from <%= alert.source %><%  end  diff --git a/lib/mauve/notifiers/templates/xmpp.txt.erb b/lib/mauve/notifiers/templates/xmpp.txt.erb index eafc12b..282f370 100644 --- a/lib/mauve/notifiers/templates/xmpp.txt.erb +++ b/lib/mauve/notifiers/templates/xmpp.txt.erb @@ -1,13 +1,13 @@  <%= alert.update_type.upcase %>: <%  -case alert.update_type  +case alert.update_type.to_sym   when :cleared   %><%= alert.cleared_at.to_s_relative %><%   when :acknowleged  -%><%= alert.acknowledged_at.to_time.to_s_relative %><%  +%><%= alert.acknowledged_at.to_s_relative %><%   else  %><%= alert.raised_at.to_s_relative %><%   end  -%>: <%= alert.subject %> <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><%  if alert.source != alert.subject   %> -- from <%= alert.source %><%  end  diff --git a/lib/mauve/person.rb b/lib/mauve/person.rb index 82845e6..2689bb3 100644 --- a/lib/mauve/person.rb +++ b/lib/mauve/person.rb @@ -8,7 +8,7 @@ module Mauve      attr_reader :notification_thresholds      def initialize(*args)  -      @notification_thresholds = { 60 => Array.new(10) } +      @notification_thresholds = { } # 60 => Array.new(10) }        @suppressed = false        super(*args)      end @@ -151,10 +151,6 @@ module Mauve      # This just wraps send_alert by sending the job to a queue.      #      def send_alert(level, alert) -      Server.notification_push([self, level, alert]) -    end - -    def do_send_alert(level, alert)        now = MauveTime.now        suppressed_changed = nil        threshold_breached = @notification_thresholds.any? do |period, previous_alert_times| @@ -185,6 +181,10 @@ module Mauve        return if suppressed? or this_alert_suppressed +      Server.notification_push([self, level, alert, suppressed_changed]) +    end + +    def do_send_alert(level, alert, suppressed_changed)        result = NotificationCaller.new(          self,          alert, @@ -198,7 +198,11 @@ module Mauve          # Remember that we've sent an alert          #          @notification_thresholds.each do |period, previous_alert_times| -          @notification_thresholds[period].replace(previous_alert_times[1..period-1] + [now]) +          # +          # Hmm.. not sure how to make this thread-safe. +          # +          @notification_thresholds[period].push MauveTime.now +          @notification_thresholds[period].shift          end          logger.info("Notification for #{username} of #{alert} at level #{level} has been successful") diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index a46f229..083b9a1 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -19,26 +19,26 @@ module Mauve        #        @transmission_id_cache = {}        @transmission_cache_expire_time = 300 -      @sleep_interval = 1 +      @transmission_cache_checked_at = Time.now      end      def main_loop -      sz = Server.packet_buffer_size - -      return if sz == 0 -   -      Timer.instance.freeze unless Timer.instance.frozen? - -      logger.info("Buffer has #{sz} packets waiting...") +      logger.info("Buffer has packets waiting...") if Server.packet_buffer_size > 0        #        # Only do the loop a maximum of 10 times every @sleep_interval seconds        # - -      (sz > 50 ? 50 : sz).times do +      10.times do          data, client, received_at = Server.packet_pop +        # +        # Uh-oh.  Nil data?  That's craaaazy +        # +        break if data.nil? +         +        Timer.instance.freeze unless Timer.instance.frozen? +          @logger.debug("Got #{data.inspect} from #{client.inspect}")          ip_source = "#{client[3]}:#{client[1]}" @@ -71,7 +71,6 @@ module Mauve          ensure            @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now -          end        end @@ -85,6 +84,11 @@ module Mauve      def expire_transmission_id_cache        now = MauveTime.now +      # +      # Only check once every minute. +      # +      return unless (now - @transmission_cache_checked_at) > 60 +        to_delete = []        @transmission_id_cache.each do |tid, received_at| @@ -94,7 +98,17 @@ module Mauve        to_delete.each do |tid|          @transmission_id_cache.delete(tid)        end +       +      @transmission_cache_checked_at = now      end +    def stop +      super + +      #  +      # flush the queue +      # +      main_loop +    end    end     end diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index 2df3888..c985e15 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -53,8 +53,8 @@ module Mauve        # Keep these queues here to prevent a crash in a subthread losing all the        # subsquent things in the queue.        # -      @packet_buffer       = Queue.new -      @notification_buffer = Queue.new +      @packet_buffer       = [] +      @notification_buffer = []        @config = DEFAULT_CONFIGURATION      end @@ -115,13 +115,18 @@ module Mauve      end      def stop +      if @stop +        logger.debug("Stop already called!") +        return +      end +        @stop = true        thread_list = Thread.list         thread_list.delete(Thread.current) -      THREAD_CLASSES.reverse.each do |klass| +      THREAD_CLASSES.each do |klass|          thread_list.delete(klass.instance)          klass.instance.stop unless klass.instance.nil?        end @@ -209,11 +214,11 @@ module Mauve        # These methods pop things on and off the packet_buffer        #        def packet_enq(a) -        instance.packet_buffer.enq(a) +        instance.packet_buffer.push(a)        end        def packet_deq -        instance.packet_buffer.deq +        instance.packet_buffer.shift        end        def packet_buffer_size @@ -227,11 +232,11 @@ module Mauve        # These methods pop things on and off the notification_buffer        #        def notification_enq(a) -        instance.notification_buffer.enq(a) +        instance.notification_buffer.push(a)        end        def notification_deq -        instance.notification_buffer.deq +        instance.notification_buffer.shift        end        def notification_buffer_size | 
