From a2fc458d4c1bc6027760546653cb153e775576ce Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Fri, 17 Jun 2011 12:48:53 +0100 Subject: * Notifications are now run in their separate threads. * Queues are now just arrays instead of "Queue"s * Updated templates to be saner. * Added flusing of queues when threads stop --- lib/mauve/alert.rb | 31 ++++++++++++++---------- lib/mauve/mauve_thread.rb | 5 ++-- lib/mauve/notifier.rb | 25 +++++++++++++++---- lib/mauve/notifiers/templates/email.html.erb | 4 ++-- lib/mauve/notifiers/templates/email.txt.erb | 4 ++-- lib/mauve/notifiers/templates/xmpp.txt.erb | 6 ++--- lib/mauve/person.rb | 16 ++++++++----- lib/mauve/processor.rb | 36 +++++++++++++++++++--------- lib/mauve/server.rb | 19 +++++++++------ 9 files changed, 97 insertions(+), 49 deletions(-) (limited to 'lib') diff --git a/lib/mauve/alert.rb b/lib/mauve/alert.rb index 6e5249f..86b49e3 100644 --- a/lib/mauve/alert.rb +++ b/lib/mauve/alert.rb @@ -162,9 +162,10 @@ module Mauve raise ArgumentError unless ack_until.is_a?(Time) self.acknowledged_by = person.username - self.acknowledged_at = Time.now - self.update_type = :acknowledged + self.acknowledged_at = MauveTime.now self.will_unacknowledge_at = ack_until + self.update_type = :acknowledged + logger.error("Couldn't save #{self}") unless save AlertGroup.notify([self]) end @@ -172,7 +173,9 @@ module Mauve def unacknowledge! self.acknowledged_by = nil self.acknowledged_at = nil - self.update_type = :raised + self.will_unacknowledge_at = nil + self.update_type = (raised? ? :raised : :cleared) + logger.error("Couldn't save #{self}") unless save AlertGroup.notify([self]) end @@ -182,19 +185,27 @@ module Mauve self.acknowledged_by = nil self.acknowledged_at = nil self.will_unacknowledge_at = nil - self.will_raise_at = nil - self.update_type = :raised self.raised_at = MauveTime.now + self.will_raise_at = nil self.cleared_at = nil + # Don't clear will_clear_at + self.update_type = :raised + logger.error("Couldn't save #{self}") unless save AlertGroup.notify([self]) unless already_raised end def clear!(notify=true) already_cleared = cleared? + self.acknowledged_by = nil + self.acknowledged_at = nil + self.will_unacknowledge_at = nil + self.raised_at = nil + # Don't clear will_raise_at self.cleared_at = MauveTime.now self.will_clear_at = nil self.update_type = :cleared + logger.error("Couldn't save #{self}") unless save AlertGroup.notify([self]) unless !notify || already_cleared end @@ -208,8 +219,8 @@ module Mauve end def poll - raise! if will_unacknowledge_at && will_unacknowledge_at.to_time <= MauveTime.now || - will_raise_at && will_raise_at.to_time <= MauveTime.now + raise! if (will_unacknowledge_at and will_unacknowledge_at.to_time <= MauveTime.now) or + (will_raise_at and will_raise_at.to_time <= MauveTime.now) clear! if will_clear_at && will_clear_at.to_time <= MauveTime.now end @@ -338,8 +349,6 @@ module Mauve raise_time = reception_time end - logger.debug("received at #{reception_time}, transmitted at #{transmission_time}, raised at #{raise_time}, clear at #{clear_time}") - # # Make sure there's no HTML in the ID... paranoia. The rest of the # HTML removal is done elsewhere. @@ -406,8 +415,6 @@ module Mauve alert_db.subject = alert_db.source end - logger.debug [alert_db.source, alert_db.subject].inspect - alert_db.summary = Alert.remove_html(alert.summary) if alert.summary && !alert.summary.empty? # @@ -465,7 +472,7 @@ module Mauve end end - logger.debug "Got #{alerts_updated.length} alerts to notify about" + logger.debug "Got #{alerts_updated.length} alerts to notify about" if alerts_updated.length > 0 AlertGroup.notify(alerts_updated) end diff --git a/lib/mauve/mauve_thread.rb b/lib/mauve/mauve_thread.rb index 3aba6fe..79e742a 100644 --- a/lib/mauve/mauve_thread.rb +++ b/lib/mauve/mauve_thread.rb @@ -12,7 +12,7 @@ module Mauve @logger ||= Log4r::Logger.new(self.class.to_s) end - def run_thread(interval = 0.2) + def run_thread(interval = 0.1) # # Good to go. # @@ -98,7 +98,7 @@ module Mauve self.stop self.start end - + def stop logger.debug("Stopping") @@ -117,6 +117,7 @@ module Mauve self.join end + alias exit stop def kill diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb index 5c0e5a7..90df895 100644 --- a/lib/mauve/notifier.rb +++ b/lib/mauve/notifier.rb @@ -25,12 +25,24 @@ module Mauve logger.debug("Notifier buffer is #{sz} in length") - (sz > 50 ? 50 : sz).times do - person, level, alert = Server.notification_pop + my_threads = [] + sz.times do + person, *args = Server.notification_pop + + # + # Nil person.. that's craaazy too! + # + break if person.nil? + my_threads << Thread.new { + person.do_send_alert(*args) + } + end + + my_threads.each do |t| begin - person.do_send_alert(level, alert) + t.join rescue StandardError => ex - logger.debug ex.to_s + logger.error ex.to_s logger.debug ex.backtrace.join("\n") end end @@ -73,6 +85,11 @@ module Mauve end super + + # + # flush the queue + # + main_loop end end diff --git a/lib/mauve/notifiers/templates/email.html.erb b/lib/mauve/notifiers/templates/email.html.erb index 6c323b0..46b2ebc 100644 --- a/lib/mauve/notifiers/templates/email.html.erb +++ b/lib/mauve/notifiers/templates/email.html.erb @@ -2,7 +2,7 @@ "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<%= alert.update_type.upcase %>: <% -case alert.update_type +case alert.update_type.to_sym when :cleared %><%= alert.cleared_at.to_s_relative %><% when :acknowleged @@ -10,7 +10,7 @@ when :acknowleged else %><%= alert.raised_at.to_s_relative %><% end -%>: <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><% if alert.source != alert.subject %> -- from <%= alert.source %><% end diff --git a/lib/mauve/notifiers/templates/email.txt.erb b/lib/mauve/notifiers/templates/email.txt.erb index 1bc79a4..5db9bd6 100644 --- a/lib/mauve/notifiers/templates/email.txt.erb +++ b/lib/mauve/notifiers/templates/email.txt.erb @@ -1,5 +1,5 @@ <%= alert.update_type.upcase %>: <% -case alert.update_type +case alert.update_type.to_sym when :cleared %><%= alert.cleared_at.to_s_relative %><% when :acknowleged @@ -7,7 +7,7 @@ when :acknowleged else %><%= alert.raised_at.to_s_relative %><% end -%>: <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><% if alert.source != alert.subject %> -- from <%= alert.source %><% end diff --git a/lib/mauve/notifiers/templates/xmpp.txt.erb b/lib/mauve/notifiers/templates/xmpp.txt.erb index eafc12b..282f370 100644 --- a/lib/mauve/notifiers/templates/xmpp.txt.erb +++ b/lib/mauve/notifiers/templates/xmpp.txt.erb @@ -1,13 +1,13 @@ <%= alert.update_type.upcase %>: <% -case alert.update_type +case alert.update_type.to_sym when :cleared %><%= alert.cleared_at.to_s_relative %><% when :acknowleged -%><%= alert.acknowledged_at.to_time.to_s_relative %><% +%><%= alert.acknowledged_at.to_s_relative %><% else %><%= alert.raised_at.to_s_relative %><% end -%>: <%= alert.subject %> <%= alert.summary %><% +%>: <%= alert.subject %>: <%= alert.summary %><% if alert.source != alert.subject %> -- from <%= alert.source %><% end diff --git a/lib/mauve/person.rb b/lib/mauve/person.rb index 82845e6..2689bb3 100644 --- a/lib/mauve/person.rb +++ b/lib/mauve/person.rb @@ -8,7 +8,7 @@ module Mauve attr_reader :notification_thresholds def initialize(*args) - @notification_thresholds = { 60 => Array.new(10) } + @notification_thresholds = { } # 60 => Array.new(10) } @suppressed = false super(*args) end @@ -151,10 +151,6 @@ module Mauve # This just wraps send_alert by sending the job to a queue. # def send_alert(level, alert) - Server.notification_push([self, level, alert]) - end - - def do_send_alert(level, alert) now = MauveTime.now suppressed_changed = nil threshold_breached = @notification_thresholds.any? do |period, previous_alert_times| @@ -185,6 +181,10 @@ module Mauve return if suppressed? or this_alert_suppressed + Server.notification_push([self, level, alert, suppressed_changed]) + end + + def do_send_alert(level, alert, suppressed_changed) result = NotificationCaller.new( self, alert, @@ -198,7 +198,11 @@ module Mauve # Remember that we've sent an alert # @notification_thresholds.each do |period, previous_alert_times| - @notification_thresholds[period].replace(previous_alert_times[1..period-1] + [now]) + # + # Hmm.. not sure how to make this thread-safe. + # + @notification_thresholds[period].push MauveTime.now + @notification_thresholds[period].shift end logger.info("Notification for #{username} of #{alert} at level #{level} has been successful") diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index a46f229..083b9a1 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -19,26 +19,26 @@ module Mauve # @transmission_id_cache = {} @transmission_cache_expire_time = 300 - @sleep_interval = 1 + @transmission_cache_checked_at = Time.now end def main_loop - sz = Server.packet_buffer_size - - return if sz == 0 - - Timer.instance.freeze unless Timer.instance.frozen? - - logger.info("Buffer has #{sz} packets waiting...") + logger.info("Buffer has packets waiting...") if Server.packet_buffer_size > 0 # # Only do the loop a maximum of 10 times every @sleep_interval seconds # - - (sz > 50 ? 50 : sz).times do + 10.times do data, client, received_at = Server.packet_pop + # + # Uh-oh. Nil data? That's craaaazy + # + break if data.nil? + + Timer.instance.freeze unless Timer.instance.frozen? + @logger.debug("Got #{data.inspect} from #{client.inspect}") ip_source = "#{client[3]}:#{client[1]}" @@ -71,7 +71,6 @@ module Mauve ensure @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now - end end @@ -85,6 +84,11 @@ module Mauve def expire_transmission_id_cache now = MauveTime.now + # + # Only check once every minute. + # + return unless (now - @transmission_cache_checked_at) > 60 + to_delete = [] @transmission_id_cache.each do |tid, received_at| @@ -94,7 +98,17 @@ module Mauve to_delete.each do |tid| @transmission_id_cache.delete(tid) end + + @transmission_cache_checked_at = now end + def stop + super + + # + # flush the queue + # + main_loop + end end end diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index 2df3888..c985e15 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -53,8 +53,8 @@ module Mauve # Keep these queues here to prevent a crash in a subthread losing all the # subsquent things in the queue. # - @packet_buffer = Queue.new - @notification_buffer = Queue.new + @packet_buffer = [] + @notification_buffer = [] @config = DEFAULT_CONFIGURATION end @@ -115,13 +115,18 @@ module Mauve end def stop + if @stop + logger.debug("Stop already called!") + return + end + @stop = true thread_list = Thread.list thread_list.delete(Thread.current) - THREAD_CLASSES.reverse.each do |klass| + THREAD_CLASSES.each do |klass| thread_list.delete(klass.instance) klass.instance.stop unless klass.instance.nil? end @@ -209,11 +214,11 @@ module Mauve # These methods pop things on and off the packet_buffer # def packet_enq(a) - instance.packet_buffer.enq(a) + instance.packet_buffer.push(a) end def packet_deq - instance.packet_buffer.deq + instance.packet_buffer.shift end def packet_buffer_size @@ -227,11 +232,11 @@ module Mauve # These methods pop things on and off the notification_buffer # def notification_enq(a) - instance.notification_buffer.enq(a) + instance.notification_buffer.push(a) end def notification_deq - instance.notification_buffer.deq + instance.notification_buffer.shift end def notification_buffer_size -- cgit v1.2.1