summaryrefslogtreecommitdiff
path: root/lib/oxidized/nodes.rb
blob: 24f52aa326ec078f8d2d85ce96181c564e0b811b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
module Oxidized
 require 'oxidized/node'
 require 'ipaddr'
 # Raised by Nodes#fetch when the node's output object does not respond to #fetch.
 class Oxidized::NotSupported < OxidizedError; end
 # Raised by Nodes when a node lookup by name finds no matching entry.
 class Oxidized::NodeNotFound < OxidizedError; end
  # Ordered list of nodes. The head of the array is the node handed out next
  # by #get. The public methods defined here serialize their work through
  # @mutex; the #put alias (plain Array#unshift) does not take the lock itself.
  class Nodes < Array
    attr_accessor :source
    alias :put :unshift

    # Loads node definitions from the default configured source and replaces
    # the current contents of the list with them.
    #
    # @param node_want [String, nil] when given, only matching nodes are
    #   loaded: if it parses as an IP address it is compared against
    #   node[:ip], otherwise it is matched against node[:name] with #match
    # @return [Nodes] self
    def load node_want=nil
      with_lock do
        loaded = []
        # anything that does not parse as an address is treated as a name pattern
        node_want_ip = (IPAddr.new(node_want) rescue nil) if node_want
        @source = CFG.source.default
        Oxidized.mgr.add_source @source
        Oxidized.mgr.source[@source].new.load.each do |node|

          # we want to load specific node(s), not all of them
          if node_want
            if node_want_ip
              next unless node_want_ip == node[:ip]
            else
              next unless node[:name].match node_want
            end
          end

          # a node that cannot be built is logged and skipped, the rest still load
          begin
            loaded.push Node.new(node)
          rescue ModelNotFound => err
            Log.error "node %s raised %s with message '%s'" % [node, err.class, err.message]
          rescue Resolv::ResolvError => err
            Log.error "node %s is not resolvable, raised %s with message '%s'" % [node, err.class, err.message]
          end
        end
        replace loaded
      end
    end

    # @return [Array] result of calling #serialize on every node, in list order
    def list
      with_lock do
        map { |e| e.serialize }
      end
    end

    # @param node [String] name of the node to show
    # @return result of calling #serialize on the named node
    # @raise [Oxidized::NodeNotFound] if no node has that name
    def show node
      with_lock do
        self[find_node_index(node)].serialize
      end
    end

    # Fetches data for a node through a fresh instance of its output class.
    #
    # @param node [String] name of the node
    # @param group passed through unchanged to the output's #fetch
    # @return whatever the output's #fetch returns
    # @raise [Oxidized::NodeNotFound] if no node has that name
    # @raise [Oxidized::NotSupported] if the output does not respond to #fetch
    def fetch node, group
      with_lock do
        i = find_node_index node
        output = self[i].output.new
        raise Oxidized::NotSupported unless output.respond_to? :fetch
        output.fetch node, group
      end
    end

    # @param node [String] name of the node moved into the head of array
    # @param opt [Hash] values for the 'user', 'msg' and 'from' keys are
    #   copied onto the node
    # @raise [Oxidized::NodeNotFound] if the node is unknown or currently running
    def next node, opt={}
      with_lock do
        # Only a node that is not running may be moved. `waiting` is a separate
        # Nodes instance with its own mutex, so its #del is safe to call here
        # and raises NodeNotFound when the node is absent from that list.
        waiting.del node
        # Use the unlocked helper: calling #del here would re-acquire @mutex
        # on the same thread, which Mutex rejects with ThreadError.
        n = remove_node node
        n.user = opt['user']
        n.msg  = opt['msg']
        n.from = opt['from']
        # set last job to nil so that the node is picked for immediate update
        n.last = nil
        put n
      end
    end
    alias :top :next

    # Rotates the list: the head node is moved to the tail and returned.
    #
    # @return the node taken from the head of the array, or nil if the list is empty
    def get
      with_lock do
        # rotating an empty list would append nil as a bogus entry
        (self << shift).last unless empty?
      end
    end

    # @param node node which is removed from nodes list
    # @return [Node] deleted node
    # @raise [Oxidized::NodeNotFound] if no node has that name
    def del node
      with_lock do
        remove_node node
      end
    end

    private

    # @param opts [Hash] :nodes, when given, becomes the content of the list
    #   and nothing is loaded; otherwise the list is loaded from the source,
    #   optionally restricted by :node (see #load)
    def initialize opts={}
      super()
      node = opts.delete :node
      @mutex= Mutex.new  # we compete for the nodes with webapi thread
      if nodes = opts.delete(:nodes)
        replace nodes
      else
        load node
      end
    end

    # Runs the block while holding @mutex. Not reentrant: code already inside
    # with_lock must not call another method that takes the lock.
    def with_lock &block
      @mutex.synchronize(&block)
    end

    # @param node [String] node name
    # @return [Integer, nil] index of the first node with that name, or nil
    def find_index node
      index { |e| e.name == node }
    end

    # @param node [String] node name
    # @return [Integer] index of the first node with that name
    # @raise [Oxidized::NodeNotFound] if no node has that name
    def find_node_index node
      find_index(node) || raise(Oxidized::NodeNotFound, "unable to find '#{node}'")
    end

    # Removes the named node without taking @mutex; the caller must hold it.
    #
    # @param node [String] node name
    # @return [Node] removed node
    # @raise [Oxidized::NodeNotFound] if no node has that name
    def remove_node node
      delete_at find_node_index(node)
    end

    # @return [Nodes] list of nodes running now
    def running
      Nodes.new :nodes => select { |node| node.running? }
    end

    # @return [Nodes] list of nodes waiting (not running)
    def waiting
      Nodes.new :nodes => select { |node| not node.running? }
    end

  end
end