aboutsummaryrefslogtreecommitdiff
path: root/lib/mauve/udp_server.rb
blob: a570e8aba01f008aa18113bd7ca7da4056221132 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# encoding: UTF-8
require 'yaml'
require 'socket'
require 'mauve/datamapper'
require 'mauve/proto'
require 'mauve/alert'
require 'log4r'

module Mauve

  #
  # Receives datagrams on a UDP socket and pushes each one, together with the
  # sender information returned by recvfrom and the time of receipt, onto
  # Processor.
  #
  # The socket is opened lazily by #main_loop.  #stop sends an empty datagram
  # to the listening socket so that a #main_loop blocked in recvfrom returns
  # and can close the socket.
  #
  class UDPServer < MauveThread

    include Singleton

    # ip and port are the address handed to bind in #open_socket.
    # sleep_interval is not read anywhere in this class.
    attr_accessor :ip, :port, :sleep_interval

    #
    # Sets up the logger and the defaults (127.0.0.1:32741).  No socket is
    # opened here.
    #
    def initialize
      @logger = Log4r::Logger.new(self.class.to_s)
      @ip     = "127.0.0.1"
      @port   = 32741
      @socket = nil
      @closing_now = false
      @sleep_interval = 0
    end

    #
    # Creates the UDP socket, asks for a 10M receive buffer and binds to
    # @ip:@port.
    #
    # @socket is only assigned once the socket is bound.  If anything fails
    # on the way the half-configured socket is closed and the error is
    # re-raised, so that the next #main_loop starts from scratch instead of
    # reading from an unbound socket.
    #
    # Raises RuntimeError if the receive buffer ends up smaller than it was,
    # and whatever setsockopt / bind raise.
    #
    def open_socket
      @closing_now = false
      socket = UDPSocket.new

      begin
        @logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")
        old_size = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
        new_size = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        # The size actually granted is read back rather than assumed; only a
        # shrunken buffer is treated as fatal.
        raise "Could not increase Socket::SO_RCVBUF.  Had #{old_size} ended up with #{new_size}!" if old_size > new_size

        @logger.debug("Successfully increased Socket::SO_RCVBUF from #{old_size} to #{new_size}.")

        socket.bind(@ip, @port)
      rescue StandardError
        socket.close unless socket.closed?
        raise
      end

      @socket = socket

      @logger.debug("Successfully opened UDP socket on #{@ip}:#{@port}")
    end

    #
    # Closes the socket if there is one and it is still open.  An IOError
    # raised by close is logged rather than propagated.
    #
    def close_socket
      return if @socket.nil? || @socket.closed?

      begin
        @socket.close
        @logger.debug("Successfully closed UDP socket")
      rescue IOError => ex
        # Just in case there is some sort of explosion!
        @logger.debug("Caught IOError #{ex.to_s}")
      end
    end

    #
    # One iteration of the receive loop: opens the socket if needed, reads a
    # single datagram and pushes it onto Processor.
    #
    # Returns without pushing anything when the thread has been asked to
    # stop before a packet arrives, or when an empty packet arrives after
    # stop was requested (in which case the socket is closed too).
    #
    def main_loop
      return if self.should_stop?

      open_socket if @socket.nil? || @socket.closed?

      # Checked again as the stop flag may have been set while the socket
      # was being opened.
      return if self.should_stop?

      #
      # TODO: decide whether this read should be non-blocking.  The read
      # below blocks; the rescue clause only matters if the socket is ever
      # switched to recvfrom_nonblock.
      #
      packet      = nil
      received_at = nil

      begin
        # packet = @socket.recvfrom_nonblock(65535)
        packet      = @socket.recvfrom(65535)
        received_at = MauveTime.now
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        # Wait until the socket is readable, then try again.
        IO.select([@socket])
        retry unless self.should_stop?
      end

      return if packet.nil?

      @logger.debug("Got new packet: #{packet.inspect}")

      #
      # If we get a zero length packet, and we've been flagged to stop, we stop!
      #
      if packet.first.length == 0 && self.should_stop?
        self.close_socket
        return
      end

      Processor.push([[packet[0], packet[1], received_at]])
    end

    #
    # Flags the thread to stop, wakes up a #main_loop blocked in recvfrom by
    # sending an empty datagram to the listening socket, then calls super.
    #
    # Nothing is sent if the socket was never opened or is already closed.
    # A failure to send the wake-up datagram is logged and does not prevent
    # super from being called.
    #
    def stop
      @stop = true

      listener = @socket

      unless listener.nil? || listener.closed?
        #
        # Triggers loop to close socket.
        #
        waker = UDPSocket.new
        begin
          # addr is [family, port, hostname, numeric_address]; the numeric
          # address is used so that no name resolution is involved.
          local = listener.addr
          waker.send("", 0, local[3], local[1])
        rescue IOError, SystemCallError, SocketError => ex
          @logger.warn("Could not send wake-up packet: #{ex.to_s}")
        ensure
          waker.close
        end
      end

      super
    end

  end

end