From 99073c56f03ff3978e6106c9760ec389ef6c3745 Mon Sep 17 00:00:00 2001 From: Patrick J Cherry Date: Fri, 20 Apr 2012 14:48:09 +0100 Subject: Added configuration options to disable the notification/packet buffers. --- lib/mauve/configuration_builders/server.rb | 7 +++ lib/mauve/notifier.rb | 19 ++++--- lib/mauve/processor.rb | 82 +++++++++++++++--------------- lib/mauve/server.rb | 50 +++++++++++++++--- 4 files changed, 103 insertions(+), 55 deletions(-) diff --git a/lib/mauve/configuration_builders/server.rb b/lib/mauve/configuration_builders/server.rb index e3654b9..feb75b8 100644 --- a/lib/mauve/configuration_builders/server.rb +++ b/lib/mauve/configuration_builders/server.rb @@ -163,6 +163,13 @@ module Mauve # The period of sleep during which no heartbeats are raised. # is_attribute "initial_sleep" + + # + # The next two attributes determine if packet/notitication bufferes are + # used. These both default to "true" + # + is_attribute "use_packet_buffer" + is_attribute "use_notification_buffer" def builder_setup @result = Mauve::Server.instance diff --git a/lib/mauve/notifier.rb b/lib/mauve/notifier.rb index 8a26b2c..c473acf 100644 --- a/lib/mauve/notifier.rb +++ b/lib/mauve/notifier.rb @@ -31,6 +31,18 @@ module Mauve end end + + # + # This sends the notification for an alert + # + def notify(alert, at) + if alert.alert_group.nil? + logger.warn "Could not notify for #{alert} since there are no matching alert groups" + else + alert.alert_group.notify(alert, at) + end + end + private @@ -86,12 +98,7 @@ module Mauve # Empty the buffer, one notification at a time. # sz.times do - alert, at = Server.notification_pop - if alert.alert_group.nil? - logger.warn "Could not notify for #{alert} since there are no matching alert groups" - else - alert.alert_group.notify(alert, at) - end + notify(*Server.notification_pop) end end diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index 9896530..0ac7b59 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -78,6 +78,44 @@ module Mauve # do_processor end + + # This processes an incoming packet. It is in a seperate method so it can + # be (de)coupled as needed from the UDP server. + # + def process_packet(data, client, received_at) + # + # Uh-oh. Nil data? That's craaaazy + # + return nil if data.nil? + + ip_source = "#{client[3]}" + update = Proto::AlertUpdate.new + + update.parse_from_string(data) + + if @transmission_id_cache[update.transmission_id.to_s] + logger.debug("Ignoring duplicate transmission id #{update.transmission_id}") + return nil + end + + logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+ + "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" + + Alert.receive_update(update, received_at, ip_source) + + rescue Protobuf::InvalidWireType, + NotImplementedError, + DataObjects::IntegrityError => ex + + logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ + "starting '#{data[0..15].inspect}' from #{ip_source}" + + logger.debug ex.backtrace.join("\n") + + ensure + @transmission_id_cache[update.transmission_id.to_s] = Time.now + end + private @@ -157,52 +195,12 @@ module Mauve sz = Server.packet_buffer_size sz.times do - data, client, received_at = Server.packet_pop - - # - # Uh-oh. Nil data? That's craaaazy - # - next if data.nil? - - - # logger.debug("Got #{data.inspect} from #{client.inspect}") - - ip_source = "#{client[3]}" - update = Proto::AlertUpdate.new - - begin - update.parse_from_string(data) - - if @transmission_id_cache[update.transmission_id.to_s] - logger.debug("Ignoring duplicate transmission id #{update.transmission_id}") - # - # Continue with next packet. - # - next - end - - logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+ - "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" - - Alert.receive_update(update, received_at, ip_source) - - rescue Protobuf::InvalidWireType, - NotImplementedError, - DataObjects::IntegrityError => ex - - logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ - "starting '#{data[0..15].inspect}' from #{ip_source}" - - logger.debug ex.backtrace.join("\n") - - ensure - @transmission_id_cache[update.transmission_id.to_s] = Time.now - end + process_packet(*Server.packet_pop) end end def timer_should_stop? - (Server.packet_buffer_size > 0) + (Server.packet_buffer_size > 0) or self.should_stop? end end diff --git a/lib/mauve/server.rb b/lib/mauve/server.rb index 3e82858..db5fda6 100644 --- a/lib/mauve/server.rb +++ b/lib/mauve/server.rb @@ -40,7 +40,7 @@ module Mauve @started_at = Time.now @initial_sleep = 300 - + # # Keep these queues here to prevent a crash in a subthread losing all the # subsquent things in the queue. @@ -69,7 +69,38 @@ module Mauve raise ArgumentError, "database must be a string" unless d.is_a?(String) @database = d end - + + # + # Sets up the packet buffer (or not). The argument can be "false" or "no" + # or a FalseClass object for no. Anything else makes no change. + # + # @param [String] arg + # @return [Array or nil] + def use_packet_buffer=(arg) + logger.debug(arg) + if arg.is_a?(FalseClass) or arg =~ /^(n(o)?|f(alse)?)$/i + @packet_buffer = nil + end + + @packet_buffer + end + + # + # Sets up the notification buffer (or not). The argument can be "false" or + # "no" or a FalseClass object for no. Anything else makes no change. + # + # @param [String] arg + # @return [Array or nil] + def use_notification_buffer=(arg) + logger.debug(arg) + if arg.is_a?(FalseClass) or arg =~ /^(n(o)?|f(alse)?)$/i + @notification_buffer = nil + end + + @notification_buffer + end + + # Set the sleep period during which notifications about old alerts are # suppressed. # @@ -97,11 +128,8 @@ module Mauve # @return [NilClass] def setup # + # Set up the database # - # - @packet_buffer = [] - @notification_buffer = [] - DataMapper.setup(:default, @database) # DataMapper.logger = Log4r::Logger.new("Mauve::DataMapper") @@ -261,6 +289,8 @@ module Mauve # @param [String] a Packet from the UDP server def packet_enq(a) instance.packet_buffer.push(a) + rescue NoMethodError + Processor.instance.process_packet(*a) end # Shift a packet off the front of the +packet buffer+ @@ -275,6 +305,8 @@ module Mauve # @return [Integer} def packet_buffer_size instance.packet_buffer.size + rescue NoMethodError + 0 end alias packet_push packet_enq @@ -285,6 +317,8 @@ module Mauve # @param [Array] a Notification array, consisting of a Person and the args to Mauve::Person#send_alert def notification_enq(a) instance.notification_buffer.push(a) + rescue NoMethodError + Notifier.instance.notify(*a) end # Shift a notification off the front of the +notification_buffer+ @@ -299,8 +333,10 @@ module Mauve # @return [Integer] def notification_buffer_size instance.notification_buffer.size + rescue NoMethodError + 0 end - + alias notification_push notification_enq alias notification_pop notification_deq -- cgit v1.2.1