summaryrefslogtreecommitdiff
path: root/lib/oxidized
diff options
context:
space:
mode:
Diffstat (limited to 'lib/oxidized')
-rw-r--r--lib/oxidized/core.rb1
-rw-r--r--lib/oxidized/jobs.rb32
-rw-r--r--lib/oxidized/model/model.rb9
-rw-r--r--lib/oxidized/node.rb3
-rw-r--r--lib/oxidized/nodes.rb2
-rw-r--r--lib/oxidized/output/git.rb9
-rw-r--r--lib/oxidized/worker.rb6
7 files changed, 48 insertions, 14 deletions
diff --git a/lib/oxidized/core.rb b/lib/oxidized/core.rb
index ba46f3a..4c67f4b 100644
--- a/lib/oxidized/core.rb
+++ b/lib/oxidized/core.rb
@@ -17,6 +17,7 @@ module Oxidized
Oxidized.mgr = Manager.new
nodes = Nodes.new
@worker = Worker.new nodes
+ trap 'HUP' { nodes.load }
if CFG.rest?
begin
require 'oxidized/web'
diff --git a/lib/oxidized/jobs.rb b/lib/oxidized/jobs.rb
index 6476744..ff7f92b 100644
--- a/lib/oxidized/jobs.rb
+++ b/lib/oxidized/jobs.rb
@@ -1,24 +1,46 @@
module Oxidized
class Jobs < Array
- attr_accessor :interval, :duration, :max, :want
+ AVERAGE_DURATION = 5 # initially presume nodes take 5s to complete
+ MAX_INTER_JOB_GAP = 300 # add job if more than X from last job started
+ attr_accessor :interval, :max, :want
+
def initialize max, interval, nodes
@max = max
- #@interval = interval * 60
@interval = interval
@nodes = nodes
- @duration = 4
- new_count
+ @last = Time.now.utc
+ @durations = Array.new @nodes.size, AVERAGE_DURATION
+ duration AVERAGE_DURATION
super()
end
+
+ def push arg
+ @last = Time.now.utc
+ super
+ end
+
def duration last
- @duration = (@duration + last) / 2
+ @durations.push(last).shift
+ @duration = @durations.inject(:+).to_f / @nodes.size #rolling average
new_count
end
+
def new_count
@want = ((@nodes.size * @duration) / @interval).to_i
@want = 1 if @want < 1
@want = @nodes.size if @want > @nodes.size
@want = @max if @want > @max
end
+
+ def work
+ # if a) we want less or same amount of threads as we now running
+ # and b) we want less threads running than the total amount of nodes
+ # and c) there is more than MAX_INTER_JOB_GAP since last one was started
+ # then we want one more thread (rationale is to fix hanging thread causing HOLB)
+ if @want <= size and @want < @nodes.size
+ @want +=1 if (Time.now.utc - @last) > MAX_INTER_JOB_GAP
+ end
+ end
+
end
end
diff --git a/lib/oxidized/model/model.rb b/lib/oxidized/model/model.rb
index d3f54b9..899b40a 100644
--- a/lib/oxidized/model/model.rb
+++ b/lib/oxidized/model/model.rb
@@ -133,10 +133,10 @@ module Oxidized
outputs << out
end
procs[:pre].each do |pre_proc|
- outputs.unshift Oxidized::String.new(instance_eval(&pre_proc))
+ outputs.unshift process_cmd_output(instance_eval(&pre_proc), '')
end
procs[:post].each do |post_proc|
- outputs << Oxidized::String.new(instance_eval(&post_proc))
+ outputs << process_cmd_output(instance_eval(&post_proc), '')
end
outputs
end
@@ -152,9 +152,8 @@ module Oxidized
private
def process_cmd_output output, name
- if output.class != Oxidized::String
- output = Oxidized::String.new output
- end
+ output = Oxidized::String.new output if ::String === output
+ output = Oxidized::String.new '' unless Oxidized::String === output
output.set_cmd(name)
output
end
diff --git a/lib/oxidized/node.rb b/lib/oxidized/node.rb
index 6bc2b0f..2d41600 100644
--- a/lib/oxidized/node.rb
+++ b/lib/oxidized/node.rb
@@ -10,7 +10,8 @@ module Oxidized
alias :running? :running
def initialize opt
@name = opt[:name]
- @ip = Resolv.getaddress @name
+ @ip = IPAddr.new(opt[:ip]).to_s rescue nil
+ @ip ||= Resolv.getaddress @name
@group = opt[:group]
@input = resolve_input opt
@output = resolve_output opt
diff --git a/lib/oxidized/nodes.rb b/lib/oxidized/nodes.rb
index 032118d..3586b97 100644
--- a/lib/oxidized/nodes.rb
+++ b/lib/oxidized/nodes.rb
@@ -1,6 +1,6 @@
module Oxidized
- require 'oxidized/node'
require 'ipaddr'
+ require 'oxidized/node'
class Oxidized::NotSupported < OxidizedError; end
class Oxidized::NodeNotFound < OxidizedError; end
class Nodes < Array
diff --git a/lib/oxidized/output/git.rb b/lib/oxidized/output/git.rb
index 0c73638..d5eb8e7 100644
--- a/lib/oxidized/output/git.rb
+++ b/lib/oxidized/output/git.rb
@@ -1,5 +1,6 @@
module Oxidized
class Git < Output
+ class GitError < OxidizedError; end
begin
gem 'rugged', '~> 0.21.0'
require 'rugged'
@@ -71,8 +72,12 @@ class Git < Output
end
repo = Rugged::Repository.new repo
update_repo repo, file, data, @msg, @user, @email
- rescue Rugged::OSError, Rugged::RepositoryError
- Rugged::Repository.init_at repo, :bare
+ rescue Rugged::OSError, Rugged::RepositoryError => open_error
+ begin
+ Rugged::Repository.init_at repo, :bare
+ rescue => create_error
+ raise GitError, "first '#{open_error.message}' was raised while opening git repo, then '#{create_error.message}' was while trying to create git repo"
+ end
retry
end
diff --git a/lib/oxidized/worker.rb b/lib/oxidized/worker.rb
index e274e1e..7ed70ac 100644
--- a/lib/oxidized/worker.rb
+++ b/lib/oxidized/worker.rb
@@ -7,10 +7,12 @@ module Oxidized
@jobs = Jobs.new CFG.threads, CFG.interval, @nodes
Thread.abort_on_exception = true
end
+
def work
ended = []
@jobs.delete_if { |job| ended << job if not job.alive? }
ended.each { |job| process job }
+ @jobs.work
while @jobs.size < @jobs.want
Log.debug "Jobs #{@jobs.size}, Want: #{@jobs.want}"
# ask for next node in queue non destructive way
@@ -24,6 +26,7 @@ module Oxidized
@jobs.push Job.new node
end
end
+
def process job
node = job.node
node.last = job
@@ -49,6 +52,9 @@ module Oxidized
end
Log.warn msg
end
+ rescue NodeNotFound
+ Log.warn "#{node.name} not found, removed while collecting?"
end
+
end
end