summaryrefslogtreecommitdiff
path: root/lib/oxidized/nodes.rb
blob: 72a2dc46a1a0c4f7e11f3b2ff60d731262de4a4f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
module Oxidized
  require 'ipaddr'
  require 'oxidized/node'
  class Oxidized::NotSupported < OxidizedError; end
  class Oxidized::NodeNotFound < OxidizedError; end
  class Nodes < Array
    attr_accessor :source, :jobs
    alias :put :unshift
    def load node_want=nil
      with_lock do
        new = []
        @source = Oxidized.config.source.default
        Oxidized.mgr.add_source @source
        Oxidized.logger.info "lib/oxidized/nodes.rb: Loading nodes"
        Oxidized.mgr.source[@source].new.load.each do |node|
          # we want to load specific node(s), not all of them
          next unless node_want? node_want, node
          begin
            _node = Node.new node
            new.push _node
          rescue ModelNotFound => err
            Oxidized.logger.error "node %s raised %s with message '%s'" % [node, err.class, err.message]
          rescue Resolv::ResolvError => err
            Oxidized.logger.error "node %s is not resolvable, raised %s with message '%s'" % [node, err.class, err.message]
          end
        end
        size == 0 ? replace(new) : update_nodes(new)
        Oxidized.logger.info "lib/oxidized/nodes.rb: Loaded #{size} nodes"
      end
    end

    def node_want? node_want, node
      return true unless node_want
      node_want_ip = (IPAddr.new(node_want) rescue false)
      name_is_ip   = (IPAddr.new(node[:name]) rescue false)
      if name_is_ip and node_want_ip == node[:name]
        true
      elsif node[:ip] and node_want_ip == node[:ip]
        true
      elsif node_want.match node[:name]
        true unless name_is_ip
      end
    end


    def list
      with_lock do
        map { |e| e.serialize }
      end
    end

    def show node
      with_lock do
        i = find_node_index node
        self[i].serialize
      end
    end

    def fetch node_name, group
      yield_node_output(node_name) do |node, output|
        output.fetch node, group
      end
    end

    # @param node [String] name of the node moved into the head of array
    def next node, opt={}
      if waiting.find_node_index(node)
        with_lock do
          n = del node
          n.user = opt['user']
          n.msg  = opt['msg']
          n.from = opt['from']
          # set last job to nil so that the node is picked for immediate update
          n.last = nil
          put n
          jobs.want += 1 if Config.next_adds_job
        end
      end
    end
    alias :top :next

    # @return [String] node from the head of the array
    def get
      with_lock do
        (self << shift).last
      end
    end

    # @param node node whose index number in Nodes to find
    # @return [Fixnum] index number of node in Nodes
    def find_node_index node
      find_index node or raise Oxidized::NodeNotFound, "unable to find '#{node}'"
    end

    def version node_name, group
      yield_node_output(node_name) do |node, output|
        output.version node, group
      end
    end

    def get_version node_name, group, oid
      yield_node_output(node_name) do |node, output|
        output.get_version node, group, oid
      end
    end

    def get_diff node_name, group, oid1, oid2
      yield_node_output(node_name) do |node, output|
        output.get_diff node, group, oid1, oid2
      end
    end

    private

    def initialize opts={}
      super()
      node = opts.delete :node
      @mutex= Mutex.new  # we compete for the nodes with webapi thread
      if nodes = opts.delete(:nodes)
        replace nodes
      else
        load node
      end
    end

    def with_lock &block
      @mutex.synchronize(&block)
    end

    def find_index node
      index { |e| e.name == node or e.ip == node}
    end

    # @param node node which is removed from nodes list
    # @return [Node] deleted node
    def del node
      delete_at find_node_index(node)
    end

    # @return [Nodes] list of nodes running now
    def running
      Nodes.new :nodes => select { |node| node.running? }
    end

    # @return [Nodes] list of nodes waiting (not running)
    def waiting
      Nodes.new :nodes => select { |node| not node.running? }
    end

    # walks list of new nodes, if old node contains same name, adds last and
    # stats information from old to new.
    #
    # @todo can we trust name to be unique identifier, what about when groups are used?
    # @param [Array] nodes Array of nodes used to replace+update old
    def update_nodes nodes
      old = self.dup
      replace(nodes)
      each do |node|
        begin
          if i = old.find_node_index(node.name)
            node.stats = old[i].stats
            node.last  = old[i].last
          end
        rescue  Oxidized::NodeNotFound
        end
      end
      sort_by! { |x| x.last.nil? ? Time.new(0) : x.last.end }
    end

    def yield_node_output(node_name)
      with_lock do
        node = find { |n| n.name == node_name }
        output = node.output.new
        raise Oxidized::NotSupported unless output.respond_to? :fetch
        yield node, output
      end
    end
  end
end