diff options
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r-- | lib/mauve/processor.rb | 39 |
1 files changed, 15 insertions, 24 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb index 414f640..86aaaec 100644 --- a/lib/mauve/processor.rb +++ b/lib/mauve/processor.rb @@ -8,12 +8,9 @@ module Mauve include Singleton - attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval + attr_accessor :transmission_cache_expire_time, :sleep_interval def initialize - # Buffer for UDP socket packets. - @buffer = Queue.new - # Set the logger up @logger = Log4r::Logger.new(self.class.to_s) @@ -27,23 +24,20 @@ module Mauve def main_loop - sz = @buffer.size + sz = Server.packet_buffer_size return if sz == 0 - Timer.instance.freeze + Timer.instance.freeze unless Timer.instance.frozen? logger.info("Buffer has #{sz} packets waiting...") - triplets = [] # # Only do the loop a maximum of 10 times every @sleep_interval seconds # - (sz > 20 ? 20 : sz).times do - triplets += @buffer.deq - end - triplets.each do |data, client, received_at| + (sz > 10 ? 10 : sz).times do + data, client, received_at = Server.packet_pop @logger.debug("Got #{data.inspect} from #{client.inspect}") @@ -52,7 +46,7 @@ module Mauve begin update.parse_from_string(data) - + if @transmission_id_cache[update.transmission_id.to_s] @logger.debug("Ignoring duplicate transmission id #{data.transmission_id}") # @@ -67,8 +61,8 @@ module Mauve Alert.receive_update(update, received_at) rescue Protobuf::InvalidWireType, - NotImplementedError, - DataObjects::IntegrityError => ex + NotImplementedError, + DataObjects::IntegrityError => ex @logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+ "starting '#{data[0..16].inspect}' from #{ip_source}" @@ -79,8 +73,14 @@ module Mauve @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now end + end - Timer.instance.thaw + + ensure + # + # Thaw the timer + # + Timer.instance.thaw if Timer.instance.frozen? end def expire_transmission_id_cache @@ -96,14 +96,5 @@ module Mauve end end - class << self - - def enq(a) - instance.buffer.enq(a) - end - - alias push enq - - end end end |