aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/processor.rb
diff options
context:
space:
mode:
authorPatrick J Cherry <patrick@bytemark.co.uk>2012-04-20 14:48:09 +0100
committerPatrick J Cherry <patrick@bytemark.co.uk>2012-04-20 14:48:09 +0100
commit99073c56f03ff3978e6106c9760ec389ef6c3745 (patch)
treeff724817ce4685d76446f36780a991ef488504f9 /lib/mauve/processor.rb
parent83860784c1d184dd6afa680aeff2e06d65f50b8d (diff)
Added configuration options to disable the notification/packet buffers.
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r--lib/mauve/processor.rb82
1 files changed, 40 insertions, 42 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb
index 9896530..0ac7b59 100644
--- a/lib/mauve/processor.rb
+++ b/lib/mauve/processor.rb
@@ -78,6 +78,44 @@ module Mauve
#
do_processor
end
+
+ # This processes an incoming packet. It is in a seperate method so it can
+ # be (de)coupled as needed from the UDP server.
+ #
+ def process_packet(data, client, received_at)
+ #
+ # Uh-oh. Nil data? That's craaaazy
+ #
+ return nil if data.nil?
+
+ ip_source = "#{client[3]}"
+ update = Proto::AlertUpdate.new
+
+ update.parse_from_string(data)
+
+ if @transmission_id_cache[update.transmission_id.to_s]
+ logger.debug("Ignoring duplicate transmission id #{update.transmission_id}")
+ return nil
+ end
+
+ logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+
+ "'#{update.source}'@#{ip_source} alerts #{update.alert.length}"
+
+ Alert.receive_update(update, received_at, ip_source)
+
+ rescue Protobuf::InvalidWireType,
+ NotImplementedError,
+ DataObjects::IntegrityError => ex
+
+ logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
+ "starting '#{data[0..15].inspect}' from #{ip_source}"
+
+ logger.debug ex.backtrace.join("\n")
+
+ ensure
+ @transmission_id_cache[update.transmission_id.to_s] = Time.now
+ end
+
private
@@ -157,52 +195,12 @@ module Mauve
sz = Server.packet_buffer_size
sz.times do
- data, client, received_at = Server.packet_pop
-
- #
- # Uh-oh. Nil data? That's craaaazy
- #
- next if data.nil?
-
-
- # logger.debug("Got #{data.inspect} from #{client.inspect}")
-
- ip_source = "#{client[3]}"
- update = Proto::AlertUpdate.new
-
- begin
- update.parse_from_string(data)
-
- if @transmission_id_cache[update.transmission_id.to_s]
- logger.debug("Ignoring duplicate transmission id #{update.transmission_id}")
- #
- # Continue with next packet.
- #
- next
- end
-
- logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+
- "'#{update.source}'@#{ip_source} alerts #{update.alert.length}"
-
- Alert.receive_update(update, received_at, ip_source)
-
- rescue Protobuf::InvalidWireType,
- NotImplementedError,
- DataObjects::IntegrityError => ex
-
- logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
- "starting '#{data[0..15].inspect}' from #{ip_source}"
-
- logger.debug ex.backtrace.join("\n")
-
- ensure
- @transmission_id_cache[update.transmission_id.to_s] = Time.now
- end
+ process_packet(*Server.packet_pop)
end
end
def timer_should_stop?
- (Server.packet_buffer_size > 0)
+ (Server.packet_buffer_size > 0) or self.should_stop?
end
end