diff options
Diffstat (limited to 'lib/oxidized')
-rw-r--r-- | lib/oxidized/core.rb | 1 | ||||
-rw-r--r-- | lib/oxidized/jobs.rb | 32 | ||||
-rw-r--r-- | lib/oxidized/model/model.rb | 9 | ||||
-rw-r--r-- | lib/oxidized/node.rb | 3 | ||||
-rw-r--r-- | lib/oxidized/nodes.rb | 2 | ||||
-rw-r--r-- | lib/oxidized/output/git.rb | 9 | ||||
-rw-r--r-- | lib/oxidized/worker.rb | 6 |
7 files changed, 48 insertions, 14 deletions
diff --git a/lib/oxidized/core.rb b/lib/oxidized/core.rb index ba46f3a..4c67f4b 100644 --- a/lib/oxidized/core.rb +++ b/lib/oxidized/core.rb @@ -17,6 +17,7 @@ module Oxidized Oxidized.mgr = Manager.new nodes = Nodes.new @worker = Worker.new nodes + trap 'HUP' { nodes.load } if CFG.rest? begin require 'oxidized/web' diff --git a/lib/oxidized/jobs.rb b/lib/oxidized/jobs.rb index 6476744..ff7f92b 100644 --- a/lib/oxidized/jobs.rb +++ b/lib/oxidized/jobs.rb @@ -1,24 +1,46 @@ module Oxidized class Jobs < Array - attr_accessor :interval, :duration, :max, :want + AVERAGE_DURATION = 5 # initially presume nodes take 5s to complete + MAX_INTER_JOB_GAP = 300 # add job if more than X from last job started + attr_accessor :interval, :max, :want + def initialize max, interval, nodes @max = max - #@interval = interval * 60 @interval = interval @nodes = nodes - @duration = 4 - new_count + @last = Time.now.utc + @durations = Array.new @nodes.size, AVERAGE_DURATION + duration AVERAGE_DURATION super() end + + def push arg + @last = Time.now.utc + super + end + def duration last - @duration = (@duration + last) / 2 + @durations.push(last).shift + @duration = @durations.inject(:+).to_f / @nodes.size #rolling average new_count end + def new_count @want = ((@nodes.size * @duration) / @interval).to_i @want = 1 if @want < 1 @want = @nodes.size if @want > @nodes.size @want = @max if @want > @max end + + def work + # if a) we want less or same amount of threads as we now running + # and b) we want less threads running than the total amount of nodes + # and c) there is more than MAX_INTER_JOB_GAP since last one was started + # then we want one more thread (rationale is to fix hanging thread causing HOLB) + if @want <= size and @want < @nodes.size + @want +=1 if (Time.now.utc - @last) > MAX_INTER_JOB_GAP + end + end + end end diff --git a/lib/oxidized/model/model.rb b/lib/oxidized/model/model.rb index d3f54b9..899b40a 100644 --- a/lib/oxidized/model/model.rb +++ b/lib/oxidized/model/model.rb @@ -133,10 +133,10 @@ module Oxidized outputs << out end procs[:pre].each do |pre_proc| - outputs.unshift Oxidized::String.new(instance_eval(&pre_proc)) + outputs.unshift process_cmd_output(instance_eval(&pre_proc), '') end procs[:post].each do |post_proc| - outputs << Oxidized::String.new(instance_eval(&post_proc)) + outputs << process_cmd_output(instance_eval(&post_proc), '') end outputs end @@ -152,9 +152,8 @@ module Oxidized private def process_cmd_output output, name - if output.class != Oxidized::String - output = Oxidized::String.new output - end + output = Oxidized::String.new output if ::String === output + output = Oxidized::String.new '' unless Oxidized::String === output output.set_cmd(name) output end diff --git a/lib/oxidized/node.rb b/lib/oxidized/node.rb index 6bc2b0f..2d41600 100644 --- a/lib/oxidized/node.rb +++ b/lib/oxidized/node.rb @@ -10,7 +10,8 @@ module Oxidized alias :running? :running def initialize opt @name = opt[:name] - @ip = Resolv.getaddress @name + @ip = IPAddr.new(opt[:ip]).to_s rescue nil + @ip ||= Resolv.getaddress @name @group = opt[:group] @input = resolve_input opt @output = resolve_output opt diff --git a/lib/oxidized/nodes.rb b/lib/oxidized/nodes.rb index 032118d..3586b97 100644 --- a/lib/oxidized/nodes.rb +++ b/lib/oxidized/nodes.rb @@ -1,6 +1,6 @@ module Oxidized - require 'oxidized/node' require 'ipaddr' + require 'oxidized/node' class Oxidized::NotSupported < OxidizedError; end class Oxidized::NodeNotFound < OxidizedError; end class Nodes < Array diff --git a/lib/oxidized/output/git.rb b/lib/oxidized/output/git.rb index 0c73638..d5eb8e7 100644 --- a/lib/oxidized/output/git.rb +++ b/lib/oxidized/output/git.rb @@ -1,5 +1,6 @@ module Oxidized class Git < Output + class GitError < OxidizedError; end begin gem 'rugged', '~> 0.21.0' require 'rugged' @@ -71,8 +72,12 @@ class Git < Output end repo = Rugged::Repository.new repo update_repo repo, file, data, @msg, @user, @email - rescue Rugged::OSError, Rugged::RepositoryError - Rugged::Repository.init_at repo, :bare + rescue Rugged::OSError, Rugged::RepositoryError => open_error + begin + Rugged::Repository.init_at repo, :bare + rescue => create_error + raise GitError, "first '#{open_error.message}' was raised while opening git repo, then '#{create_error.message}' was while trying to create git repo" + end retry end diff --git a/lib/oxidized/worker.rb b/lib/oxidized/worker.rb index e274e1e..7ed70ac 100644 --- a/lib/oxidized/worker.rb +++ b/lib/oxidized/worker.rb @@ -7,10 +7,12 @@ module Oxidized @jobs = Jobs.new CFG.threads, CFG.interval, @nodes Thread.abort_on_exception = true end + def work ended = [] @jobs.delete_if { |job| ended << job if not job.alive? } ended.each { |job| process job } + @jobs.work while @jobs.size < @jobs.want Log.debug "Jobs #{@jobs.size}, Want: #{@jobs.want}" # ask for next node in queue non destructive way @@ -24,6 +26,7 @@ module Oxidized @jobs.push Job.new node end end + def process job node = job.node node.last = job @@ -49,6 +52,9 @@ module Oxidized end Log.warn msg end + rescue NodeNotFound + Log.warn "#{node.name} not found, removed while collecting?" end + end end |