aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/udp_server.rb
diff options
context:
space:
mode:
authorPatrick J Cherry <patrick@bytemark.co.uk>2011-04-13 17:03:16 +0100
committerPatrick J Cherry <patrick@bytemark.co.uk>2011-04-13 17:03:16 +0100
commit89a67770e66d11740948e90a41db6cee0482cf8e (patch)
treebe858515fb789a89d68f94975690ab019813726c /lib/mauve/udp_server.rb
new version.
Diffstat (limited to 'lib/mauve/udp_server.rb')
-rw-r--r--lib/mauve/udp_server.rb109
1 files changed, 109 insertions, 0 deletions
diff --git a/lib/mauve/udp_server.rb b/lib/mauve/udp_server.rb
new file mode 100644
index 0000000..a570e8a
--- /dev/null
+++ b/lib/mauve/udp_server.rb
@@ -0,0 +1,109 @@
+# encoding: UTF-8
+require 'yaml'
+require 'socket'
+require 'mauve/datamapper'
+require 'mauve/proto'
+require 'mauve/alert'
+require 'log4r'
+
+module Mauve
+
+ class UDPServer < MauveThread
+
+ include Singleton
+
+ attr_accessor :ip, :port, :sleep_interval
+
+ def initialize
+ #
+ # Set the logger up
+ #
+ @logger = Log4r::Logger.new(self.class.to_s)
+ @ip = "127.0.0.1"
+ @port = 32741
+ @socket = nil
+ @closing_now = false
+ @sleep_interval = 0
+ end
+
+ def open_socket
+ @socket = UDPSocket.new
+ @closing_now = false
+
+ @logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")
+ old = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
+
+ @socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
+ new = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
+
+ raise "Could not increase Socket::SO_RCVBUF. Had #{old} ended up with #{new}!" if old > new
+
+ @logger.debug("Successfully increased Socket::SO_RCVBUF from #{old} to #{new}.")
+
+ @socket.bind(@ip, @port)
+
+ @logger.debug("Successfully opened UDP socket on #{@ip}:#{@port}")
+ end
+
+ def close_socket
+ return if @socket.nil? or @socket.closed?
+
+ begin
+ @socket.close
+ rescue IOError => ex
+ # Just in case there is some sort of explosion!
+ @logger.debug("Caught IOError #{ex.to_s}")
+ end
+
+ @logger.debug("Successfully closed UDP socket")
+ end
+
+ def main_loop
+ return if self.should_stop?
+
+ open_socket if @socket.nil? or @socket.closed?
+
+ return if self.should_stop?
+
+ #
+ # TODO: why is/isn't this non-block?
+ #
+ begin
+ # packet = @socket.recvfrom_nonblock(65535)
+ packet = @socket.recvfrom(65535)
+ received_at = MauveTime.now
+ rescue Errno::EAGAIN, Errno::EWOULDBLOCK
+ IO.select([@socket])
+ retry unless self.should_stop?
+ end
+
+ return if packet.nil?
+
+ @logger.debug("Got new packet: #{packet.inspect}")
+
+ #
+ # If we get a zero length packet, and we've been flagged to stop, we stop!
+ #
+ if packet.first.length == 0 and self.should_stop?
+ self.close_socket
+ return
+ end
+
+
+
+ Processor.push([[packet[0], packet[1], received_at]])
+ end
+
+ def stop
+ @stop = true
+ #
+ # Triggers loop to close socket.
+ #
+ UDPSocket.open.send("", 0, @socket.addr[2], @socket.addr[1]) unless @socket.closed?
+
+ super
+ end
+
+ end
+
+end