summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorSteve Kemp <steve@steve.org.uk>2015-01-19 12:27:15 +0000
committerSteve Kemp <steve@steve.org.uk>2015-01-19 12:27:15 +0000
commit7d90bf7fba34d586c25ed096b8a2fc0fdf311879 (patch)
tree006a0737b075effa6b526bd3ff96a3b30167aa54 /lib
parentca9be00dc45d0a71210ad16423d6d319c87b6727 (diff)
Updated to use our queue-abstraction.
This means we can queue/dequeue to either Redis or Beanstalkd.
Diffstat (limited to 'lib')
-rw-r--r--lib/custodian/queue.rb6
-rw-r--r--lib/custodian/settings.rb12
-rw-r--r--lib/custodian/worker.rb35
3 files changed, 16 insertions, 37 deletions
diff --git a/lib/custodian/queue.rb b/lib/custodian/queue.rb
index a6e062c..d1f2057 100644
--- a/lib/custodian/queue.rb
+++ b/lib/custodian/queue.rb
@@ -82,7 +82,11 @@ module Custodian
# Fetch a job from the queue - the timeout parameter is ignored.
#
def fetch(timeout)
- @redis.lpop( "queue" )
+ job = false
+ while( ! job )
+ job = @redis.lpop( "queue" )
+ end
+ return( job )
end
#
diff --git a/lib/custodian/settings.rb b/lib/custodian/settings.rb
index 453a35a..edbd798 100644
--- a/lib/custodian/settings.rb
+++ b/lib/custodian/settings.rb
@@ -140,7 +140,7 @@ module Custodian
#
- # The beanstalkd server address
+ # The address of the queue.
#
def queue_server
_load() unless( _loaded? )
@@ -149,16 +149,6 @@ module Custodian
end
- #
- # The name of the beanstalkd tube we'll use
- #
- def queue_name
- _load() unless( _loaded? )
-
- @settings['queue_name'] || "Custodian"
- end
-
-
#
# The filename for the logfile.
diff --git a/lib/custodian/worker.rb b/lib/custodian/worker.rb
index b9c9995..964d345 100644
--- a/lib/custodian/worker.rb
+++ b/lib/custodian/worker.rb
@@ -3,7 +3,6 @@
#
# Standard modules
#
-require 'beanstalk-client'
require 'logger'
@@ -13,7 +12,7 @@ require 'logger'
#
require 'custodian/alerts'
require 'custodian/settings'
-
+require 'custodian/queue'
#
@@ -29,7 +28,7 @@ require 'custodian/protocoltests'
#
-# This class contains the code for connecting to a Beanstalk queue,
+# This class contains the code for connecting to the queue,
# fetching tests from it, and executing them
#
module Custodian
@@ -38,7 +37,7 @@ module Custodian
#
- # The beanstalk queue.
+ # The queue we're using for retrieving tests.
#
attr_reader :queue
@@ -77,10 +76,10 @@ module Custodian
#
# Constructor: Connect to the queue
#
- def initialize( server, queue, alerter, logfile, settings )
+ def initialize( server, alerter, logfile, settings )
# Connect to the queue
- @queue = Beanstalk::Pool.new([server], queue )
+ @queue = QueueType.create( "redis" )
# Get the alerter-type to instantiate
@alerter = alerter
@@ -141,20 +140,14 @@ module Custodian
#
# Acquire a job.
#
- job = @queue.reserve()
- log_message( "Job aquired - Job ID : #{job.id}" )
+ job = @queue.fetch(1)
+ log_message( "Job aquired: #{job}" )
#
# Get the job body
#
- body = job.body
- raise ArgumentError, "Job was empty" if (body.nil?)
- raise ArgumentError, "Job was not a string" unless body.kind_of?(String)
-
- #
- # Output the job.
- #
- log_message( "Job: #{body}" )
+ raise ArgumentError, "Job was empty" if (job.nil?)
+ raise ArgumentError, "Job was not a string" unless job.kind_of?(String)
#
@@ -166,7 +159,7 @@ module Custodian
#
# Create the test-object.
#
- test = Custodian::TestFactory.create( body )
+ test = Custodian::TestFactory.create( job )
start_time = Time.now
@@ -240,14 +233,6 @@ module Custodian
rescue => ex
log_message( "Exception raised processing job: #{ex}" )
- ensure
- #
- # Delete the job - either we received an error, in which case
- # we should remove it to avoid picking it up again, or we handled
- # it successfully so it should be removed.
- #
- log_message( "Job ID : #{job.id} - Removed" )
- job.delete if ( job )
end
return result