aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/processor.rb
diff options
context:
space:
mode:
authorPatrick J Cherry <patrick@bytemark.co.uk>2011-04-13 17:03:16 +0100
committerPatrick J Cherry <patrick@bytemark.co.uk>2011-04-13 17:03:16 +0100
commit89a67770e66d11740948e90a41db6cee0482cf8e (patch)
treebe858515fb789a89d68f94975690ab019813726c /lib/mauve/processor.rb
new version.
Diffstat (limited to 'lib/mauve/processor.rb')
-rw-r--r--lib/mauve/processor.rb109
1 files changed, 109 insertions, 0 deletions
diff --git a/lib/mauve/processor.rb b/lib/mauve/processor.rb
new file mode 100644
index 0000000..414f640
--- /dev/null
+++ b/lib/mauve/processor.rb
@@ -0,0 +1,109 @@
+# encoding: UTF-8
+
+require 'mauve/mauve_thread'
+
+module Mauve
+
+ class Processor < MauveThread
+
+ include Singleton
+
+ attr_accessor :buffer, :transmission_cache_expire_time, :sleep_interval
+
+ def initialize
+ # Buffer for UDP socket packets.
+ @buffer = Queue.new
+
+ # Set the logger up
+ @logger = Log4r::Logger.new(self.class.to_s)
+
+ #
+ # Set up the transmission id cache
+ #
+ @transmission_id_cache = {}
+ @transmission_cache_expire_time = 300
+ @sleep_interval = 1
+ end
+
+ def main_loop
+
+ sz = @buffer.size
+
+ return if sz == 0
+
+ Timer.instance.freeze
+
+ logger.info("Buffer has #{sz} packets waiting...")
+
+ triplets = []
+ #
+ # Only do the loop a maximum of 10 times every @sleep_interval seconds
+ #
+ (sz > 20 ? 20 : sz).times do
+ triplets += @buffer.deq
+ end
+
+ triplets.each do |data, client, received_at|
+
+ @logger.debug("Got #{data.inspect} from #{client.inspect}")
+
+ ip_source = "#{client[3]}:#{client[1]}"
+ update = Proto::AlertUpdate.new
+
+ begin
+ update.parse_from_string(data)
+
+ if @transmission_id_cache[update.transmission_id.to_s]
+ @logger.debug("Ignoring duplicate transmission id #{data.transmission_id}")
+ #
+ # Continue with next packet.
+ #
+ next
+ end
+
+ @logger.debug "Update #{update.transmission_id} sent at #{update.transmission_time} from "+
+ "'#{update.source}'@#{ip_source} alerts #{update.alert.length}"
+
+ Alert.receive_update(update, received_at)
+
+ rescue Protobuf::InvalidWireType,
+ NotImplementedError,
+ DataObjects::IntegrityError => ex
+
+ @logger.error "#{ex} (#{ex.class}) while parsing #{data.length} bytes "+
+ "starting '#{data[0..16].inspect}' from #{ip_source}"
+
+ @logger.debug ex.backtrace.join("\n")
+
+ ensure
+ @transmission_id_cache[update.transmission_id.to_s] = MauveTime.now
+
+ end
+ end
+ Timer.instance.thaw
+ end
+
+ def expire_transmission_id_cache
+ now = MauveTime.now
+ to_delete = []
+
+ @transmission_id_cache.each do |tid, received_at|
+ to_delete << tid if (now - received_at) > @transmission_cache_expire_time
+ end
+
+ to_delete.each do |tid|
+ @transmission_id_cache.delete(tid)
+ end
+ end
+
+ class << self
+
+ def enq(a)
+ instance.buffer.enq(a)
+ end
+
+ alias push enq
+
+ end
+ end
+end