blob: 719bda912e2d2d93508179614ecf628b7d8c8b2b (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
module Oxidized
class Jobs < Array
AVERAGE_DURATION = 5 # initially presume nodes take 5s to complete
MAX_INTER_JOB_GAP = 300 # add job if more than X from last job started
attr_accessor :interval, :max, :want
def initialize max, interval, nodes
@max = max
# Set interval to 1 if interval is 0 (=disabled) so we don't break
# the 'ceil' function
@interval = 1 if @interval == 0
@interval = interval
@nodes = nodes
@last = Time.now.utc
@durations = Array.new @nodes.size, AVERAGE_DURATION
duration AVERAGE_DURATION
super()
end
def push arg
@last = Time.now.utc
super
end
def duration last
if @durations.size > @nodes.size
@durations.slice! @nodes.size...@durations.size
elsif @durations.size < @nodes.size
@durations.fill AVERAGE_DURATION, @durations.size...@nodes.size
end
@durations.push(last).shift
@duration = @durations.inject(:+).to_f / @nodes.size #rolling average
new_count
end
def new_count
@want = ((@nodes.size * @duration) / @interval).ceil
@want = 1 if @want < 1
@want = @nodes.size if @want > @nodes.size
@want = @max if @want > @max
end
def work
# if a) we want less or same amount of threads as we now running
# and b) we want less threads running than the total amount of nodes
# and c) there is more than MAX_INTER_JOB_GAP since last one was started
# then we want one more thread (rationale is to fix hanging thread causing HOLB)
if @want <= size and @want < @nodes.size
@want +=1 if (Time.now.utc - @last) > MAX_INTER_JOB_GAP
end
end
end
end
|