# encoding: UTF-8 require 'yaml' require 'socket' require 'mauve/datamapper' require 'mauve/proto' require 'mauve/alert' require 'ipaddr' module Mauve class UDPServer < MauveThread include Singleton attr_reader :ip, :port def initialize super self.ip = "127.0.0.1" self.port = 32741 @socket = nil end def ip=(i) raise ArgumentError, "ip must be a string" unless i.is_a?(String) @ip = IPAddr.new(i) end def port=(pr) raise ArgumentError, "port must be an integer between 0 and #{2**16-1}" unless pr.is_a?(Integer) and pr < 2**16 and pr > 0 @port = pr end def open_socket # # Specify the family when opening the socket. # @socket = UDPSocket.new(@ip.family) logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.") old = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first @socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024) new = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first raise "Could not increase Socket::SO_RCVBUF. Had #{old} ended up with #{new}!" if old > new logger.debug("Successfully increased Socket::SO_RCVBUF from #{old} to #{new}.") @socket.bind(@ip.to_s, @port) logger.info("Opened socket on #{@ip.to_s}:#{@port}") end def close_socket return if @socket.nil? or @socket.closed? begin @socket.close rescue IOError => ex # Just in case there is some sort of explosion! logger.error "Caught IOError #{ex.to_s}" logger.debug ex.backtrace.join("\n") end logger.info("Closed socket") end def main_loop return if self.should_stop? open_socket if @socket.nil? or @socket.closed? return if self.should_stop? # # TODO: why is/isn't this non-block? # i = 0 begin # packet = @socket.recvfrom_nonblock(65535) packet = @socket.recvfrom(65535) received_at = MauveTime.now rescue Errno::EAGAIN, Errno::EWOULDBLOCK => ex IO.select([@socket]) retry unless self.should_stop? end return if packet.nil? logger.debug("Got new packet: #{packet.inspect}") # # If we get a zero length packet, and we've been flagged to stop, we stop! # if packet.first.length == 0 and self.should_stop? self.close_socket return end # # Push packet onto central queue # Server.packet_push([packet[0], packet[1], received_at]) end def stop @stop = true # # Triggers loop to close socket. # UDPSocket.open(Socket.const_get(@socket.addr[0])).send("", 0, @socket.addr[2], @socket.addr[1]) unless @socket.nil? or @socket.closed? super end end end