aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve
diff options
context:
space:
mode:
authorPatrick J Cherry <patrick@bytemark.co.uk>2011-06-17 12:48:53 +0100
committerPatrick J Cherry <patrick@bytemark.co.uk>2011-06-17 12:48:53 +0100
commita2fc458d4c1bc6027760546653cb153e775576ce (patch)
treef3af3775adab48d9a18495eeb410cf48269ffd9e /lib/mauve
parent4b39583e63b59d73e2855d776f7cca12d734f2af (diff)
* Notifications are now run in their separate threads.
* Queues are now just arrays instead of "Queue"s * Updated templates to be saner. * Added flusing of queues when threads stop
Diffstat (limited to 'lib/mauve')
-rw-r--r--lib/mauve/alert.rb31
-rw-r--r--lib/mauve/mauve_thread.rb5
-rw-r--r--lib/mauve/notifier.rb25
-rw-r--r--lib/mauve/notifiers/templates/email.html.erb4
-rw-r--r--lib/mauve/notifiers/templates/email.txt.erb4
-rw-r--r--lib/mauve/notifiers/templates/xmpp.txt.erb6
-rw-r--r--lib/mauve/person.rb16
-rw-r--r--lib/mauve/processor.rb36
-rw-r--r--lib/mauve/server.rb19
9 files changed, 97 insertions, 49 deletions
diff --git a/lib/mauve/alert.rb b/lib/mauve/alert.rb
index 6e5249f..86b49e3 100644
--- a/lib/mauve/alert.rb
+++ b/lib/mauve/alert.rb
@@ -162,9 +162,10 @@ module Mauve
raise ArgumentError unless ack_until.is_a?(Time)
self.acknowledged_by = person.username
- self.acknowledged_at = Time.now
- self.update_type = :acknowledged
+ self.acknowledged_at = MauveTime.now
self.will_unacknowledge_at = ack_until
+ self.update_type = :acknowledged
+
logger.error("Couldn't save #{self}") unless save
AlertGroup.notify([self])
end
@@ -172,7 +173,9 @@ module Mauve
def unacknowledge!
self.acknowledged_by = nil
self.acknowledged_at = nil
- self.update_type = :raised
+ self.will_unacknowledge_at = nil
+ self.update_type = (raised? ? :raised : :cleared)
+
logger.error("Couldn't save #{self}") unless save
AlertGroup.notify([self])
end
@@ -182,19 +185,27 @@ module Mauve
self.acknowledged_by = nil
self.acknowledged_at = nil
self.will_unacknowledge_at = nil
- self.will_raise_at = nil
- self.update_type = :raised
self.raised_at = MauveTime.now
+ self.will_raise_at = nil
self.cleared_at = nil
+ # Don't clear will_clear_at
+ self.update_type = :raised
+
logger.error("Couldn't save #{self}") unless save
AlertGroup.notify([self]) unless already_raised
end
def clear!(notify=true)
already_cleared = cleared?
+ self.acknowledged_by = nil
+ self.acknowledged_at = nil
+ self.will_unacknowledge_at = nil
+ self.raised_at = nil
+ # Don't clear will_raise_at
self.cleared_at = MauveTime.now
self.will_clear_at = nil
self.update_type = :cleared
+
logger.error("Couldn't save #{self}") unless save
AlertGroup.notify([self]) unless !notify || already_cleared
end
@@ -208,8 +219,8 @@ module Mauve
end
def poll
- raise! if will_unacknowledge_at && will_unacknowledge_at.to_time <= MauveTime.now ||
- will_raise_at && will_raise_at.to_time <= MauveTime.now
+ raise! if (will_unacknowledge_at and will_unacknowledge_at.to_time <= MauveTime.now) or
+ (will_raise_at and will_raise_at.to_time <= MauveTime.now)
clear! if will_clear_at && will_clear_at.to_time <= MauveTime.now
end
@@ -338,8 +349,6 @@ module Mauve
raise_time = reception_time
end
- logger.debug("received at #{reception_time}, transmitted at #{transmission_time}, raised at #{raise_time}, clear at #{clear_time}")
-
#
# Make sure there's no HTML in the ID... paranoia. The rest of the
# HTML removal is done elsewhere.
@@ -406,8 +415,6 @@ module Mauve
alert_db.subject = alert_db.source
end
- logger.debug [alert_db.source, alert_db.subject].inspect
-
alert_db.summary = Alert.remove_html(alert.summary) if alert.summary && !alert.summary.empty?
#
@@ -465,7 +472,7 @@ module Mauve
end
end
- logger.debug "Got #{alerts_updated.length} alerts to notify about"
+ logger.debug "Got #{alerts_updated.length} alerts to notify about" if alerts_updated.length > 0
AlertGroup.notify(alerts_updated)
end
diff --git a/lib/mauve/mauve_thread.rb b/lib/mauve/mauve_thread.rb
index 3aba6fe..79e742a 100644
--- a/lib/mauve/mauve_thread.rb
+++ b/lib/mauve/mauve_thread.rb
@@ -12,7 +12,7 @@ module Mauve
@logger ||= Log4r::Logger.new(self.class.to_s)
end
- def run_thread(interval = 0.2)
+ def run_thread(interval = 0.1)
#
# Good to go.
#
@@ -98,7 +98,7 @@ module Mauve
self.stop
self.start
end
-
+
def stop
logger.debug("Stopping")
@@ -117,6 +117,7 @@ module Mauve
self.join
end
+
alias exit stop
def kill
diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb
index 5c0e5a7..90df895 100644
--- a/lib/mauve/notifier.rb
+++ b/lib/mauve/notifier.rb
@@ -25,12 +25,24 @@ module Mauve
logger.debug("Notifier buffer is #{sz} in length")
- (sz > 50 ? 50 : sz).times do
- person, level, alert = Server.notification_pop
+ my_threads = []
+ sz.times do
+ person, *args = Server.notification_pop
+
+ #
+ # Nil person.. that's craaazy too!
+ #
+ break if person.nil?
+ my_threads << Thread.new {
+ person.do_send_alert(*args)
+ }
+ end
+
+ my_threads.each do |t|
begin
- person.do_send_alert(level, alert)
+ t.join
rescue StandardError => ex
- logger.debug ex.to_s
+ logger.error ex.to_s
logger.debug ex.backtrace.join("\n")
end
end
@@ -73,6 +85,11 @@ module Mauve
end
super
+
+ #
+ # flush the queue
+ #
+ main_loop
end
end
diff --git a/lib/mauve/notifiers/templates/email.html.erb b/lib/mauve/notifiers/templates/email.html.erb
index 6c323b0..46b2ebc 100644
--- a/lib/mauve/notifiers/templates/email.html.erb
+++ b/lib/mauve/notifiers/templates/email.html.erb
@@ -2,7 +2,7 @@
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml"><head><title></title></head><body>
<p><strong><%= alert.update_type.upcase %>:</strong> <%
-case alert.update_type
+case alert.update_type.to_sym
when :cleared
%><%= alert.cleared_at.to_s_relative %><%
when :acknowleged
@@ -10,7 +10,7 @@ when :acknowleged
else
%><%= alert.raised_at.to_s_relative %><%
end
-%>: <%= alert.summary %><%
+%>: <%= alert.subject %>: <%= alert.summary %><%
if alert.source != alert.subject
%> -- <em>from <%= alert.source %></em><%
end
diff --git a/lib/mauve/notifiers/templates/email.txt.erb b/lib/mauve/notifiers/templates/email.txt.erb
index 1bc79a4..5db9bd6 100644
--- a/lib/mauve/notifiers/templates/email.txt.erb
+++ b/lib/mauve/notifiers/templates/email.txt.erb
@@ -1,5 +1,5 @@
<%= alert.update_type.upcase %>: <%
-case alert.update_type
+case alert.update_type.to_sym
when :cleared
%><%= alert.cleared_at.to_s_relative %><%
when :acknowleged
@@ -7,7 +7,7 @@ when :acknowleged
else
%><%= alert.raised_at.to_s_relative %><%
end
-%>: <%= alert.summary %><%
+%>: <%= alert.subject %>: <%= alert.summary %><%
if alert.source != alert.subject
%> -- from <%= alert.source %><%
end
diff --git a/lib/mauve/notifiers/templates/xmpp.txt.erb b/lib/mauve/notifiers/templates/xmpp.txt.erb
index eafc12b..282f370 100644
--- a/lib/mauve/notifiers/templates/xmpp.txt.erb
+++ b/lib/mauve/notifiers/templates/xmpp.txt.erb
@@ -1,13 +1,13 @@
<%= alert.update_type.upcase %>: <%
-case alert.update_type
+case alert.update_type.to_sym
when :cleared
%><%= alert.cleared_at.to_s_relative %><%
when :acknowleged
-%><%= alert.acknowledged_at.to_time.to_s_relative %><%
+%><%= alert.acknowledged_at.to_s_relative %><%
else
%><%= alert.raised_at.to_s_relative %><%
end
-%>: <%= alert.subject %> <%= alert.summary %><%
+%>: <%= alert.subject %>: <%= alert.summary %><%
if alert.source != alert.subject
%> -- from <%= alert.source %><%
end
diff --git a/lib/mauve/person.rb b/lib/mauve/person.rb
index 82845e6..2689bb3 100644
--- a/lib/mauve/person.rb
+++ b/lib/mauve/person.rb
@@ -8,7 +8,7 @@ module Mauve
attr_reader :notification_thresholds
def initialize(*args)
- @notification_thresholds = { 60 => Array.new(10) }
+ @notification_thresholds = { } # 60 => Array.new(10) }
@suppressed = false
super(*args)
end
@@ -151,10 +151,6 @@ module Mauve
# This just wraps send_alert by sending the job to a queue.
#
def send_alert(level, alert)
- Server.notification_push([self, level, alert])
- end
-
- def do_send_alert(level, alert)
now = MauveTime.now
suppressed_changed = nil
threshold_breached = @notification_thresholds.any? do |period, previous_alert_times|
@@ -185,6 +181,10 @@ module Mauve
return if suppressed? or this_alert_suppressed
+ Server.notification_push([self, level, alert, suppressed_changed])
+ end
+
+ def do_send_alert(level, alert, suppressed_changed)
result = NotificationCaller.new(
self,
alert,
@@ -198,7 +198,11 @@ module Mauve
# Remember that we've sent an alert
#
@notification_thresholds.each do |period, previous_alert_times|
- @notification_thresholds[period].replace(previous_alert_times[1..period-1] + [now])
+ #
+ # Hmm.. not sure how to make this thread-safe.
+ #
+ @notification_thresholds[period].push MauveTime.now
+ @notification_thresholds[period].shift
end
logger.info("Notification for #{username} of #{alert} at level #{level} has been successful")
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb
index a46f229..083b9a1 100644
--- a/lib/mauve/processor.rb
+++ b/lib/mauve/processor.rb
@@ -19,26 +19,26 @@ module Mauve
#
@transmission_id_cache = {}
@transmission_cache_expire_time = 300
- @sleep_interval = 1
+ @transmission_cache_checked_at = Time.now
end
def main_loop
- sz = Server.packet_buffer_size
-
- return if sz == 0
-
- Timer.instance.freeze unless Timer.instance.frozen?
-
- logger.info("Buffer has #{sz} packets waiting...")
+ logger.info("Buffer has packets waiting...") if Server.packet_buffer_size > 0
#
# Only do the loop a maximum of 10 times every @sleep_interval seconds
#
-
- (sz > 50 ? 50 : sz).times do
+ 10.times do
data, client, received_at = Server.packet_pop
+ #
+ # Uh-oh. Nil data? That's craaaazy
+ #
+ break if data.nil?
+
+ Timer.instance.freeze unless Timer.instance.frozen?
+
@logger.debug("Got #{data.inspect} from #{client.inspect}")
ip_source = "#{client[3]}:#{client[1]}"
@@ -71,7 +71,6 @@ module Mauve
ensure
@transmission_id_cache[update.transmission_id.to_s] = MauveTime.now
-
end
end
@@ -85,6 +84,11 @@ module Mauve
def expire_transmission_id_cache
now = MauveTime.now
+ #
+ # Only check once every minute.
+ #
+ return unless (now - @transmission_cache_checked_at) > 60
+
to_delete = []
@transmission_id_cache.each do |tid, received_at|
@@ -94,7 +98,17 @@ module Mauve
to_delete.each do |tid|
@transmission_id_cache.delete(tid)
end
+
+ @transmission_cache_checked_at = now
end
+ def stop
+ super
+
+ #
+ # flush the queue
+ #
+ main_loop
+ end
end
end
diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb
index 2df3888..c985e15 100644
--- a/lib/mauve/server.rb
+++ b/lib/mauve/server.rb
@@ -53,8 +53,8 @@ module Mauve
# Keep these queues here to prevent a crash in a subthread losing all the
# subsquent things in the queue.
#
- @packet_buffer = Queue.new
- @notification_buffer = Queue.new
+ @packet_buffer = []
+ @notification_buffer = []
@config = DEFAULT_CONFIGURATION
end
@@ -115,13 +115,18 @@ module Mauve
end
def stop
+ if @stop
+ logger.debug("Stop already called!")
+ return
+ end
+
@stop = true
thread_list = Thread.list
thread_list.delete(Thread.current)
- THREAD_CLASSES.reverse.each do |klass|
+ THREAD_CLASSES.each do |klass|
thread_list.delete(klass.instance)
klass.instance.stop unless klass.instance.nil?
end
@@ -209,11 +214,11 @@ module Mauve
# These methods pop things on and off the packet_buffer
#
def packet_enq(a)
- instance.packet_buffer.enq(a)
+ instance.packet_buffer.push(a)
end
def packet_deq
- instance.packet_buffer.deq
+ instance.packet_buffer.shift
end
def packet_buffer_size
@@ -227,11 +232,11 @@ module Mauve
# These methods pop things on and off the notification_buffer
#
def notification_enq(a)
- instance.notification_buffer.enq(a)
+ instance.notification_buffer.push(a)
end
def notification_deq
- instance.notification_buffer.deq
+ instance.notification_buffer.shift
end
def notification_buffer_size