1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# encoding: UTF-8
require 'yaml'
require 'socket'
require 'mauve/datamapper'
require 'mauve/proto'
require 'mauve/alert'
require 'mauve/mauve_thread'
require 'ipaddr'
module Mauve
#
# This is the thread that accepts the packets.
#
class UDPServer < MauveThread
include Singleton
attr_reader :ip, :port
# Yup. Creates a new UDPServer.
#
# Defaults:
# * listening IP: 127.0.0.1
# * listening port: 32741
#
def initialize
#
# Set up some defaults.
#
self.ip = "127.0.0.1"
self.port = 32741
@socket = nil
super
end
#
# This sets the IP which the server will listen on.
#
# @param [String] i The new IP
# @return [IPAddr]
#
def ip=(i)
raise ArgumentError, "ip must be a string" unless i.is_a?(String)
@ip = IPAddr.new(i)
end
# Sets the listening port
#
# @param [Integer] pr The new port
# @return [Integer]
#
def port=(pr)
raise ArgumentError, "port must be an integer between 0 and #{2**16-1}" unless pr.is_a?(Integer) and pr < 2**16 and pr > 0
@port = pr
end
# This stops the UDP server, by signalling to the thread to stop, and
# sending a zero-length packet to the socket.
#
def stop
@stop = true
#
# Triggers loop to close socket.
#
UDPSocket.open(Socket.const_get(@socket.addr[0])).send("", 0, @socket.addr[2], @socket.addr[1]) unless @socket.nil? or @socket.closed?
super
end
private
#
# This opens the socket for listening.
#
# It tries to increase the default receiving buffer to 10M, and will warn
# if this fails to increase the buffer at all.
#
# @return [Nilclass]
#
def open_socket
#
# Specify the family when opening the socket.
#
@socket = UDPSocket.new(@ip.family)
logger.debug("Trying to increase Socket::SO_RCVBUF to 10M.")
old = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
@socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 10*1024*1024)
new = @socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i").first
logger.warn "Could not increase Socket::SO_RCVBUF. Had #{old} ended up with #{new}!" if old > new
logger.debug("Successfully increased Socket::SO_RCVBUF from #{old} to #{new}.")
@socket.bind(@ip.to_s, @port)
logger.info("Opened socket on #{@ip.to_s}:#{@port}")
end
# This closes the socket. IOErrors are caught and logged.
#
def close_socket
return if @socket.nil? or @socket.closed?
begin
@socket.close
rescue IOError => ex
# Just in case there is some sort of explosion!
logger.error "Caught IOError #{ex.to_s}"
logger.debug ex.backtrace.join("\n")
end
logger.info("Closed socket")
end
# This is the main loop. It receives from the UDP port, and records the
# time the packet arrives. It then pushes the packet onto the Server's
# packet buffer for the processor to pick up later.
#
# If a zero-length packet is received, and the thread has been signalled to
# stop, then the socket is closed, and the thread exits.
#
def main_loop
return if self.should_stop?
open_socket if @socket.nil? or @socket.closed?
return if self.should_stop?
#
# TODO: why is/isn't this non-block?
#
i = 0
begin
# packet = @socket.recvfrom_nonblock(65535)
packet = @socket.recvfrom(65535)
received_at = Time.now
rescue Errno::EAGAIN, Errno::EWOULDBLOCK => ex
IO.select([@socket])
retry unless self.should_stop?
end
return if packet.nil?
logger.debug("Got new packet: #{packet.inspect}")
#
# If we get a zero length packet, and we've been flagged to stop, we stop!
#
if packet.first.length == 0 and self.should_stop?
close_socket
return
end
#
# Push packet onto central queue
#
Server.packet_push([packet[0], packet[1], received_at])
end
end
end
|