aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve
diff options
context:
space:
mode:
authorPatrick J Cherry <patrick@bytemark.co.uk>2012-04-20 14:48:09 +0100
committerPatrick J Cherry <patrick@bytemark.co.uk>2012-04-20 14:48:09 +0100
commit99073c56f03ff3978e6106c9760ec389ef6c3745 (patch)
treeff724817ce4685d76446f36780a991ef488504f9 /lib/mauve
parent83860784c1d184dd6afa680aeff2e06d65f50b8d (diff)
Added configuration options to disable the notification/packet buffers.
Diffstat (limited to 'lib/mauve')
-rw-r--r--lib/mauve/configuration_builders/server.rb7
-rw-r--r--lib/mauve/notifier.rb19
-rw-r--r--lib/mauve/processor.rb82
-rw-r--r--lib/mauve/server.rb50
4 files changed, 103 insertions, 55 deletions
diff --git a/lib/mauve/configuration_builders/server.rb b/lib/mauve/configuration_builders/server.rb
index e3654b9..feb75b8 100644
--- a/lib/mauve/configuration_builders/server.rb
+++ b/lib/mauve/configuration_builders/server.rb
@@ -163,6 +163,13 @@ module Mauve
# The period of sleep during which no heartbeats are raised.
#
is_attribute "initial_sleep"
+
+ #
+ # The next two attributes determine if packet/notitication bufferes are
+ # used. These both default to "true"
+ #
+ is_attribute "use_packet_buffer"
+ is_attribute "use_notification_buffer"
def builder_setup
@result = Mauve::Server.instance
diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb
index 8a26b2c..c473acf 100644
--- a/lib/mauve/notifier.rb
+++ b/lib/mauve/notifier.rb
@@ -31,6 +31,18 @@ module Mauve
end
end
+
+ #
+ # This sends the notification for an alert
+ #
+ def notify(alert, at)
+ if alert.alert_group.nil?
+ logger.warn "Could not notify for #{alert} since there are no matching alert groups"
+ else
+ alert.alert_group.notify(alert, at)
+ end
+ end
+
private
@@ -86,12 +98,7 @@ module Mauve
# Empty the buffer, one notification at a time.
#
sz.times do
- alert, at = Server.notification_pop
- if alert.alert_group.nil?
- logger.warn "Could not notify for #{alert} since there are no matching alert groups"
- else
- alert.alert_group.notify(alert, at)
- end
+ notify(*Server.notification_pop)
end
end
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb
index 9896530..0ac7b59 100644
--- a/lib/mauve/processor.rb
+++ b/lib/mauve/processor.rb
@@ -78,6 +78,44 @@ module Mauve
#
do_processor
end
+
+ # This processes an incoming packet. It is in a seperate method so it can
+ # be (de)coupled as needed from the UDP server.
+ #
+ def process_packet(data, client, received_at)
+ #
+ # Uh-oh. Nil data? That's craaaazy
+ #
+ return nil if data.nil?
+
+ ip_source = "#{client[3]}"
+ update = Proto::AlertUpdate.new
+
+ update.parse_from_string(data)
+
+ if @transmission_id_cache[update.transmission_id.to_s]
+ logger.debug("Ignoring duplicate transmission id #{update.transmission_id}")
+ return nil
+ end
+
+ logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+
+ "'#{update.source}'@#{ip_source} alerts #{update.alert.length}"
+
+ Alert.receive_update(update, received_at, ip_source)
+
+ rescue Protobuf::InvalidWireType,
+ NotImplementedError,
+ DataObjects::IntegrityError => ex
+
+ logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
+ "starting '#{data[0..15].inspect}' from #{ip_source}"
+
+ logger.debug ex.backtrace.join("\n")
+
+ ensure
+ @transmission_id_cache[update.transmission_id.to_s] = Time.now
+ end
+
private
@@ -157,52 +195,12 @@ module Mauve
sz = Server.packet_buffer_size
sz.times do
- data, client, received_at = Server.packet_pop
-
- #
- # Uh-oh. Nil data? That's craaaazy
- #
- next if data.nil?
-
-
- # logger.debug("Got #{data.inspect} from #{client.inspect}")
-
- ip_source = "#{client[3]}"
- update = Proto::AlertUpdate.new
-
- begin
- update.parse_from_string(data)
-
- if @transmission_id_cache[update.transmission_id.to_s]
- logger.debug("Ignoring duplicate transmission id #{update.transmission_id}")
- #
- # Continue with next packet.
- #
- next
- end
-
- logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+
- "'#{update.source}'@#{ip_source} alerts #{update.alert.length}"
-
- Alert.receive_update(update, received_at, ip_source)
-
- rescue Protobuf::InvalidWireType,
- NotImplementedError,
- DataObjects::IntegrityError => ex
-
- logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
- "starting '#{data[0..15].inspect}' from #{ip_source}"
-
- logger.debug ex.backtrace.join("\n")
-
- ensure
- @transmission_id_cache[update.transmission_id.to_s] = Time.now
- end
+ process_packet(*Server.packet_pop)
end
end
def timer_should_stop?
- (Server.packet_buffer_size > 0)
+ (Server.packet_buffer_size > 0) or self.should_stop?
end
end
diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb
index 3e82858..db5fda6 100644
--- a/lib/mauve/server.rb
+++ b/lib/mauve/server.rb
@@ -40,7 +40,7 @@ module Mauve
@started_at = Time.now
@initial_sleep = 300
-
+
#
# Keep these queues here to prevent a crash in a subthread losing all the
# subsquent things in the queue.
@@ -69,7 +69,38 @@ module Mauve
raise ArgumentError, "database must be a string" unless d.is_a?(String)
@database = d
end
-
+
+ #
+ # Sets up the packet buffer (or not). The argument can be "false" or "no"
+ # or a FalseClass object for no. Anything else makes no change.
+ #
+ # @param [String] arg
+ # @return [Array or nil]
+ def use_packet_buffer=(arg)
+ logger.debug(arg)
+ if arg.is_a?(FalseClass) or arg =~ /^(n(o)?|f(alse)?)$/i
+ @packet_buffer = nil
+ end
+
+ @packet_buffer
+ end
+
+ #
+ # Sets up the notification buffer (or not). The argument can be "false" or
+ # "no" or a FalseClass object for no. Anything else makes no change.
+ #
+ # @param [String] arg
+ # @return [Array or nil]
+ def use_notification_buffer=(arg)
+ logger.debug(arg)
+ if arg.is_a?(FalseClass) or arg =~ /^(n(o)?|f(alse)?)$/i
+ @notification_buffer = nil
+ end
+
+ @notification_buffer
+ end
+
+
# Set the sleep period during which notifications about old alerts are
# suppressed.
#
@@ -97,11 +128,8 @@ module Mauve
# @return [NilClass]
def setup
#
+ # Set up the database
#
- #
- @packet_buffer = []
- @notification_buffer = []
-
DataMapper.setup(:default, @database)
# DataMapper.logger = Log4r::Logger.new("Mauve::DataMapper")
@@ -261,6 +289,8 @@ module Mauve
# @param [String] a Packet from the UDP server
def packet_enq(a)
instance.packet_buffer.push(a)
+ rescue NoMethodError
+ Processor.instance.process_packet(*a)
end
# Shift a packet off the front of the +packet buffer+
@@ -275,6 +305,8 @@ module Mauve
# @return [Integer}
def packet_buffer_size
instance.packet_buffer.size
+ rescue NoMethodError
+ 0
end
alias packet_push packet_enq
@@ -285,6 +317,8 @@ module Mauve
# @param [Array] a Notification array, consisting of a Person and the args to Mauve::Person#send_alert
def notification_enq(a)
instance.notification_buffer.push(a)
+ rescue NoMethodError
+ Notifier.instance.notify(*a)
end
# Shift a notification off the front of the +notification_buffer+
@@ -299,8 +333,10 @@ module Mauve
# @return [Integer]
def notification_buffer_size
instance.notification_buffer.size
+ rescue NoMethodError
+ 0
end
-
+
alias notification_push notification_enq
alias notification_pop notification_deq