aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/udp_server.rb
blob: 9dafdd066b1bbba45626e9b60d21e9c727090241 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# encoding: UTF-8
require 'yaml'
require 'socket'
require 'mauve/datamapper'
require 'mauve/proto'
require 'mauve/alert'
require 'ipaddr'

module Mauve

  #
  # This is the thread that accepts the packets.
  #
  class UDPServer < MauveThread

    include Singleton

    attr_reader   :ip, :port

    # Builds the UDP server (the class is a Singleton) with its default
    # listening address.
    #
    # Defaults:
    #   * listening IP: 127.0.0.1
    #   * listening port: 32741
    #
    # No socket is opened here; @socket starts out as nil.
    #
    def initialize
      # Nothing is listening yet.
      @socket = nil

      #
      # Apply the defaults through the setters, so they are validated exactly
      # like any value assigned later on.
      #
      self.port = 32741
      self.ip   = "127.0.0.1"

      super
    end
 
    #
    # Sets the IP address which the server will listen on.  The string is
    # parsed into an IPAddr and stored in @ip.
    #
    # @param [String] i The new IP
    # @return [IPAddr]
    # @raise [ArgumentError] if +i+ is not a String
    #
    def ip=(i)
      if i.is_a?(String)
        @ip = IPAddr.new(i)
      else
        raise ArgumentError, "ip must be a string"
      end
    end
 
    # Sets the listening port.
    #
    # Port 0 is rejected, so the accepted range is 1..65535.
    #
    # @param [Integer] pr The new port
    # @return [Integer]
    # @raise [ArgumentError] if +pr+ is not an Integer in the range 1..65535
    #
    def port=(pr)
      # The message states the range that is actually enforced (0 is not valid).
      unless pr.is_a?(Integer) && pr.between?(1, 2**16 - 1)
        raise ArgumentError, "port must be an integer between 1 and #{2**16-1}"
      end

      @port = pr
    end
   
    # This stops the UDP server, by signalling to the thread to stop, and
    # sending a zero-length packet to the socket.
    #
    # The zero-length packet wakes up the receive in the main loop, which then
    # sees the stop flag and closes the listening socket.  Nothing is sent if
    # there is no open socket.
    #
    def stop
      @stop = true

      #
      # Triggers loop to close socket.
      #
      unless @socket.nil? || @socket.closed?
        #
        # addr is [family, port, hostname, numeric address].  The numeric
        # address is used as the target, so this does not depend on the
        # reverse-lookup setting.
        #
        family, port, _hostname, address = @socket.addr

        waker = UDPSocket.new(Socket.const_get(family))
        begin
          waker.send("", 0, address, port)
        ensure
          # The wake-up socket is single-use; close it rather than leaking it.
          waker.close
        end
      end

      super
    end

    private

    #
    # This opens the socket for listening.
    #
    # It tries to increase the default receiving buffer to 10M, and will warn
    # if this fails to increase the buffer at all.
    #
    # The socket is only stored in @socket once it has been bound.  If anything
    # fails before that, the half-set-up socket is closed and the error is
    # re-raised, so @socket never refers to an open but unbound socket.
    #
    # @return [NilClass]
    #
    def open_socket
      #
      # Specify the family when opening the socket.
      #
      sock = UDPSocket.new(@ip.family)

      begin
        logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")
        before = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
        after = sock.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        # Report success or failure, never both.
        if after > before
          logger.debug("Successfully increased Socket::SO_RCVBUF from #{before} to #{after}.")
        else
          logger.warn "Could not increase Socket::SO_RCVBUF.  Had #{before} ended up with #{after}!"
        end

        sock.bind(@ip.to_s, @port)
      rescue StandardError
        # Do not leave a descriptor behind when set-up or bind fails.
        sock.close unless sock.closed?
        raise
      end

      @socket = sock

      logger.info("Opened socket on #{@ip.to_s}:#{@port}")
      nil
    end

    # Closes the listening socket, if there is one and it is still open.
    #
    # An IOError raised by the close is logged (message at error level,
    # backtrace at debug level) rather than propagated.
    #
    def close_socket
      # Nothing to do without an open socket.
      return unless @socket && !@socket.closed?

      begin
        @socket.close
      rescue IOError => err
        # Just in case there is some sort of explosion!
        logger.error "Caught IOError #{err}"
        logger.debug err.backtrace.join("\n")
      end

      logger.info("Closed socket")
    end

    # One pass of the receive loop.  It opens the socket if needed, receives a
    # single datagram, notes the time it arrived, and pushes
    # [data, sender, arrival time] onto the Server's packet queue.
    #
    # Nothing is received if the thread has already been told to stop.  If a
    # zero-length packet arrives while the thread is flagged to stop, the
    # socket is closed and nothing is queued.
    #
    def main_loop
      return if should_stop?

      # (Re)open the listening socket when there is none, or it was closed.
      open_socket if @socket.nil? || @socket.closed?

      # A stop may have been requested in the meantime.
      return if should_stop?

      #
      # TODO: why is/isn't this non-block?
      #
      # NOTE(review): the rescue below presumably dates from a
      # recvfrom_nonblock variant -- confirm whether it is still needed.
      #
      begin
        datagram   = @socket.recvfrom(65535)
        arrived_at = Time.now
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        IO.select([@socket])
        retry unless should_stop?
      end

      # Nothing was read (e.g. we were told to stop instead of retrying).
      return if datagram.nil?

      logger.debug("Got new packet: #{datagram.inspect}")

      payload, sender = datagram

      #
      # If we get a zero length packet, and we've been flagged to stop, we stop!
      #
      if payload.length.zero? && should_stop?
        close_socket
        return
      end

      #
      # Push packet onto central queue
      #
      Server.packet_push([payload, sender, arrived_at])
    end

  end

end