diff options
author | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-04-13 17:03:16 +0100 |
---|---|---|
committer | Patrick J Cherry <patrick@bytemark.co.uk> | 2011-04-13 17:03:16 +0100 |
commit | 89a67770e66d11740948e90a41db6cee0482cf8e (patch) | |
tree | be858515fb789a89d68f94975690ab019813726c /lib/mauve/udp_server.rb |
new version.
Diffstat (limited to 'lib/mauve/udp_server.rb')
-rw-r--r-- | lib/mauve/udp_server.rb | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/lib/mauve/udp_server.rb b/lib/mauve/udp_server.rb new file mode 100644 index 0000000..a570e8a --- /dev/null +++ b/lib/mauve/udp_server.rb @@ -0,0 +1,109 @@ +# encoding: UTF-8 +require 'yaml' +require 'socket' +require 'mauve/datamapper' +require 'mauve/proto' +require 'mauve/alert' +require 'log4r' + +module Mauve + + class UDPServer < MauveThread + + include Singleton + + attr_accessor :ip, :port, :sleep_interval + + def initialize + # + # Set the logger up + # + @logger = Log4r::Logger.new(self.class.to_s) + @ip = "127.0.0.1" + @port = 32741 + @socket = nil + @closing_now = false + @sleep_interval = 0 + end + + def open_socket + @socket = UDPSocket.new + @closing_now = false + + @logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.") + old = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first + + @socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024) + new = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first + + raise "Could not increase Socket::SO_RCVBUF. Had #{old} ended up with #{new}!" if old > new + + @logger.debug("Successfully increased Socket::SO_RCVBUF from #{old} to #{new}.") + + @socket.bind(@ip, @port) + + @logger.debug("Successfully opened UDP socket on #{@ip}:#{@port}") + end + + def close_socket + return if @socket.nil? or @socket.closed? + + begin + @socket.close + rescue IOError => ex + # Just in case there is some sort of explosion! + @logger.debug("Caught IOError #{ex.to_s}") + end + + @logger.debug("Successfully closed UDP socket") + end + + def main_loop + return if self.should_stop? + + open_socket if @socket.nil? or @socket.closed? + + return if self.should_stop? + + # + # TODO: why is/isn't this non-block? + # + begin + # packet = @socket.recvfrom_nonblock(65535) + packet = @socket.recvfrom(65535) + received_at = MauveTime.now + rescue Errno::EAGAIN, Errno::EWOULDBLOCK + IO.select([@socket]) + retry unless self.should_stop? + end + + return if packet.nil? + + @logger.debug("Got new packet: #{packet.inspect}") + + # + # If we get a zero length packet, and we've been flagged to stop, we stop! + # + if packet.first.length == 0 and self.should_stop? + self.close_socket + return + end + + + + Processor.push([[packet[0], packet[1], received_at]]) + end + + def stop + @stop = true + # + # Triggers loop to close socket. + # + UDPSocket.open.send("", 0, @socket.addr[2], @socket.addr[1]) unless @socket.closed? + + super + end + + end + +end |