diff options
Diffstat (limited to 'lib/oxidized')
| -rw-r--r-- | lib/oxidized/jobs.rb | 24 | ||||
| -rw-r--r-- | lib/oxidized/worker.rb | 6 | 
2 files changed, 22 insertions, 8 deletions
| diff --git a/lib/oxidized/jobs.rb b/lib/oxidized/jobs.rb index 3342679..ff7f92b 100644 --- a/lib/oxidized/jobs.rb +++ b/lib/oxidized/jobs.rb @@ -1,28 +1,46 @@  module Oxidized    class Jobs < Array -    AVERAGE_DURATION = 5 # initially presume nodes take 5s to complete +    AVERAGE_DURATION  = 5   # initially presume nodes take 5s to complete +    MAX_INTER_JOB_GAP = 300 # add job if more than X from last job started      attr_accessor :interval, :max, :want +      def initialize max, interval, nodes        @max       = max        @interval  = interval        @nodes     = nodes +      @last      = Time.now.utc        @durations = Array.new @nodes.size, AVERAGE_DURATION        duration AVERAGE_DURATION        super()      end + +    def push arg +      @last = Time.now.utc +      super +    end +      def duration last        @durations.push(last).shift        @duration = @durations.inject(:+).to_f / @nodes.size #rolling average        new_count      end +      def new_count        @want = ((@nodes.size * @duration) / @interval).to_i        @want = 1 if @want < 1        @want = @nodes.size if @want > @nodes.size        @want = @max if @want > @max      end -    def add_job -      @want += 1 + +    def work +      # if   a) we want less or same amount of threads as we now running +      # and  b) we want less threads running than the total amount of nodes +      # and  c) there is more than MAX_INTER_JOB_GAP since last one was started +      # then we want one more thread (rationale is to fix hanging thread causing HOLB) +      if @want <= size and @want < @nodes.size +        @want +=1 if (Time.now.utc - @last) > MAX_INTER_JOB_GAP +      end      end +    end  end diff --git a/lib/oxidized/worker.rb b/lib/oxidized/worker.rb index 99fc8b8..7ed70ac 100644 --- a/lib/oxidized/worker.rb +++ b/lib/oxidized/worker.rb @@ -2,12 +2,9 @@ module Oxidized    require 'oxidized/job'    require 'oxidized/jobs'    class Worker -    MAX_INTER_JOB_GAP = 300 -      def initialize nodes        @nodes   = nodes        @jobs    = Jobs.new CFG.threads, CFG.interval, @nodes -      @last    = Time.now.utc        Thread.abort_on_exception = true      end @@ -15,7 +12,7 @@ module Oxidized        ended = []        @jobs.delete_if { |job| ended << job if not job.alive? }        ended.each      { |job| process job } -      @jobs.add_job if Time.now.utc - @last > MAX_INTER_JOB_GAP +      @jobs.work        while @jobs.size < @jobs.want          Log.debug "Jobs #{@jobs.size}, Want: #{@jobs.want}"          # ask for next node in queue non destructive way @@ -26,7 +23,6 @@ module Oxidized          # shift nodes and get the next node          node = @nodes.get          node.running? ? next : node.running = true -        @last = Time.now.utc          @jobs.push Job.new node        end      end | 
