1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# encoding: UTF-8
require 'yaml'
require 'socket'
require 'mauve/datamapper'
require 'mauve/proto'
require 'mauve/alert'
require 'log4r'
module Mauve

  #
  # Thread that listens on a UDP socket and passes what it receives to
  # Processor.  Each call to main_loop handles at most one datagram: the
  # result of recvfrom is split into its two elements and pushed to Processor
  # along with the time at which it was read.
  #
  # The class mixes in Singleton and derives from MauveThread; should_stop?
  # and the parent implementation of stop are expected to come from there.
  #
  class UDPServer < MauveThread

    include Singleton

    #
    # ip::             address the socket is bound to
    # port::           UDP port the socket is bound to
    # sleep_interval:: initialised to 0; not read anywhere in this class
    #
    attr_accessor :ip, :port, :sleep_interval

    #
    # Sets up the logger and the default bind address (127.0.0.1:32741).  No
    # socket is opened here; main_loop opens it on demand.
    #
    # NOTE(review): this does not call super -- confirm that
    # MauveThread#initialize needs no setup of its own.
    #
    def initialize
      #
      # Set the logger up
      #
      @logger = Log4r::Logger.new(self.class.to_s)

      @ip             = "127.0.0.1"
      @port           = 32741
      @socket         = nil
      @closing_now    = false
      @sleep_interval = 0
    end

    #
    # Creates the UDP socket, asks for a 10M receive buffer and binds to
    # @ip:@port.
    #
    # The socket is only stored in @socket once it has been bound.  If
    # anything fails along the way the half-configured socket is closed and
    # the error is re-raised, so @socket never refers to an unbound socket and
    # the next main_loop will try to open a fresh one.
    #
    # Raises RuntimeError if the receive buffer ends up smaller than it was
    # before the request, plus whatever the socket calls themselves raise.
    #
    def open_socket
      @closing_now = false

      socket = UDPSocket.new

      begin
        @logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")

        before = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
        socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
        after  = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first

        raise "Could not increase Socket::SO_RCVBUF. Had #{before} ended up with #{after}!" if before > after

        @logger.debug("Successfully increased Socket::SO_RCVBUF from #{before} to #{after}.")

        socket.bind(@ip, @port)
      rescue StandardError
        #
        # Do not leak the descriptor, and do not leave an unusable socket
        # behind for main_loop to block on.
        #
        socket.close unless socket.closed?
        raise
      end

      @socket = socket

      @logger.debug("Successfully opened UDP socket on #{@ip}:#{@port}")
    end

    #
    # Closes the socket if there is one and it is still open.  An IOError
    # raised by the close is logged and swallowed; success is only logged when
    # the close actually went through.
    #
    def close_socket
      return if @socket.nil? || @socket.closed?

      begin
        @socket.close
        @logger.debug("Successfully closed UDP socket")
      rescue IOError => ex
        # Just in case there is some sort of explosion!
        @logger.debug("Caught IOError #{ex.to_s}")
      end
    end

    #
    # One iteration of the thread: open the socket if needed, wait for a
    # single datagram and push it to Processor.
    #
    # Returns early, without reading, when should_stop? is true.  A
    # zero-length datagram received while should_stop? is true is treated as
    # the wake-up sent by stop: the socket is closed and nothing is pushed.
    #
    def main_loop
      return if should_stop?

      open_socket if @socket.nil? || @socket.closed?

      return if should_stop?

      #
      # TODO: why is/isn't this non-block?
      #
      begin
        # packet = @socket.recvfrom_nonblock(65535)
        packet      = @socket.recvfrom(65535)
        received_at = MauveTime.now
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        # Wait until the socket is readable, then try again unless stopping.
        IO.select([@socket])
        retry unless should_stop?
      end

      # Only reached with nil when the retry above was skipped.
      return if packet.nil?

      @logger.debug("Got new packet: #{packet.inspect}")

      #
      # If we get a zero length packet, and we've been flagged to stop, we stop!
      #
      if packet.first.empty? && should_stop?
        close_socket
        return
      end

      Processor.push([[packet[0], packet[1], received_at]])
    end

    #
    # Flags the thread to stop and, if a socket is open, sends it an empty
    # datagram so that a recvfrom blocked in main_loop returns and the loop
    # can close the socket.  Then defers to the parent class.
    #
    # Safe to call before the socket has ever been opened.
    #
    def stop
      @stop = true

      #
      # Triggers loop to close socket.
      #
      unless @socket.nil? || @socket.closed?
        waker = UDPSocket.new

        begin
          waker.send("", 0, @socket.addr[2], @socket.addr[1])
        ensure
          # The sending socket is only needed for this one datagram.
          waker.close
        end
      end

      super
    end

  end

end
|