aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/processor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r--lib/mauve/processor.rb39
1 files changed, 15 insertions, 24 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb
index 414f640..86aaaec 100644
--- a/lib/mauve/processor.rb
+++ b/lib/mauve/processor.rb
@@ -8,12 +8,9 @@ module Mauve
include Singleton
- attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval
+ attr_accessor :transmission_cache_expire_time, :sleep_interval
def initialize
- # Buffer for UDP socket packets.
- @buffer = Queue.new
-
# Set the logger up
@logger = Log4r::Logger.new(self.class.to_s)
@@ -27,23 +24,20 @@ module Mauve
def main_loop
- sz = @buffer.size
+ sz = Server.packet_buffer_size
return if sz == 0
- Timer.instance.freeze
+ Timer.instance.freeze unless Timer.instance.frozen?
logger.info("Buffer has #{sz} packets waiting...")
- triplets = []
#
# Only do the loop a maximum of 10 times every @sleep_interval seconds
#
- (sz > 20 ? 20 : sz).times do
- triplets += @buffer.deq
- end
- triplets.each do |data, client, received_at|
+ (sz > 10 ? 10 : sz).times do
+ data, client, received_at = Server.packet_pop
@logger.debug("Got #{data.inspect} from #{client.inspect}")
@@ -52,7 +46,7 @@ module Mauve
begin
update.parse_from_string(data)
-
+
if @transmission_id_cache[update.transmission_id.to_s]
@logger.debug("Ignoring duplicate transmission id #{data.transmission_id}")
#
@@ -67,8 +61,8 @@ module Mauve
Alert.receive_update(update, received_at)
rescue Protobuf::InvalidWireType,
- NotImplementedError,
- DataObjects::IntegrityError => ex
+ NotImplementedError,
+ DataObjects::IntegrityError => ex
@logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
"starting '#{data[0..16].inspect}' from #{ip_source}"
@@ -79,8 +73,14 @@ module Mauve
@transmission_id_cache[update.transmission_id.to_s] = MauveTime.now
end
+
end
- Timer.instance.thaw
+
+ ensure
+ #
+ # Thaw the timer
+ #
+ Timer.instance.thaw if Timer.instance.frozen?
end
def expire_transmission_id_cache
@@ -96,14 +96,5 @@ module Mauve
end
end
- class << self
-
- def enq(a)
- instance.buffer.enq(a)
- end
-
- alias push enq
-
- end
end
end