diff options
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r-- | lib/mauve/processor.rb | 82 |
1 files changed, 40 insertions, 42 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index 9896530..0ac7b59 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -78,6 +78,44 @@ module Mauve # do_processor end + + # This processes an incoming packet. It is in a seperate method so it can + # be (de)coupled as needed from the UDP server. + # + def process_packet(data, client, received_at) + # + # Uh-oh. Nil data? That's craaaazy + # + return nil if data.nil? + + ip_source = "#{client[3]}" + update = Proto::AlertUpdate.new + + update.parse_from_string(data) + + if @transmission_id_cache[update.transmission_id.to_s] + logger.debug("Ignoring duplicate transmission id #{update.transmission_id}") + return nil + end + + logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+ + "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" + + Alert.receive_update(update, received_at, ip_source) + + rescue Protobuf::InvalidWireType, + NotImplementedError, + DataObjects::IntegrityError => ex + + logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ + "starting '#{data[0..15].inspect}' from #{ip_source}" + + logger.debug ex.backtrace.join("\n") + + ensure + @transmission_id_cache[update.transmission_id.to_s] = Time.now + end + private @@ -157,52 +195,12 @@ module Mauve sz = Server.packet_buffer_size sz.times do - data, client, received_at = Server.packet_pop - - # - # Uh-oh. Nil data? That's craaaazy - # - next if data.nil? - - - # logger.debug("Got #{data.inspect} from #{client.inspect}") - - ip_source = "#{client[3]}" - update = Proto::AlertUpdate.new - - begin - update.parse_from_string(data) - - if @transmission_id_cache[update.transmission_id.to_s] - logger.debug("Ignoring duplicate transmission id #{update.transmission_id}") - # - # Continue with next packet. - # - next - end - - logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} received at #{received_at.to_i} from "+ - "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" - - Alert.receive_update(update, received_at, ip_source) - - rescue Protobuf::InvalidWireType, - NotImplementedError, - DataObjects::IntegrityError => ex - - logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ - "starting '#{data[0..15].inspect}' from #{ip_source}" - - logger.debug ex.backtrace.join("\n") - - ensure - @transmission_id_cache[update.transmission_id.to_s] = Time.now - end + process_packet(*Server.packet_pop) end end def timer_should_stop? - (Server.packet_buffer_size > 0) + (Server.packet_buffer_size > 0) or self.should_stop? end end |