blob: ff7f92bc2d80873f5b28d653a5ae3a3bd69f3fd7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
module Oxidized
  # Array of jobs that also tracks how many concurrent jobs are wanted.
  #
  # The wanted concurrency (+want+) is derived from a rolling average of job
  # durations: enough jobs to get through every node once per +interval+,
  # at least 1, and never more than the node count or +max+.
  class Jobs < Array
    AVERAGE_DURATION  = 5   # initially presume nodes take 5s to complete
    MAX_INTER_JOB_GAP = 300 # add job if more than X from last job started

    attr_accessor :interval, :max, :want

    # @param max [Integer] upper bound for +want+
    # @param interval [Numeric] time budget for one pass over all nodes, in the
    #   same unit as the duration samples
    # @param nodes [#size] node collection; only its current size is read here
    def initialize(max, interval, nodes)
      @max       = max
      @interval  = interval
      @nodes     = nodes
      @last      = Time.now.utc
      @durations = Array.new(@nodes.size, AVERAGE_DURATION)
      duration AVERAGE_DURATION
      super()
    end

    # Appends a job and remembers when it was added, so #work can tell how
    # long it has been since the last job was started.
    def push(arg)
      @last = Time.now.utc
      super
    end

    # Feeds one duration sample into the rolling average and recomputes +want+.
    #
    # @param last [Numeric] duration sample to add
    def duration(last)
      sync_durations
      @durations.push(last).shift
      # With no nodes the window is empty; use 0.0 rather than dividing by zero
      # (which would produce NaN and make #new_count raise FloatDomainError).
      @duration =
        if @durations.empty?
          0.0
        else
          @durations.inject(:+).to_f / @durations.size # rolling average
        end
      new_count
    end

    # Recomputes +want+ from the current node count, average duration,
    # interval and max.
    def new_count
      needed = (@nodes.size * @duration).to_f / @interval
      # A zero interval yields Infinity or NaN, neither of which can be
      # converted with to_i; ask for every node and let the clamps below cap it.
      @want = needed.finite? ? needed.to_i : @nodes.size
      @want = 1 if @want < 1
      @want = @nodes.size if @want > @nodes.size
      @want = @max if @want > @max
    end

    # if  a) we want less or same amount of threads as we now running
    # and b) we want less threads running than the total amount of nodes
    # and c) there is more than MAX_INTER_JOB_GAP since last one was started
    # then we want one more thread (rationale is to fix hanging thread causing HOLB)
    def work
      return unless @want <= size && @want < @nodes.size

      @want += 1 if (Time.now.utc - @last) > MAX_INTER_JOB_GAP
    end

    private

    # Keeps the sample window exactly as long as the node list, which may have
    # grown or shrunk since the previous sample was recorded.
    def sync_durations
      diff = @nodes.size - @durations.size
      if diff > 0
        # Pad at the front so presumed values rotate out before real samples.
        @durations = Array.new(diff, AVERAGE_DURATION).concat(@durations)
      elsif diff < 0
        @durations.shift(-diff) # drop the oldest samples
      end
    end
  end
end
|