aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/udp_server.rb
blob: 99bfab1939f2c0ed197d31ef372bb9a3b36a3cee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# encoding: UTF-8
require 'yaml'
require 'socket'
require 'mauve/datamapper'
require 'mauve/proto'
require 'mauve/alert'
require 'ipaddr'

module Mauve

  #
  # Listens on a UDP socket and hands every datagram it receives to
  # Server.packet_push, together with the sender information and the time of
  # receipt.
  #
  # The class is a Singleton.  +logger+ and +should_stop?+ are not defined
  # here; presumably they come from MauveThread -- confirm against that class.
  #
  class UDPServer < MauveThread

    include Singleton

    attr_reader   :ip, :port

    #
    # Sets up the default listening address (127.0.0.1:32741).  No socket is
    # opened here; that happens in open_socket.
    #
    def initialize
      super
      self.ip     = "127.0.0.1"
      self.port   = 32741
      @socket = nil
    end

    #
    # Sets the address to bind to.
    #
    # @param [String] i IPv4 or IPv6 address, parsed with IPAddr.new
    # @raise [ArgumentError] if +i+ is not a String, or IPAddr rejects it
    #
    def ip=(i)
      raise ArgumentError, "ip must be a string" unless i.is_a?(String)
      @ip = IPAddr.new(i)
    end

    #
    # Sets the port to bind to.
    #
    # @param [Integer] pr port number, 1..65535 inclusive
    # @raise [ArgumentError] if +pr+ is not an Integer in that range
    #
    def port=(pr)
      # Zero has always been rejected here; the message now says so.
      unless pr.is_a?(Integer) && pr > 0 && pr < 2**16
        raise ArgumentError, "port must be an integer between 1 and #{2**16-1}"
      end

      @port = pr
    end

    #
    # Creates the UDP socket, tries to enlarge its receive buffer to 10M and
    # binds it to ip:port.
    #
    # @socket is only assigned once the socket is bound.  If anything fails
    # the half-configured socket is closed and the error re-raised, so that
    # main_loop sees no usable socket and attempts to open one again rather
    # than reading from an unbound descriptor.
    #
    # @raise [RuntimeError] if the receive buffer ends up smaller than before
    # @raise [SystemCallError] if the socket cannot be configured or bound
    #
    def open_socket
      #
      # Specify the family when opening the socket.
      #
      sock = UDPSocket.new(@ip.family)

      begin
        logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")
        before = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
        after = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        raise "Could not increase Socket::SO_RCVBUF.  Had #{before} ended up with #{after}!" if before > after

        logger.debug("Successfully increased Socket::SO_RCVBUF from #{before} to #{after}.")

        sock.bind(@ip.to_s, @port)
      rescue StandardError
        # Do not leak the descriptor if configuration or bind failed.
        sock.close unless sock.closed?
        raise
      end

      @socket = sock

      logger.info("Opened socket on #{@ip.to_s}:#{@port}")
    end

    #
    # Closes the listening socket if there is one and it is still open.  An
    # IOError raised by the close is logged rather than propagated.
    #
    def close_socket
      return if @socket.nil? || @socket.closed?

      begin
        @socket.close
      rescue IOError => ex
        # Just in case there is some sort of explosion!
        logger.error "Caught IOError #{ex.to_s}"
        logger.debug ex.backtrace.join("\n")
      else
        # Only claim success when the close did not raise.
        logger.info("Closed socket")
      end
    end

    #
    # Receives one datagram and pushes it onto the central queue.
    #
    # Opens the socket first if there is none.  Returns without pushing
    # anything when a stop has been requested, including the case where the
    # datagram received is the zero-length one sent by #stop, in which case
    # the socket is closed as well.
    #
    def main_loop
      return if self.should_stop?

      open_socket if @socket.nil? || @socket.closed?

      return if self.should_stop?

      #
      # TODO: why is/isn't this non-block?  (recvfrom_nonblock was used here
      # previously; the EAGAIN/EWOULDBLOCK handling below is kept from then.)
      #
      begin
        packet      = @socket.recvfrom(65535)
        received_at = Time.now
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        IO.select([@socket])
        retry unless self.should_stop?
      end

      # Nothing was read, i.e. we were asked to stop while waiting.
      return if packet.nil?

      logger.debug("Got new packet: #{packet.inspect}")

      #
      # If we get a zero length packet, and we've been flagged to stop, we stop!
      #
      if packet.first.length == 0 && self.should_stop?
        self.close_socket
        return
      end

      #
      # Push packet onto central queue
      #
      Server.packet_push([packet[0], packet[1], received_at])
    end

    #
    # Flags the server as stopping and sends an empty datagram to our own
    # socket, so that a main_loop blocked in recvfrom wakes up, notices the
    # flag and closes the socket.  Then defers to the superclass.
    #
    def stop
      @stop = true
      #
      # Triggers loop to close socket.
      #
      unless @socket.nil? || @socket.closed?
        # addr is [family name, port, hostname, numeric address]; the numeric
        # address is used so that no name resolution is needed.
        family, bound_port, _hostname, address = @socket.addr

        waker = UDPSocket.new(Socket.const_get(family))
        begin
          waker.send("", 0, address, bound_port)
        ensure
          # The sending socket is only needed for this one datagram.
          waker.close
        end
      end

      super
    end

  end

end