summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Kemp <steve@steve.org.uk>2015-01-19 12:27:15 +0000
committerSteve Kemp <steve@steve.org.uk>2015-01-19 12:27:15 +0000
commit9a05949ea4d669cb192d3e1d04984e27b82b0a69 (patch)
tree1e6dadcb8baf8531f38d82b5fb2c33486c73829c
parent7991f9a5ffc4d09209bf0431c188106a88cd8258 (diff)
Updated to use our queue-abstraction.
This means we can queue/dequeue to either Redis or Beanstalkd.
-rwxr-xr-xbin/custodian-dequeue7
-rwxr-xr-xbin/custodian-enqueue34
-rwxr-xr-xbin/custodian-queue25
-rw-r--r--lib/custodian/queue.rb6
-rw-r--r--lib/custodian/settings.rb12
-rw-r--r--lib/custodian/worker.rb35
6 files changed, 36 insertions, 83 deletions
diff --git a/bin/custodian-dequeue b/bin/custodian-dequeue
index 3072fbd..7a99186 100755
--- a/bin/custodian-dequeue
+++ b/bin/custodian-dequeue
@@ -53,6 +53,7 @@ require 'getoptlong'
# Our code.
#
require 'custodian/settings'
+require 'custodian/queue'
require 'custodian/worker'
@@ -71,7 +72,6 @@ if __FILE__ == $0 then
#
settings = Custodian::Settings.instance()
$SERVER = settings.queue_server
- $QUEUE = settings.queue_name
$ALERTER = settings.alerter
$LOGFILE = settings.log_file
@@ -82,7 +82,6 @@ if __FILE__ == $0 then
[ "--manual", "-m", GetoptLong::NO_ARGUMENT ],
[ "--fail", "-f", GetoptLong::NO_ARGUMENT ],
[ "--logfile", "-l", GetoptLong::REQUIRED_ARGUMENT ],
- [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ],
[ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ],
[ "--alerter", "-a", GetoptLong::REQUIRED_ARGUMENT ],
[ "--single", "-s", GetoptLong::NO_ARGUMENT ],
@@ -96,8 +95,6 @@ if __FILE__ == $0 then
$LOGFILE = arg
when "--server" then
$SERVER = arg
- when "--queue" then
- $QUEUE = arg
when "--alerter" then
$ALERTER = arg
when "--single" then
@@ -149,7 +146,7 @@ if __FILE__ == $0 then
#
# Create the object
#
- worker = Custodian::Worker.new( $SERVER, $QUEUE, $ALERTER, $LOGFILE, settings )
+ worker = Custodian::Worker.new( $SERVER, $ALERTER, $LOGFILE, settings )
#
diff --git a/bin/custodian-enqueue b/bin/custodian-enqueue
index a6b9828..20654c1 100755
--- a/bin/custodian-enqueue
+++ b/bin/custodian-enqueue
@@ -1,7 +1,7 @@
#!/usr/bin/ruby -Ilib/
#
# NAME
-# custodian-enqueue - Parse tests from a file and enqueue to beanstalkd.
+# custodian-enqueue - Parse tests from a file and enqueue them.
#
# SYNOPSIS
# custodian-enqueue [ -h | --help ]
@@ -19,8 +19,6 @@
#
# -d, --dump Dump the parsed tests to the console; don't insert in the queue.
#
-# -q, --queue Use the named beanstalkd tube.
-#
# --test Test the parsing of the given file, alert on errors.
#
# -f, --file FILE Parse the given configuration file.
@@ -29,17 +27,15 @@
#
# ABOUT
#
-# This tool reads a single configuration file and parses it into a
+# This tool reads a single configuration file and parses it into a
# series of network & protocol tests. These tests are then stored in
-# a beanstalkd queue.
+# a queue from which workers can retrieve and execute them.
#
-# The intention is that the tests will be pulled from the queue and
-# executed by the companion program custodian-dequeue. The dequeing
-# process may occur up numerous other hosts
+# The dequeing process may occur up numerous other hosts
#
# CONFIGURATION FILE
#
-# The configuration file is 99% compatible with that used in the tool
+# The configuration file is 99% compatible with that used in the tool
# custodian replaces.
#
#
@@ -53,13 +49,13 @@
#
# Standard modules
#
-require 'beanstalk-client'
require 'getoptlong'
#
# Our code.
#
require 'custodian/parser'
+require 'custodian/queue'
require 'custodian/settings'
@@ -77,7 +73,6 @@ if __FILE__ == $0 then
#
settings = Custodian::Settings.instance()
$SERVER = settings.queue_server
- $QUEUE = settings.queue_name
begin
opts = GetoptLong.new(
@@ -86,7 +81,6 @@ if __FILE__ == $0 then
[ "--server", GetoptLong::REQUIRED_ARGUMENT ],
[ "--file", "-f", GetoptLong::REQUIRED_ARGUMENT ],
[ "--help", "-h", GetoptLong::NO_ARGUMENT ],
- [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ],
[ "--manual","-m", GetoptLong::NO_ARGUMENT ]
)
opts.each do |opt, arg|
@@ -97,8 +91,6 @@ if __FILE__ == $0 then
ENV["TEST"] = "1"
when "--server" then
$SERVER = arg
- when "--queue" then
- $QUEUE= arg
when "--file" then
ENV["FILE"] = arg
when "--help" then
@@ -146,7 +138,7 @@ if __FILE__ == $0 then
#
# Connected to the server
#
- queue = Beanstalk::Pool.new([$SERVER], $QUEUE )
+ queue = Custodian::QueueType.create( "redis" )
if ( ! queue )
puts "Failed to connect to beanstalk server: #{$SERVER}"
exit 1
@@ -178,17 +170,7 @@ if __FILE__ == $0 then
elsif ( ENV['DUMP'] )
puts test
else
-
- # priority of new job.
- priority = 1000
-
- # delay before putting into queue.
- delay = 0
-
- # time-to-remove: how long job.reserve() will live for.
- ttr = 300
-
- queue.put( test.to_s, priority, delay, ttr )
+ queue.add( test.to_s )
end
end
diff --git a/bin/custodian-queue b/bin/custodian-queue
index afcba2a..23f3841 100755
--- a/bin/custodian-queue
+++ b/bin/custodian-queue
@@ -1,4 +1,4 @@
-#!/usr/bin/ruby
+#!/usr/bin/ruby -Ilib/
#
# NAME
# custodian-queue - Work with the queue.
@@ -42,7 +42,6 @@
#
# Standard modules
#
-require 'beanstalk-client'
require 'getoptlong'
@@ -50,7 +49,7 @@ require 'getoptlong'
# Our code
#
require 'custodian/settings'
-
+require 'custodian/queue'
#
@@ -69,7 +68,6 @@ if __FILE__ == $0 then
#
settings = Custodian::Settings.instance()
$SERVER = settings.queue_server
- $QUEUE = settings.queue_name
begin
opts = GetoptLong.new(
@@ -78,7 +76,6 @@ if __FILE__ == $0 then
[ "--manual", "-m", GetoptLong::NO_ARGUMENT ],
[ "--monitor", "-M", GetoptLong::OPTIONAL_ARGUMENT ],
[ "--server", "-S", GetoptLong::REQUIRED_ARGUMENT ],
- [ "--queue", "-q", GetoptLong::REQUIRED_ARGUMENT ],
[ "--stats", "-s", GetoptLong::NO_ARGUMENT ]
)
opts.each do |opt, arg|
@@ -95,8 +92,6 @@ if __FILE__ == $0 then
$FLUSH = true
when "--server" then
$SERVER = arg
- when "--queue" then
- $QUEUE = arg
when "--help" then
$help = true
when "--manual" then
@@ -140,7 +135,7 @@ if __FILE__ == $0 then
#
# Create the queue object.
#
- queue = Beanstalk::Pool.new([$SERVER], $QUEUE )
+ queue = Custodian::QueueType.create( "redis" )
#
# Alerting on a queue that is too-full?
@@ -150,8 +145,7 @@ if __FILE__ == $0 then
#
# Find the number of jobs
#
- stats = queue.stats()
- jobs = stats['current-jobs-ready'] || 0
+ jobs = queue.size?
if ( jobs > $MONITOR )
exit 1
@@ -165,8 +159,8 @@ if __FILE__ == $0 then
# Showing stats?
#
if ( $STATS )
- stats = queue.stats()
- puts "There are #{stats['current-jobs-ready'] || 0} jobs pending."
+ jobs = queue.size?()
+ puts "There are #{jobs || 0} jobs pending."
exit( 0 )
end
@@ -176,10 +170,11 @@ if __FILE__ == $0 then
#
if ( $FLUSH )
count = 0
- while( true )
+ run = true
+ while( run )
begin
- job = queue.reserve(1)
- job.delete
+ job = queue.fetch(1)
+ exit(0) if ( job.nil? )
count += 1
rescue Beanstalk::TimedOut => ex
puts "Flushed #{count} pending jobs."
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
index a6e062c..d1f2057 100644
--- a/lib/custodian/queue.rb
+++ b/lib/custodian/queue.rb
@@ -82,7 +82,11 @@ module Custodian
# Fetch a job from the queue - the timeout parameter is ignored.
#
def fetch(timeout)
- @redis.lpop( "queue" )
+ job = false
+ while( ! job )
+ job = @redis.lpop( "queue" )
+ end
+ return( job )
end
#
diff --git a/lib/custodian/settings.rb b/lib/custodian/settings.rb
index 453a35a..edbd798 100644
--- a/lib/custodian/settings.rb
+++ b/lib/custodian/settings.rb
@@ -140,7 +140,7 @@ module Custodian
#
- # The beanstalkd server address
+ # The address of the queue.
#
def queue_server
_load() unless( _loaded? )
@@ -149,16 +149,6 @@ module Custodian
end
- #
- # The name of the beanstalkd tube we'll use
- #
- def queue_name
- _load() unless( _loaded? )
-
- @settings['queue_name'] || "Custodian"
- end
-
-
#
# The filename for the logfile.
diff --git a/lib/custodian/worker.rb b/lib/custodian/worker.rb
index b9c9995..964d345 100644
--- a/lib/custodian/worker.rb
+++ b/lib/custodian/worker.rb
@@ -3,7 +3,6 @@
#
# Standard modules
#
-require 'beanstalk-client'
require 'logger'
@@ -13,7 +12,7 @@ require 'logger'
#
require 'custodian/alerts'
require 'custodian/settings'
-
+require 'custodian/queue'
#
@@ -29,7 +28,7 @@ require 'custodian/protocoltests'
#
-# This class contains the code for connecting to a Beanstalk queue,
+# This class contains the code for connecting to the queue,
# fetching tests from it, and executing them
#
module Custodian
@@ -38,7 +37,7 @@ module Custodian
#
- # The beanstalk queue.
+ # The queue we're using for retrieving tests.
#
attr_reader :queue
@@ -77,10 +76,10 @@ module Custodian
#
# Constructor: Connect to the queue
#
- def initialize( server, queue, alerter, logfile, settings )
+ def initialize( server, alerter, logfile, settings )
# Connect to the queue
- @queue = Beanstalk::Pool.new([server], queue )
+ @queue = QueueType.create( "redis" )
# Get the alerter-type to instantiate
@alerter = alerter
@@ -141,20 +140,14 @@ module Custodian
#
# Acquire a job.
#
- job = @queue.reserve()
- log_message( "Job aquired - Job ID : #{job.id}" )
+ job = @queue.fetch(1)
+ log_message( "Job aquired: #{job}" )
#
# Get the job body
#
- body = job.body
- raise ArgumentError, "Job was empty" if (body.nil?)
- raise ArgumentError, "Job was not a string" unless body.kind_of?(String)
-
- #
- # Output the job.
- #
- log_message( "Job: #{body}" )
+ raise ArgumentError, "Job was empty" if (job.nil?)
+ raise ArgumentError, "Job was not a string" unless job.kind_of?(String)
#
@@ -166,7 +159,7 @@ module Custodian
#
# Create the test-object.
#
- test = Custodian::TestFactory.create( body )
+ test = Custodian::TestFactory.create( job )
start_time = Time.now
@@ -240,14 +233,6 @@ module Custodian
rescue => ex
log_message( "Exception raised processing job: #{ex}" )
- ensure
- #
- # Delete the job - either we received an error, in which case
- # we should remove it to avoid picking it up again, or we handled
- # it successfully so it should be removed.
- #
- log_message( "Job ID : #{job.id} - Removed" )
- job.delete if ( job )
end
return result