From b22cbc87927553f6dbb5754281e95fe9bad2eed1 Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Mon, 13 Jun 2011 11:02:37 +0100 Subject: * Tidied up mauveserver to handle HUP restarts * Added HTML santizing to the alert class, so bad HTML is stripped as part of processing. * Alert#cleared? now means "not raised" * Better error handling in the Timer class, making sure that the timer never gets permanently frozen. * Moved notification and packet buffers to the Server class, meaning that if the Processor or Notifier threads crash, we don't lose all the items waiting to be processed/notified. * XMPP/Email Alerts now use templates, instead of instance methods. * Emails now get sent as multipart with HTML to allow detail fields to be shown as nature intended. --- bin/mauveserver | 26 +++--- bytemark_example_alerts.sh | 25 +++--- lib/mauve/alert.rb | 91 ++++++++++++++------- lib/mauve/alert_group.rb | 3 + lib/mauve/configuration.rb | 7 +- lib/mauve/mauve_thread.rb | 18 +++-- lib/mauve/mauve_time.rb | 18 +++++ lib/mauve/notification.rb | 5 +- lib/mauve/notifier.rb | 23 ++---- lib/mauve/notifiers/email.rb | 70 ++++------------ lib/mauve/notifiers/templates/email.html.erb | 28 +++++++ lib/mauve/notifiers/templates/email.txt.erb | 24 ++++++ lib/mauve/notifiers/templates/sms.txt.erb | 1 + lib/mauve/notifiers/templates/xmpp.html.erb | 0 lib/mauve/notifiers/templates/xmpp.txt.erb | 15 +++- lib/mauve/notifiers/xmpp.rb | 23 +++--- lib/mauve/person.rb | 16 +--- lib/mauve/processor.rb | 39 ++++----- lib/mauve/server.rb | 116 ++++++++++++++++++++++----- lib/mauve/timer.rb | 5 -- lib/mauve/udp_server.rb | 11 +-- 21 files changed, 342 insertions(+), 222 deletions(-) mode change 100644 => 120000 lib/mauve/notifiers/templates/sms.txt.erb delete mode 100644 lib/mauve/notifiers/templates/xmpp.html.erb diff --git a/bin/mauveserver b/bin/mauveserver index 57a2299..baeee75 100755 --- a/bin/mauveserver +++ b/bin/mauveserver @@ -9,12 +9,9 @@ rescue SyntaxError => no_blocks_with_procs end require 'mauve/configuration' -include Mauve configuration_file = ARGV[0] - configuration_file = [".", "/etc/mauvealert/"].find{|d| File.file?(File.join(d,"mauveserver.conf")) } if configuration_file.nil? - configuration_file = File.expand_path(configuration_file) unless File.file?(configuration_file) @@ -22,26 +19,35 @@ unless File.file?(configuration_file) Kernel.exit 1 end -Configuration.current = ConfigurationBuilder.load(configuration_file) +Mauve::Configuration.current = Mauve::ConfigurationBuilder.load(configuration_file) %w(HUP).each do |sig| trap("HUP") do # this blows up if you do it twice in quick succession, but don't really # care about that case as it's only for log rotation. - Configuration.current.logger.warn "#{sig} signal received. Restarting." - Configuration.current.server.stop + Mauve::Server.instance.logger.warn "#{sig} signal received. Stopping." + Mauve::Server.instance.stop + # # Reload configuration # - Configuration.current = ConfigurationBuilder.load(configuration_file) - Configuration.current.server.start + Mauve::Server.instance.logger.warn "Restarting." + + begin + new_config = Mauve::ConfigurationBuilder.load(configuration_file) + Mauve::Configuration.current = new_config + rescue BuildException => ex + Mauve::Server.instance.logger.warn "Reconfiguration failed: #{ex}. Sticking with old one." + end + Mauve::Server.instance.logger.warn "Restarting." + Mauve::Server.instance.start end end %w(QUIT TERM INT).each do |sig| trap(sig) do - Configuration.current.logger.warn "#{sig} signal received. Exiting." - Configuration.current.server.stop + Mauve::Server.instance.logger.warn "#{sig} signal received. Stopping." + Mauve::Server.instance.stop exit 0 end end diff --git a/bytemark_example_alerts.sh b/bytemark_example_alerts.sh index 6e90c5c..916ffb5 100755 --- a/bytemark_example_alerts.sh +++ b/bytemark_example_alerts.sh @@ -5,13 +5,15 @@ PRE="ruby -I lib ./bin/mauveclient 127.0.0.1 " $PRE -o supportbot -i 173123 \ -s "My server is not responding" \ -d "From: John Smith <john@smith.name>
-#To: support@support.bytemark.co.uk
-#
-#
It has been several hours now since I have been able to contact my server
-#foo.bar.bytemark.co.uk.  I am very upset that blah blah blah blah
-#and furthermore by business is under threat because £15.00 per month
-#is far too much blah blah blah
-#" +To: support@support.bytemark.co.uk
+
+
It has been several hours now since I have been able to contact my server
+foo.bar.bytemark.co.uk.  I am very upset that blah blah blah blah
+and furthermore by business is under threat because £15.00 per month
+is far too much blah blah blah
+ +

Brokent

html. +" $PRE -o networkmonitor -i 1 -u cr01.man.bytemark.co.uk \ -s "cr01.man.bytemark.co.uk did not respond to pings" @@ -20,13 +22,12 @@ $PRE -o networkmonitor -i 2 -u cr01.thn.bytemark.co.uk \ -s "cr02.man.bytemark.co.uk refused SSH connection" \ -d "
ssh: connect to host localhost port 1212: Connection refused
" -$PRE -o vmhs -i 12346 -u ventham.bytemark.co.uk \ - -s "ventham.bytemark.co.uk heartbeat not received" -r +5 - +$PRE -o ventham.bytemark.co.uk -i heartbeat -r now -s "heartbeat failed for ventham.bytemark.co.uk" --detail="

The heartbeat wasn't sent for the host ventham.bytemark.co.uk

This indicates that the host might be down

" >/dev/null -$PRE -o vmhs -i 12345 -u partridge.bytemark.co.uk \ - -s "partridge.bytemark.co.uk heartbeat not received" -r +10 -c now +$PRE -o networkmonitor -i ping-ventham -u ventham.bytemark.co.uk -r +10m -s "ping failed for ventham.bytemark.co.uk" +$PRE -o vmhs -i 12345 -u partridge.bytemark.co.uk \ + -s "partridge.bytemark.co.uk heartbeat not received" -r +10m -c now $PRE -o vmhs -i 12347 -u eider.bytemark.co.uk \ -s "eider.bytemark.co.uk heartbeat not received" -r +2 diff --git a/lib/mauve/alert.rb b/lib/mauve/alert.rb index b98866c..d0f134b 100644 --- a/lib/mauve/alert.rb +++ b/lib/mauve/alert.rb @@ -1,7 +1,7 @@ require 'mauve/proto' require 'mauve/alert_changed' require 'mauve/datamapper' - +require 'sanitize' module Mauve class AlertEarliestDate @@ -172,16 +172,14 @@ module Mauve self.alert_group.level end - def subject - attribute_get(:subject) || source - end - - def subject=(subject); set_changed_if_different(:subject, subject); end - def summary=(summary); set_changed_if_different(:summary, summary); end -# def detail=(detail); set_changed_if_different(:detail, detail); end - def detail=(detail); attribute_set(:detail, detail) ; end + def subject=(subject); set_changed_if_different( :subject, subject ); end + def summary=(summary); set_changed_if_different( :summary, summary ); end + + # def source=(source); attribute_set( :source, source ); end + # def detail=(detail); attribute_set( :detail, detail ); end protected + def set_changed_if_different(attribute, value) return if self.__send__(attribute) == value self.update_type ||= :changed @@ -244,7 +242,7 @@ module Mauve end def raised? - !raised_at.nil? && cleared_at.nil? + !raised_at.nil? and (cleared_at.nil? or raised_at > cleared_at) end def acknowledged? @@ -252,11 +250,32 @@ module Mauve end def cleared? - new? || !cleared_at.nil? + !raised? end class << self + # + # Utility methods to clean/remove html + # + def remove_html(txt) + Sanitize.clean( + txt.to_s, + Sanitize::Config::DEFAULT + ) + end + + def clean_html(txt) + Sanitize.clean( + txt.to_s, + Sanitize::Config::RELAXED.merge({:remove_contents => true}) + ) + end + + # + # Find stuff + # + # def all_raised all(:raised_at.not => nil, :cleared_at => nil) end @@ -341,20 +360,28 @@ module Mauve logger.debug("received at #{reception_time}, transmitted at #{transmission_time}, raised at #{raise_time}, clear at #{clear_time}") - do_clear = clear_time && clear_time <= reception_time - do_raise = raise_time && raise_time <= reception_time - + # + # Make sure there's no HTML in the ID... paranoia. The rest of the + # HTML removal is done elsewhere. + # + alert.id = Alert.remove_html(alert.id) + alert_db = first(:alert_id => alert.id, :source => update.source) || new(:alert_id => alert.id, :source => update.source) - - pre_raised = alert_db.raised? - pre_cleared = alert_db.cleared? - pre_acknowledged = alert_db.acknowledged? + + # + # Work out what state the alert was in before receiving this update. + # + was_raised = alert_db.raised? + was_cleared = alert_db.cleared? + was_acknowledged = alert_db.acknowledged? alert_db.update_type = nil ## # + # Work out if we're raising now, or in the future. + # # Allow a 5s offset in timings. # if raise_time @@ -372,9 +399,9 @@ module Mauve alert_db.will_clear_at = clear_time end end - + # - # Re-raise if raised_at and cleared_at are set. + # Clear old cleared_at time, if the raised_at time is newer # if alert_db.cleared_at && alert_db.raised_at && alert_db.cleared_at < alert_db.raised_at alert_db.cleared_at = nil @@ -383,24 +410,30 @@ module Mauve # # # - if (pre_raised or pre_cleared) && alert_db.raised? - alert_db.update_type = :raised - elsif pre_raised && alert_db.cleared? + if alert_db.cleared? alert_db.update_type = :cleared + else + alert_db.update_type = :raised end + # # Changing any of these attributes causes the alert to be sent back # out to the notification system with an update_type of :changed. # - alert_db.subject = alert.subject if alert.subject && !alert.subject.empty? - alert_db.summary = alert.summary if alert.summary && !alert.summary.empty? - alert_db.detail = alert.detail if alert.detail && !alert.detail.empty? + # Each of these should be just text, no HTML, so remove any tags we + # find. + # + + alert_db.subject = Alert.remove_html(alert.subject) if alert.subject && !alert.subject.empty? + alert_db.summary = Alert.remove_html(alert.summary) if alert.summary && !alert.summary.empty? - # These updates happen but do not sent the alert back to the - # notification system. # + # The detail can be HTML -- scrub out unwanted parts. + # + alert_db.detail = Alert.clean_html(alert.detail) if alert.detail && !alert.detail.empty? + alert_db.importance = alert.importance if alert.importance != 0 - + # FIXME: this logic ought to be clearer as it may get more complicated # if alert_db.update_type diff --git a/lib/mauve/alert_group.rb b/lib/mauve/alert_group.rb index 288b263..adb9909 100644 --- a/lib/mauve/alert_group.rb +++ b/lib/mauve/alert_group.rb @@ -97,6 +97,9 @@ module Mauve # has undergone a significant change. We resend this to every notify list. # def notify(alert) + # + # The notifications are specified in the config file. + # notifications.each do |notification| notification.alert_changed(alert) end diff --git a/lib/mauve/configuration.rb b/lib/mauve/configuration.rb index 4b7717d..66dfabf 100644 --- a/lib/mauve/configuration.rb +++ b/lib/mauve/configuration.rb @@ -73,13 +73,8 @@ module Mauve @alert_groups = [] @source_lists = SourceList.new() @logger = Log4r::Logger.new("Mauve") - end - def close - server.close - end - end class LoggerOutputterBuilder < ObjectBuilder @@ -442,7 +437,7 @@ module Mauve end def created_server(server) - raise ArgumentError.new("Only one 'server' clause can be specified") if + raise BuildError.new("Only one 'server' clause can be specified") if @result.server @result.server = server end diff --git a/lib/mauve/mauve_thread.rb b/lib/mauve/mauve_thread.rb index f40c79c..bd036db 100644 --- a/lib/mauve/mauve_thread.rb +++ b/lib/mauve/mauve_thread.rb @@ -57,16 +57,19 @@ module Mauve logger.debug("Thread has not frozen!") unless @thread.stop? end + def frozen? + @frozen and @thread.stop? + end + def thaw logger.debug("Thawing") - @frozen = false - @thread.wakeup if @thread.stop? end def start logger.debug("Starting") + @stop = false @thread = Thread.new{ self.run_thread { self.main_loop } } end @@ -83,12 +86,7 @@ module Mauve end def join(ok_exceptions=[]) - begin - @thread.join if @thread.is_a?(Thread) - rescue StandardError => err - logger.debug "#{err.to_s} #{err.class}" - Kernel.raise err unless ok_exceptions.any?{|e| err.is_a?(e)} - end + @thread.join if @thread.is_a?(Thread) end def raise(ex) @@ -127,6 +125,10 @@ module Mauve logger.debug("Killed") end + def thread + @thread + end + end end diff --git a/lib/mauve/mauve_time.rb b/lib/mauve/mauve_time.rb index bf69d1b..504e6ae 100644 --- a/lib/mauve/mauve_time.rb +++ b/lib/mauve/mauve_time.rb @@ -9,6 +9,24 @@ module Mauve self.iso8601 end + def to_s_relative(now = MauveTime.now) + diff = (self.to_f - now.to_f).to_i + case diff + when -5184000..-17200 then "in #{-diff/86400} days" + when -172799..-3600 then "in #{-diff/3600} hours" + when -3599..-300 then "in #{-diff/60} minutes" + when -299..-1 then "very soon" + when 0..299 then "just now" + when 300..3599 then "#{diff/60} minutes ago" + when 3600..172799 then "#{diff/3600} hours ago" + when 172800..5184000 then "#{diff/86400} days ago" + else + diff > 518400 ? + "#{diff/2592000} months ago" : + "in #{-diff/2592000} months" + end + end + end end diff --git a/lib/mauve/notification.rb b/lib/mauve/notification.rb index 02bf6fd..089180b 100644 --- a/lib/mauve/notification.rb +++ b/lib/mauve/notification.rb @@ -119,7 +119,6 @@ module Mauve end def logger ; Log4r::Logger.new self.class.to_s ; end - # Updated code, now takes account of lists of people. # @@ -142,10 +141,10 @@ module Mauve return end - # Should we notificy at all? + # Should we notify at all? is_relevant = DuringRunner.new(MauveTime.now, alert, &during).now? - to_notify = people.collect do |person| + people.collect do |person| case person when Person person diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb index 0127b6b..ce8c2b0 100644 --- a/lib/mauve/notifier.rb +++ b/lib/mauve/notifier.rb @@ -10,23 +10,23 @@ module Mauve include Singleton - attr_accessor :buffer, :sleep_interval + attr_accessor :sleep_interval def initialize - @buffer = Queue.new end def main_loop - # # Cycle through the buffer. # - sz = @buffer.size + sz = Server.notification_buffer_size + + return if sz == 0 - logger.debug("Notifier buffer is #{sz} in length") if sz > 1 + logger.debug("Notifier buffer is #{sz} in length") (sz > 10 ? 10 : sz).times do - person, level, alert = @buffer.pop + person, level, alert = Server.notification_pop begin person.do_send_alert(level, alert) rescue StandardError => ex @@ -34,7 +34,6 @@ module Mauve logger.debug ex.backtrace.join("\n") end end - end def start @@ -76,16 +75,6 @@ module Mauve super end - class << self - - def enq(a) - instance.buffer.enq(a) - end - - alias push enq - - end - end end diff --git a/lib/mauve/notifiers/email.rb b/lib/mauve/notifiers/email.rb index f3b9a0f..168a9d6 100644 --- a/lib/mauve/notifiers/email.rb +++ b/lib/mauve/notifiers/email.rb @@ -66,70 +66,30 @@ module Mauve @suppressed_changed = conditions[:suppressed_changed] end - other_alerts = all_alerts - [alert] - m = RMail::Message.new - m.header.subject = subject_prefix + - case @suppressed_changed - when true - "Suppressing notifications (#{all_alerts.length} total)" - - else - alert.summary_one_line.to_s - end + m.header.subject = "Arse" m.header.to = destination m.header.from = @from m.header.date = MauveTime.now + m.header['Content-Type'] = "multipart/alternative" - summary_formatted = " * "+alert.summary_two_lines.join("\n ") - - case alert.update_type.to_sym - when :cleared - m.body = "An alert has been cleared:\n"+summary_formatted+"\n\n" - when :raised - m.body = "An alert has been raised:\n"+summary_formatted+"\n\n" - when :acknowledged - m.body = "An alert has been acknowledged by #{alert.acknowledged_by}:\n"+summary_formatted+"\n\n" - when :changed - m.body = "An alert has changed in nature:\n"+summary_formatted+"\n\n" - else - raise ArgumentError.new("Unknown update_type #{alert.update_type}") + txt_template = File.join(File.dirname(__FILE__), "templates", "email.txt.erb") + if File.exists?(txt_template) + txt = RMail::Message.new + txt.header['Content-Type'] = "text/plain; charset=\"utf-8\"" + txt.body = ERB.new(File.read(txt_template)).result(binding).chomp + m.add_part(txt) end - - # FIXME: include alert.detail as multipart mime - ##Thread.abort_on_exception = true - m.body += "\n" + '-'*10 + " This is the detail field " + '-'*44 + "\n\n" - m.body += alert.detail.to_s -#' m.body += alert.get_details_plain_text() - m.body += "\n" + '-'*80 + "\n\n" - - if @suppressed_changed == true - m.body += <<-END -IMPORTANT: I've been configured to suppress notification of individual changes -to alerts until their rate decreases. If you still need notification of evrey -single alert, you must watch the web front-end instead. - END - elsif @suppressed_changed == false - m.body += "(Notifications have slowed down - you will now be notified of every change)\n\n" - end - - if other_alerts.empty? - m.body += (alert.update_type == :cleared ? "That was" : "This is")+ - " currently the only alert outstanding\n\n" - else - m.body += other_alerts.length == 1 ? - "There is currently one other alert outstanding:\n\n" : - "There are currently #{other_alerts.length} other alerts outstanding:\n\n" - - other_alerts.each do |other| - m.body += " * "+other.summary_two_lines.join("\n ")+"\n\n" - end + html_template = File.join(File.dirname(__FILE__), "templates", "email.html.erb") + if File.exists?(html_template) + html = RMail::Message.new + html.header['Content-Type'] = "text/html; charset=\"utf-8\"" + html.body = ERB.new(File.read(html_template)).result(binding).chomp + m.add_part(html) end - - m.body += "-- \n"+@signature - + m.to_s end include Debug diff --git a/lib/mauve/notifiers/templates/email.html.erb b/lib/mauve/notifiers/templates/email.html.erb index e69de29..fcf0620 100644 --- a/lib/mauve/notifiers/templates/email.html.erb +++ b/lib/mauve/notifiers/templates/email.html.erb @@ -0,0 +1,28 @@ +

<%= alert.update_type.upcase %> <% +case alert.update_type +when :cleared +%><%= MauveTime.now.to_s_relative(alert.cleared_at.to_time) %><% +when :acknowleged +%><%= MauveTime.now.to_s_relative(alert.acknowledged_at.to_time) %><% +else +%><%= MauveTime.now.to_s_relative(alert.raised_at.to_time) %><% +end +%>: <%= alert.summary %><% +if alert.source != alert.subject +%> -- from <%= alert.source %><% +end +%>. + +


+ +
+<%=alert.detail %> +
+ +
+ +
+--
+Love mauve
+xxx. +
diff --git a/lib/mauve/notifiers/templates/email.txt.erb b/lib/mauve/notifiers/templates/email.txt.erb index e69de29..8f7e9a4 100644 --- a/lib/mauve/notifiers/templates/email.txt.erb +++ b/lib/mauve/notifiers/templates/email.txt.erb @@ -0,0 +1,24 @@ +<%= alert.update_type.upcase %>: <% +case alert.update_type +when :cleared +%><%= MauveTime.now.to_s_relative(alert.cleared_at.to_time) %><% +when :acknowleged +%><%= MauveTime.now.to_s_relative(alert.acknowledged_at.to_time) %><% +else +%><%= MauveTime.now.to_s_relative(alert.raised_at.to_time) %><% +end +%>: <%= alert.summary %><% +if alert.source != alert.subject +%> -- from <%= alert.source %><% +end +%>. + +-- Details ------------------------------------------------------------ + +<%= Sanitize.clean(alert.detail) %> + +----------------------------------------------------------------------- + +-- +Love mauve. +xx diff --git a/lib/mauve/notifiers/templates/sms.txt.erb b/lib/mauve/notifiers/templates/sms.txt.erb deleted file mode 100644 index e69de29..0000000 diff --git a/lib/mauve/notifiers/templates/sms.txt.erb b/lib/mauve/notifiers/templates/sms.txt.erb new file mode 120000 index 0000000..802c711 --- /dev/null +++ b/lib/mauve/notifiers/templates/sms.txt.erb @@ -0,0 +1 @@ +xmpp.txt.erb \ No newline at end of file diff --git a/lib/mauve/notifiers/templates/xmpp.html.erb b/lib/mauve/notifiers/templates/xmpp.html.erb deleted file mode 100644 index e69de29..0000000 diff --git a/lib/mauve/notifiers/templates/xmpp.txt.erb b/lib/mauve/notifiers/templates/xmpp.txt.erb index 881c197..22e2d8c 100644 --- a/lib/mauve/notifiers/templates/xmpp.txt.erb +++ b/lib/mauve/notifiers/templates/xmpp.txt.erb @@ -1 +1,14 @@ -<%=arse %> +<%= alert.update_type.upcase %>: <% +case alert.update_type +when :cleared +%><%= MauveTime.now.to_s_relative(alert.cleared_at.to_time) %><% +when :acknowleged +%><%= MauveTime.now.to_s_relative(alert.acknowledged_at.to_time) %><% +else +%><%= MauveTime.now.to_s_relative(alert.raised_at.to_time) %><% +end +%>: <%= alert.summary %><% +if alert.source != alert.subject +%> -- from <%= alert.source %><% +end +%>. diff --git a/lib/mauve/notifiers/xmpp.rb b/lib/mauve/notifiers/xmpp.rb index fbc9640..4c30643 100644 --- a/lib/mauve/notifiers/xmpp.rb +++ b/lib/mauve/notifiers/xmpp.rb @@ -165,18 +165,10 @@ module Mauve end end + # # Takes an alert and converts it into a message. # - # @param [Alert] alert The alert to convert. - # @return [String] The message, either as HTML. def convert_alert_to_message(alert) - arr = alert.summary_three_lines - str = arr[0] + ": " + arr[1] - str += " -- " + arr[2] if false == arr[2].nil? - str += "." - return str - #return alert.summary_two_lines.join(" -- ") - #return "

" + alert.summary_two_lines.join("
") + "

" end # Attempt to send an alert using XMPP. @@ -201,8 +193,17 @@ module Mauve logger.info("Alert conditions not met, not sending XMPP alert to #{destination_jid}") return false end + + template_file = File.join(File.dirname(__FILE__),"templates","xmpp.txt.erb") + + txt = if File.exists?(template_file) + ERB.new(File.read(template_file)).result(binding).chomp + else + logger.error("Could not find xmpp.txt.erb template") + alert.to_s + end - send_message(destination_jid, convert_alert_to_message(alert)) + send_message(destination_jid, txt) end # Sends a message to the destionation. @@ -413,7 +414,7 @@ module Mauve if @mucs[msg.from.strip].is_a?(MUC::MUCClient) and msg.from != @mucs[msg.from.strip].jid and msg.x("jabber:x:delay") == nil and - (msg.body =~ /\b#{Regexp.escape(@client.jid.resource)}\b/i or + (msg.body =~ /\b#{Regexp.escape(@mucs[msg.from.strip].jid.resource)}\b/i or msg.body =~ /\b#{Regexp.escape(@client.jid.node)}\b/i) receive_normal_message(msg) end diff --git a/lib/mauve/person.rb b/lib/mauve/person.rb index 42b6baf..82845e6 100644 --- a/lib/mauve/person.rb +++ b/lib/mauve/person.rb @@ -139,19 +139,7 @@ module Mauve last_change.was_relevant = true if false == last_change.nil? end - # Send the notification is need be. - if !last_change || last_change.update_type.to_sym == :cleared - # Person has never heard of this alert before, or previously cleared. - # - # We don't send any alert if such a change isn't relevant to this - # Person at this time. - send_alert(level, alert) if is_relevant and [:raised, :changed].include?(alert.update_type.to_sym) - - else - # Relevance is determined by whether the user heard of this alert - # being raised. - send_alert(level, alert) if last_change.was_relevant_when_raised? - end + send_alert(level, alert ) # if last_change.was_relevant_when_raised? end def remind(alert, level) @@ -163,7 +151,7 @@ module Mauve # This just wraps send_alert by sending the job to a queue. # def send_alert(level, alert) - Notifier.push([self, level, alert]) + Server.notification_push([self, level, alert]) end def do_send_alert(level, alert) diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index 414f640..86aaaec 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -8,12 +8,9 @@ module Mauve include Singleton - attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval + attr_accessor :transmission_cache_expire_time, :sleep_interval def initialize - # Buffer for UDP socket packets. - @buffer = Queue.new - # Set the logger up @logger = Log4r::Logger.new(self.class.to_s) @@ -27,23 +24,20 @@ module Mauve def main_loop - sz = @buffer.size + sz = Server.packet_buffer_size return if sz == 0 - Timer.instance.freeze + Timer.instance.freeze unless Timer.instance.frozen? logger.info("Buffer has #{sz} packets waiting...") - triplets = [] # # Only do the loop a maximum of 10 times every @sleep_interval seconds # - (sz > 20 ? 20 : sz).times do - triplets += @buffer.deq - end - triplets.each do |data, client, received_at| + (sz > 10 ? 10 : sz).times do + data, client, received_at = Server.packet_pop @logger.debug("Got #{data.inspect} from #{client.inspect}") @@ -52,7 +46,7 @@ module Mauve begin update.parse_from_string(data) - + if @transmission_id_cache[update.transmission_id.to_s] @logger.debug("Ignoring duplicate transmission id #{data.transmission_id}") # @@ -67,8 +61,8 @@ module Mauve Alert.receive_update(update, received_at) rescue Protobuf::InvalidWireType, - NotImplementedError, - DataObjects::IntegrityError => ex + NotImplementedError, + DataObjects::IntegrityError => ex @logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ "starting '#{data[0..16].inspect}' from #{ip_source}" @@ -79,8 +73,14 @@ module Mauve @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now end + end - Timer.instance.thaw + + ensure + # + # Thaw the timer + # + Timer.instance.thaw if Timer.instance.frozen? end def expire_transmission_id_cache @@ -96,14 +96,5 @@ module Mauve end end - class << self - - def enq(a) - instance.buffer.enq(a) - end - - alias push enq - - end end end diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index ac24da4..2df3888 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -32,13 +32,12 @@ module Mauve THREAD_CLASSES = [UDPServer, HTTPServer, Processor, Notifier, Timer] attr_accessor :web_interface - attr_reader :stopped_at, :started_at, :initial_sleep + attr_reader :stopped_at, :started_at, :initial_sleep, :packet_buffer, :notification_buffer include Singleton def initialize # Set the logger up - @logger = Log4r::Logger.new(self.class.to_s) # Sleep time between pooling the @buffer buffer. @sleep = 1 @@ -50,9 +49,20 @@ module Mauve @started_at = MauveTime.now @initial_sleep = 300 + # + # Keep these queues here to prevent a crash in a subthread losing all the + # subsquent things in the queue. + # + @packet_buffer = Queue.new + @notification_buffer = Queue.new + @config = DEFAULT_CONFIGURATION end + def logger + @logger ||= Log4r::Logger.new(self.class.to_s) + end + def configure(config_spec = nil) # # Update the configuration @@ -120,46 +130,61 @@ module Mauve t.exit end - @logger.info("All threads stopped") + logger.info("All threads stopped") end def run + @stop = false + loop do thread_list = Thread.list thread_list.delete(Thread.current) THREAD_CLASSES.each do |klass| - thread_list.delete(klass.instance) - - next if @frozen or @stop - - unless klass.instance.alive? - # ugh something has died. - # - begin - klass.instance.join - rescue StandardError => ex - @logger.warn "Caught #{ex.to_s} whilst checking #{klass} thread" - @logger.debug ex.backtrace.join("\n") - end - # - # Start the stuff. - klass.instance.start unless @stop + + # + # No need to double check ourselves. + # + thread_list.delete(klass.instance.thread) + + # + # Do nothing if we're frozen or supposed to be stopping or still alive! + # + next if @frozen or @stop or klass.instance.alive? + + # + # ugh something is beginnging to smell. + # + begin + klass.instance.join + rescue StandardError => ex + logger.warn "Caught #{ex.to_s} whilst checking #{klass} thread" + logger.debug ex.backtrace.join("\n") end + # + # (re-)start the klass. + # + klass.instance.start unless @stop end + # + # Now do the same with other threads. + # thread_list.each do |t| - next unless t.alive? + + next if t.alive? + begin t.join rescue StandardError => ex - @logger.fatal "Caught #{ex.to_s} whilst checking threads" - @logger.debug ex.backtrace.join("\n") + logger.fatal "Caught #{ex.to_s} whilst checking threads" + logger.debug ex.backtrace.join("\n") self.stop break end + end break if @stop @@ -171,6 +196,53 @@ module Mauve alias start run + class << self + + # + # BUFFERS + # + # These methods are here, so that if the threads that are popping and + # processing them crash, the buffer itself is not lost with the thread. + # + + # + # These methods pop things on and off the packet_buffer + # + def packet_enq(a) + instance.packet_buffer.enq(a) + end + + def packet_deq + instance.packet_buffer.deq + end + + def packet_buffer_size + instance.packet_buffer.size + end + + alias packet_push packet_enq + alias packet_pop packet_deq + + # + # These methods pop things on and off the notification_buffer + # + def notification_enq(a) + instance.notification_buffer.enq(a) + end + + def notification_deq + instance.notification_buffer.deq + end + + def notification_buffer_size + instance.notification_buffer.size + end + + alias notification_push notification_enq + alias notification_pop notification_deq + + end + end end diff --git a/lib/mauve/timer.rb b/lib/mauve/timer.rb index 60b541c..35e7aa8 100644 --- a/lib/mauve/timer.rb +++ b/lib/mauve/timer.rb @@ -73,11 +73,6 @@ module Mauve # This is a rate-limiting step for alerts. # Kernel.sleep 0.1 - # - # Not sure if this is needed or not. But the timer thread seems to - # freeze here, apparently stuck on a select() statement. - # - Thread.pass end return if self.should_stop? or next_to_notify.nil? diff --git a/lib/mauve/udp_server.rb b/lib/mauve/udp_server.rb index a873e77..9e5f41c 100644 --- a/lib/mauve/udp_server.rb +++ b/lib/mauve/udp_server.rb @@ -70,8 +70,8 @@ module Mauve # i = 0 begin - packet = @socket.recvfrom_nonblock(65535) -# packet = @socket.recvfrom(65535) +# packet = @socket.recvfrom_nonblock(65535) + packet = @socket.recvfrom(65535) received_at = MauveTime.now rescue Errno::EAGAIN, Errno::EWOULDBLOCK => ex puts "#{i += 1} + #{ex}" @@ -91,9 +91,10 @@ module Mauve return end - - - Processor.push([[packet[0], packet[1], received_at]]) + # + # Push packet onto central queue + # + Server.packet_push([packet[0], packet[1], received_at]) end def stop -- cgit v1.2.1