summaryrefslogtreecommitdiff
path: root/lib/oxidized
diff options
context:
space:
mode:
Diffstat (limited to 'lib/oxidized')
-rw-r--r--lib/oxidized/config.rb3
-rw-r--r--lib/oxidized/core.rb2
-rw-r--r--lib/oxidized/hook.rb88
-rw-r--r--lib/oxidized/hook/exec.rb84
-rw-r--r--lib/oxidized/hook/noophook.rb9
-rw-r--r--lib/oxidized/manager.rb11
-rw-r--r--lib/oxidized/worker.rb6
7 files changed, 201 insertions, 2 deletions
diff --git a/lib/oxidized/config.rb b/lib/oxidized/config.rb
index 96c8fbf..f45004a 100644
--- a/lib/oxidized/config.rb
+++ b/lib/oxidized/config.rb
@@ -9,10 +9,11 @@ module Oxidized
OutputDir = File.join Directory, %w(lib oxidized output)
ModelDir = File.join Directory, %w(lib oxidized model)
SourceDir = File.join Directory, %w(lib oxidized source)
+ HookDir = File.join Directory, %w(lib oxidized hook)
Sleep = 1
end
class << self
- attr_accessor :mgr
+ attr_accessor :mgr, :Hooks
end
CFGS = Asetus.new :name=>'oxidized', :load=>false, :key_to_s=>true
CFGS.default.username = 'username'
diff --git a/lib/oxidized/core.rb b/lib/oxidized/core.rb
index 71267dd..b70443c 100644
--- a/lib/oxidized/core.rb
+++ b/lib/oxidized/core.rb
@@ -6,6 +6,7 @@ module Oxidized
require 'oxidized/worker'
require 'oxidized/nodes'
require 'oxidized/manager'
+ require 'oxidized/hook'
class << self
def new *args
Core.new args
@@ -15,6 +16,7 @@ module Oxidized
class Core
def initialize args
Oxidized.mgr = Manager.new
+ Oxidized.Hooks = HookManager.from_config CFG
nodes = Nodes.new
@worker = Worker.new nodes
trap('HUP') { nodes.load }
diff --git a/lib/oxidized/hook.rb b/lib/oxidized/hook.rb
new file mode 100644
index 0000000..2763c4f
--- /dev/null
+++ b/lib/oxidized/hook.rb
@@ -0,0 +1,88 @@
+module Oxidized
+class HookManager
+ class << self
+ def from_config cfg
+ mgr = new
+ cfg.hooks.each do |name,h_cfg|
+ h_cfg.events.each do |event|
+ mgr.register event.to_sym, name, h_cfg.type, h_cfg
+ end
+ end
+ mgr
+ end
+ end
+
+ # HookContext is passed to each hook. It can contain anything related to the
+ # event in question. At least it contains the event name
+ class HookContext < OpenStruct; end
+
+ # RegisteredHook is a container for a Hook instance
+ class RegisteredHook < Struct.new(:name, :hook); end
+
+ Events = [
+ :node_success,
+ :node_fail,
+ :post_store,
+ ]
+ attr_reader :registered_hooks
+
+ def initialize
+ @registered_hooks = Hash.new {|h,k| h[k] = []}
+ end
+
+ def register event, name, hook_type, cfg
+ unless Events.include? event
+ raise ArgumentError,
+ "unknown event #{event}, available: #{Events.join ','}"
+ end
+
+ Oxidized.mgr.add_hook hook_type
+ begin
+ hook = Oxidized.mgr.hook.fetch(hook_type).new
+ rescue KeyError
+ raise KeyError, "cannot find hook #{hook_type.inspect}"
+ end
+
+ hook.cfg = cfg
+
+ @registered_hooks[event] << RegisteredHook.new(name, hook)
+ Log.debug "Hook #{name.inspect} registered #{hook.class} for event #{event.inspect}"
+ end
+
+ def handle event, **ctx_params
+ ctx = HookContext.new ctx_params
+ ctx.event = event
+
+ @registered_hooks[event].each do |r_hook|
+ begin
+ r_hook.hook.run_hook ctx
+ rescue => e
+ Log.error "Hook #{r_hook.name} (#{r_hook.hook}) failed " +
+ "(#{e.inspect}) for event #{event.inspect}"
+ end
+ end
+ end
+end
+
+# Hook abstract base class
+class Hook
+ attr_accessor :cfg
+
+ def initialize
+ end
+
+ def cfg=(cfg)
+ @cfg = cfg
+ validate_cfg! if self.respond_to? :validate_cfg!
+ end
+
+ def run_hook ctx
+ raise NotImplementedError
+ end
+
+ def log(msg, level=:info)
+ Log.send(level, "#{self.class.name}: #{msg}")
+ end
+
+end
+end
diff --git a/lib/oxidized/hook/exec.rb b/lib/oxidized/hook/exec.rb
new file mode 100644
index 0000000..eb71466
--- /dev/null
+++ b/lib/oxidized/hook/exec.rb
@@ -0,0 +1,84 @@
+class Exec < Oxidized::Hook
+ include Process
+
+ def initialize
+ super
+ @timeout = 60
+ @async = false
+ end
+
+ def validate_cfg!
+ # Syntax check
+ if cfg.has_key? "timeout"
+ @timeout = cfg.timeout
+ raise "invalid timeout value" unless @timeout.is_a?(Integer) &&
+ @timeout > 0
+ end
+
+ if cfg.has_key? "async"
+ @async = !!cfg.async
+ end
+
+ if cfg.has_key? "cmd"
+ @cmd = cfg.cmd
+ raise "invalid cmd value" unless @cmd.is_a?(String) || @cmd.is_a?(Array)
+ end
+
+ rescue RuntimeError => e
+ raise ArgumentError,
+ "#{self.class.name}: configuration invalid: #{e.message}"
+ end
+
+ def run_hook ctx
+ env = make_env ctx
+ log "Execute: #{@cmd.inspect}", :debug
+ th = Thread.new do
+ begin
+ run_cmd! env
+ rescue => e
+ raise e unless @async
+ end
+ end
+ th.join unless @async
+ end
+
+ def run_cmd! env
+ pid, status = nil, nil
+ Timeout.timeout(@timeout) do
+ pid = spawn env, @cmd , :unsetenv_others => true
+ pid, status = wait2 pid
+ unless status.exitstatus.zero?
+ msg = "#{@cmd.inspect} failed with exit value #{status.exitstatus}"
+ log msg, :error
+ raise msg
+ end
+ end
+ rescue TimeoutError
+ kill "TERM", pid
+ msg = "#{@cmd} timed out"
+ log msg, :error
+ raise TimeoutError, msg
+ end
+
+ def make_env ctx
+ env = {
+ "OX_EVENT" => ctx.event.to_s
+ }
+ if ctx.node
+ env.merge!(
+ "OX_NODE_NAME" => ctx.node.name.to_s,
+ "OX_NODE_FROM" => ctx.node.from.to_s,
+ "OX_NODE_MSG" => ctx.node.msg.to_s,
+ "OX_NODE_GROUP" => ctx.node.group.to_s,
+ "OX_EVENT" => ctx.event.to_s,
+ )
+ end
+ if ctx.job
+ env.merge!(
+ "OX_JOB_STATUS" => ctx.job.status.to_s,
+ "OX_JOB_TIME" => ctx.job.time.to_s,
+ )
+ end
+ env
+ end
+end
diff --git a/lib/oxidized/hook/noophook.rb b/lib/oxidized/hook/noophook.rb
new file mode 100644
index 0000000..d4673ba
--- /dev/null
+++ b/lib/oxidized/hook/noophook.rb
@@ -0,0 +1,9 @@
+class NoopHook < Oxidized::Hook
+ def validate_cfg!
+ log "Validate config"
+ end
+
+ def run_hook ctx
+ log "Run hook with context: #{ctx}"
+ end
+end
diff --git a/lib/oxidized/manager.rb b/lib/oxidized/manager.rb
index b4eaecd..bf28ae7 100644
--- a/lib/oxidized/manager.rb
+++ b/lib/oxidized/manager.rb
@@ -23,12 +23,13 @@ module Oxidized
end
end
end
- attr_reader :input, :output, :model, :source
+ attr_reader :input, :output, :model, :source, :hook
def initialize
@input = {}
@output = {}
@model = {}
@source = {}
+ @hook = {}
end
def add_input method
method = Manager.load Config::InputDir, method
@@ -53,5 +54,13 @@ module Oxidized
return false if _source.empty?
@source.merge! _source
end
+ def add_hook _hook
+ return nil if @hook.key? _hook
+ name = _hook
+ _hook = Manager.load File.join(Config::Root, 'hook'), name
+ _hook = Manager.load Config::HookDir, name if _hook.empty?
+ return false if _hook.empty?
+ @hook.merge! _hook
+ end
end
end
diff --git a/lib/oxidized/worker.rb b/lib/oxidized/worker.rb
index 6bb2a22..eea747e 100644
--- a/lib/oxidized/worker.rb
+++ b/lib/oxidized/worker.rb
@@ -34,12 +34,16 @@ module Oxidized
@jobs.duration job.time
node.running = false
if job.status == :success
+ Oxidized.Hooks.handle :node_success, :node => node,
+ :job => job
msg = "update #{node.name}"
msg += " from #{node.from}" if node.from
msg += " with message '#{node.msg}'" if node.msg
if node.output.new.store node.name, job.config,
:msg => msg, :user => node.user, :group => node.group
Log.info "Configuration updated for #{node.group}/#{node.name}"
+ Oxidized.Hooks.handle :post_store, :node => node,
+ :job => job
end
node.reset
else
@@ -51,6 +55,8 @@ module Oxidized
else
msg += ", retries exhausted, giving up"
node.retry = 0
+ Oxidized.Hooks.handle :node_fail, :node => node,
+ :job => job
end
Log.warn msg
end