diff options
author | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-04-13 17:03:16 +0100 |
---|---|---|
committer | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-04-13 17:03:16 +0100 |
commit | 89a67770e66d11740948e90a41db6cee0482cf8e (patch) | |
tree | be858515fb789a89d68f94975690ab019813726c /lib/mauve/processor.rb |
new version.
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r-- | lib/mauve/processor.rb | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb new file mode 100644 index 0000000..414f640 --- /dev/null +++ b/lib/mauve/processor.rb @@ -0,0 +1,109 @@ +# encoding: UTF-8 + +require 'mauve/mauve_thread' + +module Mauve + + class Processor < MauveThread + + include Singleton + + attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval + + def initialize + # Buffer for UDP socket packets. + @buffer = Queue.new + + # Set the logger up + @logger = Log4r::Logger.new(self.class.to_s) + + # + # Set up the transmission id cache + # + @transmission_id_cache = {} + @transmission_cache_expire_time = 300 + @sleep_interval = 1 + end + + def main_loop + + sz = @buffer.size + + return if sz == 0 + + Timer.instance.freeze + + logger.info("Buffer has #{sz} packets waiting...") + + triplets = [] + # + # Only do the loop a maximum of 10 times every @sleep_interval seconds + # + (sz > 20 ? 20 : sz).times do + triplets += @buffer.deq + end + + triplets.each do |data, client, received_at| + + @logger.debug("Got #{data.inspect} from #{client.inspect}") + + ip_source = "#{client[3]}:#{client[1]}" + update = Proto::AlertUpdate.new + + begin + update.parse_from_string(data) + + if @transmission_id_cache[update.transmission_id.to_s] + @logger.debug("Ignoring duplicate transmission id #{data.transmission_id}") + # + # Continue with next packet. + # + next + end + + @logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} from "+ + "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" + + Alert.receive_update(update, received_at) + + rescue Protobuf::InvalidWireType, + NotImplementedError, + DataObjects::IntegrityError => ex + + @logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ + "starting '#{data[0..16].inspect}' from #{ip_source}" + + @logger.debug ex.backtrace.join("\n") + + ensure + @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now + + end + end + Timer.instance.thaw + end + + def expire_transmission_id_cache + now = MauveTime.now + to_delete = [] + + @transmission_id_cache.each do |tid, received_at| + to_delete << tid if (now - received_at) > @transmission_cache_expire_time + end + + to_delete.each do |tid| + @transmission_id_cache.delete(tid) + end + end + + class << self + + def enq(a) + instance.buffer.enq(a) + end + + alias push enq + + end + end +end |