# encoding: UTF-8 require 'mauve/mauve_thread' module Mauve class Processor < MauveThread include Singleton attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval def initialize # Buffer for UDP socket packets. @buffer = Queue.new # Set the logger up @logger = Log4r::Logger.new(self.class.to_s) # # Set up the transmission id cache # @transmission_id_cache = {} @transmission_cache_expire_time = 300 @sleep_interval = 1 end def main_loop sz = @buffer.size return if sz == 0 Timer.instance.freeze logger.info("Buffer has #{sz} packets waiting...") triplets = [] # # Only do the loop a maximum of 10 times every @sleep_interval seconds # (sz > 20 ? 20 : sz).times do triplets += @buffer.deq end triplets.each do |data, client, received_at| @logger.debug("Got #{data.inspect} from #{client.inspect}") ip_source = "#{client[3]}:#{client[1]}" update = Proto::AlertUpdate.new begin update.parse_from_string(data) if @transmission_id_cache[update.transmission_id.to_s] @logger.debug("Ignoring duplicate transmission id #{data.transmission_id}") # # Continue with next packet. # next end @logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} from "+ "'#{update.source}'@#{ip_source} alerts #{update.alert.length}" Alert.receive_update(update, received_at) rescue Protobuf::InvalidWireType, NotImplementedError, DataObjects::IntegrityError => ex @logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ "starting '#{data[0..16].inspect}' from #{ip_source}" @logger.debug ex.backtrace.join("\n") ensure @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now end end Timer.instance.thaw end def expire_transmission_id_cache now = MauveTime.now to_delete = [] @transmission_id_cache.each do |tid, received_at| to_delete << tid if (now - received_at) > @transmission_cache_expire_time end to_delete.each do |tid| @transmission_id_cache.delete(tid) end end class << self def enq(a) instance.buffer.enq(a) end alias push enq end end end